OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Protected Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::ShmemDataLink Class Reference

#include <ShmemDataLink.h>

Inheritance diagram for OpenDDS::DCPS::ShmemDataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ShmemDataLink:
Collaboration graph
[legend]

Public Member Functions

 ShmemDataLink (const ShmemTransport_rch &transport)
 
bool open (const std::string &peer_address)
 
int make_reservation (const GUID_t &remote_pub, const GUID_t &local_sub, const TransportReceiveListener_wrch &receive_listener, bool reliable)
 
void request_ack_received (ReceivedDataSample &sample)
 
void stop_resend_association_msgs (const GUID_t &local, const GUID_t &remote)
 
void control_received (ReceivedDataSample &sample)
 
std::string local_address ()
 
std::string peer_address ()
 
pid_t peer_pid ()
 
ShmemAllocator * local_allocator ()
 
ShmemAllocator * peer_allocator ()
 
void read ()
 
void signal_semaphore ()
 
ShmemTransport_rch transport () const
 
ShmemInst_rch config () const
 
- Public Member Functions inherited from OpenDDS::DCPS::DataLink
 DataLink (const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object. More...
 
virtual ~DataLink ()
 
int handle_exception (ACE_HANDLE)
 Reactor invokes this after being notified in schedule_stop or cancel_release. More...
 
void schedule_stop (const MonotonicTimePoint &schedule_to_stop_at)
 
void stop ()
 The stop method is used to stop the DataLink prior to shutdown. More...
 
void resume_send ()
 
virtual int make_reservation (const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
 
void release_reservations (GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
 
void schedule_delayed_release ()
 
const TimeDuration & datalink_release_delay () const
 
void remove_listener (const GUID_t &local_id)
 
void send_start ()
 
void send (TransportQueueElement *element)
 
void send_stop (GUID_t repoId)
 
virtual RemoveResult remove_sample (const DataSampleElement *sample)
 
virtual void remove_all_msgs (const GUID_t &pub_id)
 
int data_received (ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
 
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
 
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object. More...
 
void transport_shutdown ()
 
void notify (ConnectionNotice notice)
 
virtual void pre_stop_i ()
 
void release_resources ()
 
void terminate_send ()
 
void terminate_send_if_suspended ()
 
bool is_target (const GUID_t &remote_id)
 
void clear_associations ()
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
 
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
 
Priority & transport_priority ()
 
Priority transport_priority () const
 
bool & is_loopback ()
 
bool is_loopback () const
 
bool & is_active ()
 
bool is_active () const
 
bool cancel_release ()
 
ACE_Message_Block * create_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
 
GUIDSeq * target_intersection (const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
 
TransportImpl_rch impl () const
 
void default_listener (const TransportReceiveListener_wrch &trl)
 
TransportReceiveListener_wrch default_listener () const
 
bool add_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void remove_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void invoke_on_start_callbacks (bool success)
 
bool invoke_on_start_callbacks (const GUID_t &local, const GUID_t &remote, bool success)
 
void remove_startup_callbacks (const GUID_t &local, const GUID_t &remote)
 
void set_scheduling_release (bool scheduling_release)
 
virtual void send_final_acks (const GUID_t &readerid)
 
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint () const
 
virtual bool is_leading (const GUID_t &, const GUID_t &) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Protected Member Functions

virtual void stop_i ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataLink
int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
 
void send_start_i ()
 
virtual void send_i (TransportQueueElement *element, bool relink=true)
 
void send_stop_i (GUID_t repoId)
 
GUIDSeq * peer_ids (const GUID_t &local_id) const
 
void network_change () const
 
void replay_durable_data (const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
 
TransportSendStrategy_rch get_send_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Protected Attributes

ShmemSendStrategy_rch send_strategy_
 
ShmemReceiveStrategy_rch recv_strategy_
 
- Protected Attributes inherited from OpenDDS::DCPS::DataLink
TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink. More...
 
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink. More...
 
LockType strategy_lock_
 
OnStartCallbackMap on_start_callbacks_
 
PendingOnStartsMap pending_on_starts_
 
TimeDuration datalink_release_delay_
 
unique_ptr< MessageBlockAllocator > mb_allocator_
 
unique_ptr< DataBlockAllocator > db_allocator_
 
bool is_loopback_
 Is remote attached to same transport ? More...
 
bool is_active_
 Is pub or sub ? More...
 
bool started_
 
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control. More...
 
Interceptor interceptor_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Private Types

typedef std::set< GuidPair > AssocResends
 
typedef PmfPeriodicTask< ShmemDataLink > SmPeriodicTask
 

Private Member Functions

void send_association_msg (const GUID_t &local, const GUID_t &remote)
 
void resend_association_msgs (const MonotonicTimePoint &now)
 

Private Attributes

std::string peer_address_
 
ShmemAllocator * peer_alloc_
 
ACE_Thread_Mutex peer_alloc_mutex_
 
ReactorTask_rch reactor_task_
 
ACE_Thread_Mutex assoc_resends_mutex_
 
AssocResends assoc_resends_
 
DCPS::RcHandle< SmPeriodicTask > assoc_resends_task_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::DataLink
enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
 
typedef WeakRcHandle< TransportClient > TransportClient_wrch
 
typedef std::pair< TransportClient_wrch, GUID_t > OnStartCallback
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::DataLink
typedef ACE_Guard< LockType > GuardType
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataLink
static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods. More...
 

Detailed Description

Definition at line 50 of file ShmemDataLink.h.

Member Typedef Documentation

◆ AssocResends

Definition at line 96 of file ShmemDataLink.h.

◆ SmPeriodicTask

Definition at line 98 of file ShmemDataLink.h.

Constructor & Destructor Documentation

◆ ShmemDataLink()

OpenDDS::DCPS::ShmemDataLink::ShmemDataLink ( const ShmemTransport_rch & transport)

Definition at line 34 of file ShmemDataLink.cpp.

36  0, // priority
37  false, // is_loopback,
38  false) // is_active
39  , send_strategy_(make_rch<ShmemSendStrategy>(this))
40  , recv_strategy_(make_rch<ShmemReceiveStrategy>(this))
41  , peer_alloc_(0)
42  , reactor_task_(transport->reactor_task())
43 {
44 }
ShmemReceiveStrategy_rch recv_strategy_
Definition: ShmemDataLink.h:82
ShmemAllocator * peer_alloc_
Definition: ShmemDataLink.h:91
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
Definition: DataLink.cpp:42
ShmemSendStrategy_rch send_strategy_
Definition: ShmemDataLink.h:81
ReactorTask_rch reactor_task_
Definition: ShmemDataLink.h:93
ShmemTransport_rch transport() const

Member Function Documentation

◆ config()

ShmemInst_rch OpenDDS::DCPS::ShmemDataLink::config ( ) const

Definition at line 283 of file ShmemDataLink.cpp.

References OpenDDS::DCPS::dynamic_rchandle_cast(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, and transport().

Referenced by open().

284 {
285  return dynamic_rchandle_cast<ShmemInst>(transport()->config());
286 }
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
ShmemTransport_rch transport() const

◆ control_received()

void OpenDDS::DCPS::ShmemDataLink::control_received ( ReceivedDataSample & sample)

Definition at line 218 of file ShmemDataLink.cpp.

Referenced by OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample().

219 {
220 }

◆ local_address()

std::string OpenDDS::DCPS::ShmemDataLink::local_address ( )

Definition at line 263 of file ShmemDataLink.cpp.

References OPENDDS_TEST_AND_CALL_ASSIGN, and transport().

Referenced by OpenDDS::DCPS::ShmemReceiveStrategy::read().

264 {
265  std::string result;
267  return result;
268 }
RcHandle< ShmemTransport > ShmemTransport_rch
ShmemTransport_rch transport() const
#define OPENDDS_TEST_AND_CALL_ASSIGN(TYPE, TEST, CALL, VAL)
Definition: Definitions.h:70

◆ local_allocator()

ShmemAllocator * OpenDDS::DCPS::ShmemDataLink::local_allocator ( )

Definition at line 255 of file ShmemDataLink.cpp.

References OPENDDS_TEST_AND_CALL_ASSIGN, and transport().

Referenced by OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i(), and OpenDDS::DCPS::ShmemSendStrategy::start_i().

256 {
257  ShmemAllocator* result = 0;
259  return result;
260 }
RcHandle< ShmemTransport > ShmemTransport_rch
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
ShmemTransport_rch transport() const
#define OPENDDS_TEST_AND_CALL_ASSIGN(TYPE, TEST, CALL, VAL)
Definition: Definitions.h:70

◆ make_reservation()

int OpenDDS::DCPS::ShmemDataLink::make_reservation ( const GUID_t & remote_publication_id,
const GUID_t & local_subscription_id,
const TransportReceiveListener_wrch & receive_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 114 of file ShmemDataLink.cpp.

References ACE_GUARD_RETURN, assoc_resends_, assoc_resends_mutex_, OpenDDS::DCPS::DataLink::make_reservation(), and send_association_msg().

116 {
117  const int result = DataLink::make_reservation(remote_pub, local_sub, receive_listener, reliable);
118  if (result != 0) {
119  return result;
120  }
121 
122  // Tell writer we are ready and resend that message until we get a response.
124  if (assoc_resends_.insert(GuidPair(local_sub, remote_pub)).second) {
125  send_association_msg(local_sub, remote_pub);
126  }
127 
128  return 0;
129 }
void send_association_msg(const GUID_t &local, const GUID_t &remote)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
ACE_Thread_Mutex assoc_resends_mutex_
Definition: ShmemDataLink.h:95

◆ open()

bool OpenDDS::DCPS::ShmemDataLink::open ( const std::string &  peer_address)

Definition at line 47 of file ShmemDataLink.cpp.

References ACE_DEFAULT_PAGEFILE_POOL_CHUNK, ACE_ERROR_RETURN, ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, ACE_TEXT_CreateFileMapping, assoc_resends_task_, ACE_String_Base< char >::c_str(), config(), ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), OpenDDS::DCPS::ReactorTask::interceptor(), LM_DEBUG, LM_ERROR, name, peer_address(), peer_address_, peer_alloc_, reactor_task_, recv_strategy_, OpenDDS::DCPS::ref(), resend_association_msgs(), send_strategy_, OpenDDS::DCPS::DataLink::start(), stop_i(), and VDBG_LVL.

48 {
51 
52 #ifdef OPENDDS_SHMEM_WINDOWS
54  const ACE_TString name_under = name + ACE_TEXT('_');
55  // Find max size of peer's pool so enough local address space is reserved.
56  HANDLE fm = ACE_TEXT_CreateFileMapping(INVALID_HANDLE_VALUE, 0, PAGE_READONLY,
57  0, ACE_DEFAULT_PAGEFILE_POOL_CHUNK, name_under.c_str());
58  void* view;
59  if (fm == 0 || (view = MapViewOfFile(fm, FILE_MAP_READ, 0, 0, 0)) == 0) {
60  stop_i();
61  ACE_ERROR_RETURN((LM_ERROR,
62  ACE_TEXT("(%P|%t) ERROR: ShmemDataLink::open: ")
63  ACE_TEXT("peer's shared memory area not found (%C)\n"),
64  peer_address.c_str()),
65  false);
66  }
67  // location of max_size_ in ctrl block: a size_t after two void*s
68  const size_t* pmax = (const size_t*)(((void**)view) + 2);
69  alloc_opts.max_size_ = *pmax;
70  UnmapViewOfFile(view);
71  CloseHandle(fm);
72 #endif
73 
74  peer_alloc_ = new ShmemAllocator(name.c_str(), 0 /*lock_name*/
75 #ifdef OPENDDS_SHMEM_WINDOWS
76  , &alloc_opts
77 #endif
78  );
79 
80  if (-1 == peer_alloc_->find("Semaphore")) {
81  stop_i();
82  ACE_ERROR_RETURN((LM_ERROR,
83  ACE_TEXT("(%P|%t) ERROR: ShmemDataLink::open: ")
84  ACE_TEXT("peer's shared memory area not found (%C)\n"),
85  peer_address.c_str()),
86  false);
87  }
88 
89  if (start(static_rchandle_cast<TransportSendStrategy>(send_strategy_),
90  static_rchandle_cast<TransportStrategy>(recv_strategy_),
91  false)
92  != 0) {
93  stop_i();
94  ACE_ERROR_RETURN((LM_ERROR,
95  ACE_TEXT("(%P|%t) ERROR: ")
96  ACE_TEXT("ShmemDataLink::open: start failed!\n")),
97  false);
98  }
99 
100  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemDataLink::open: link %@ open to peer %C\n",
101  this, peer_address_.c_str()), 1);
102 
103  assoc_resends_task_ = make_rch<SmPeriodicTask>(reactor_task_->interceptor(),
105  ShmemInst_rch cfg = config();
106  if (!cfg) {
107  return false;
108  }
109  assoc_resends_task_->enable(false, cfg->association_resend_period());
110 
111  return true;
112 }
const char * c_str(void) const
ACE_MEM_POOL_OPTIONS MEMORY_POOL_OPTIONS
void resend_association_msgs(const MonotonicTimePoint &now)
ShmemReceiveStrategy_rch recv_strategy_
Definition: ShmemDataLink.h:82
DCPS::RcHandle< SmPeriodicTask > assoc_resends_task_
Definition: ShmemDataLink.h:99
ShmemAllocator * peer_alloc_
Definition: ShmemDataLink.h:91
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
int find(const char *name, void *&pointer)
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
RcHandle< ShmemInst > ShmemInst_rch
Definition: ShmemInst_rch.h:18
ShmemInst_rch config() const
#define ACE_TEXT_CreateFileMapping
ShmemSendStrategy_rch send_strategy_
Definition: ShmemDataLink.h:81
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
ReactorTask_rch reactor_task_
Definition: ShmemDataLink.h:93
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define ACE_ERROR_RETURN(X, Y)

◆ peer_address()

ACE_INLINE std::string OpenDDS::DCPS::ShmemDataLink::peer_address ( )

Definition at line 15 of file ShmemDataLink.inl.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, and peer_address_.

Referenced by open(), and OpenDDS::DCPS::ShmemSendStrategy::start_i().

16 {
17  return this->peer_address_;
18 }

◆ peer_allocator()

ShmemAllocator * OpenDDS::DCPS::ShmemDataLink::peer_allocator ( )

◆ peer_pid()

pid_t OpenDDS::DCPS::ShmemDataLink::peer_pid ( )

Definition at line 277 of file ShmemDataLink.cpp.

References peer_address_.

Referenced by OpenDDS::DCPS::ShmemSendStrategy::start_i().

278 {
279  return std::atoi(peer_address_.c_str() + peer_address_.find('-') + 1);
280 }

◆ read()

void OpenDDS::DCPS::ShmemDataLink::read ( void  )
inline

Definition at line 75 of file ShmemDataLink.h.

◆ request_ack_received()

void OpenDDS::DCPS::ShmemDataLink::request_ack_received ( ReceivedDataSample & sample)

Definition at line 185 of file ShmemDataLink.cpp.

References OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::guid_cdr_size, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), OpenDDS::DCPS::GuidConverter::isWriter(), LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataSampleHeader::publication_id_, recv_strategy_, send_association_msg(), OpenDDS::DCPS::DataSampleHeader::sequence_, stop_resend_association_msgs(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::to_msgblock(), and VDBG.

Referenced by OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample().

186 {
187  if (sample.header_.sequence_ == -1 && sample.header_.message_length_ == guid_cdr_size) {
188  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::request_ack_received: association msg\n"));
189  GUID_t local;
190  Message_Block_Ptr payload(recv_strategy_->to_msgblock(sample));
191  Serializer ser(payload.get(), encoding_unaligned_native);
192  if (ser >> local) {
193  const GUID_t& remote = sample.header_.publication_id_;
194  const GuidConverter local_gc(local);
195  const bool local_is_writer = local_gc.isWriter();
196  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::request_ack_received: "
197  "association msg from remote %C %C to local %C %C\n",
198  local_is_writer ? "reader" : "writer", LogGuid(remote).c_str(),
199  local_is_writer ? "writer" : "reader", std::string(local_gc).c_str()));
200  if (local_is_writer) {
201  // Reader has signaled it's ready to receive messages.
202  if (invoke_on_start_callbacks(local, remote, true)) {
203  // In case we're getting duplicates, only acknowledge if we can invoke
204  // the on start callback, which should only happen once.
205  send_association_msg(local, remote);
206  }
207  } else {
208  // Writer has responded to association ack, stop sending.
209  stop_resend_association_msgs(local, remote);
210  }
211  }
212  return;
213  }
214  data_received(sample);
215 }
void send_association_msg(const GUID_t &local, const GUID_t &remote)
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194
ACE_Message_Block * to_msgblock(const ReceivedDataSample &sample)
ShmemReceiveStrategy_rch recv_strategy_
Definition: ShmemDataLink.h:82
const size_t guid_cdr_size
Definition: GuidUtils.h:115
#define VDBG(DBG_ARGS)
void stop_resend_association_msgs(const GUID_t &local, const GUID_t &remote)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690

◆ resend_association_msgs()

void OpenDDS::DCPS::ShmemDataLink::resend_association_msgs ( const MonotonicTimePoint & now)
private

Definition at line 166 of file ShmemDataLink.cpp.

References ACE_GUARD, assoc_resends_, assoc_resends_mutex_, LM_DEBUG, send_association_msg(), and VDBG.

Referenced by open().

167 {
168  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::resend_association_msgs\n"));
169 
171  for (AssocResends::iterator i = assoc_resends_.begin(); i != assoc_resends_.end(); ++i) {
172  send_association_msg(i->local, i->remote);
173  }
174 }
void send_association_msg(const GUID_t &local, const GUID_t &remote)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define VDBG(DBG_ARGS)
ACE_Thread_Mutex assoc_resends_mutex_
Definition: ShmemDataLink.h:95

◆ send_association_msg()

void OpenDDS::DCPS::ShmemDataLink::send_association_msg ( const GUID_t & local,
const GUID_t & remote 
)
private

Definition at line 132 of file ShmemDataLink.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), OpenDDS::DCPS::guid_cdr_size, OpenDDS::DCPS::TransportSendStrategy::link_released(), LM_DEBUG, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, OpenDDS::DCPS::REQUEST_ACK, OpenDDS::DCPS::DataLink::send_i(), send_strategy_, OpenDDS::DCPS::DataSampleHeader::sequence_, VDBG, and ACE_Time_Value::zero.

Referenced by make_reservation(), request_ack_received(), and resend_association_msgs().

133 {
134  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::send_association_msg from %C to %C\n",
135  LogGuid(local).c_str(), LogGuid(remote).c_str()));
136 
137  DataSampleHeader header_data;
138  header_data.message_id_ = REQUEST_ACK;
139  header_data.byte_order_ = ACE_CDR_BYTE_ORDER;
140  header_data.message_length_ = guid_cdr_size;
141  header_data.sequence_ = -1;
142  header_data.publication_id_ = local;
143  header_data.publisher_id_ = remote;
144 
145  Message_Block_Ptr message(
146  new ACE_Message_Block(header_data.get_max_serialized_size(),
148  0, //cont
149  0, //data
150  0, //allocator_strategy
151  0, //locking_strategy
155  0,
156  0));
157 
158  *message << header_data;
159  Serializer ser(message.get(), encoding_unaligned_native);
160  ser << remote;
162  TransportControlElement* send_element = new TransportControlElement(move(message));
163  this->send_i(send_element, false);
164 }
static const ACE_Time_Value max_time
const size_t guid_cdr_size
Definition: GuidUtils.h:115
#define ACE_CDR_BYTE_ORDER
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
#define VDBG(DBG_ARGS)
ShmemSendStrategy_rch send_strategy_
Definition: ShmemDataLink.h:81
static const ACE_Time_Value zero
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY

◆ signal_semaphore()

void OpenDDS::DCPS::ShmemDataLink::signal_semaphore ( )

Definition at line 271 of file ShmemDataLink.cpp.

References OPENDDS_TEST_AND_CALL, and transport().

Referenced by OpenDDS::DCPS::ShmemReceiveStrategy::receive_bytes().

272 {
274 }
RcHandle< ShmemTransport > ShmemTransport_rch
ShmemTransport_rch transport() const
#define OPENDDS_TEST_AND_CALL(TYPE, TEST, CALL)
Definition: Definitions.h:69

◆ stop_i()

void OpenDDS::DCPS::ShmemDataLink::stop_i ( )
protected virtual

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 223 of file ShmemDataLink.cpp.

References ACE_GUARD, assoc_resends_, assoc_resends_mutex_, assoc_resends_task_, peer_alloc_, peer_alloc_mutex_, and ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::release().

Referenced by open().

224 {
225  {
227  assoc_resends_.clear();
228  assoc_resends_task_->disable();
229  }
230 
231  {
233  if (peer_alloc_) {
234  peer_alloc_->release(0 /*don't close*/);
235  }
236  delete peer_alloc_;
237  peer_alloc_ = 0;
238  }
239 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DCPS::RcHandle< SmPeriodicTask > assoc_resends_task_
Definition: ShmemDataLink.h:99
ShmemAllocator * peer_alloc_
Definition: ShmemDataLink.h:91
int release(int close=0)
ACE_Thread_Mutex peer_alloc_mutex_
Definition: ShmemDataLink.h:92
ACE_Thread_Mutex assoc_resends_mutex_
Definition: ShmemDataLink.h:95

◆ stop_resend_association_msgs()

void OpenDDS::DCPS::ShmemDataLink::stop_resend_association_msgs ( const GUID_t & local,
const GUID_t & remote 
)

Definition at line 176 of file ShmemDataLink.cpp.

References ACE_GUARD, assoc_resends_, assoc_resends_mutex_, LM_DEBUG, and VDBG.

Referenced by request_ack_received().

177 {
178  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::stop_resend_association_msgs: "
179  "local %C remote %C\n", LogGuid(local).c_str(), LogGuid(remote).c_str()));
181  assoc_resends_.erase(GuidPair(local, remote));
182 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define VDBG(DBG_ARGS)
ACE_Thread_Mutex assoc_resends_mutex_
Definition: ShmemDataLink.h:95

◆ transport()

RcHandle< ShmemTransport > OpenDDS::DCPS::ShmemDataLink::transport ( void  ) const

Definition at line 242 of file ShmemDataLink.cpp.

References OpenDDS::DCPS::dynamic_rchandle_cast(), and OpenDDS::DCPS::DataLink::impl().

Referenced by config(), local_address(), local_allocator(), and signal_semaphore().

243 {
244  return dynamic_rchandle_cast<ShmemTransport>(impl());
245 }
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214

Member Data Documentation

◆ assoc_resends_

AssocResends OpenDDS::DCPS::ShmemDataLink::assoc_resends_
private

◆ assoc_resends_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::ShmemDataLink::assoc_resends_mutex_
private

◆ assoc_resends_task_

DCPS::RcHandle<SmPeriodicTask> OpenDDS::DCPS::ShmemDataLink::assoc_resends_task_
private

Definition at line 99 of file ShmemDataLink.h.

Referenced by open(), and stop_i().

◆ peer_address_

std::string OpenDDS::DCPS::ShmemDataLink::peer_address_
private

Definition at line 90 of file ShmemDataLink.h.

Referenced by open(), peer_address(), and peer_pid().

◆ peer_alloc_

ShmemAllocator* OpenDDS::DCPS::ShmemDataLink::peer_alloc_
private

Definition at line 91 of file ShmemDataLink.h.

Referenced by open(), peer_allocator(), and stop_i().

◆ peer_alloc_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::ShmemDataLink::peer_alloc_mutex_
private

Definition at line 92 of file ShmemDataLink.h.

Referenced by peer_allocator(), and stop_i().

◆ reactor_task_

ReactorTask_rch OpenDDS::DCPS::ShmemDataLink::reactor_task_
private

Definition at line 93 of file ShmemDataLink.h.

Referenced by open().

◆ recv_strategy_

ShmemReceiveStrategy_rch OpenDDS::DCPS::ShmemDataLink::recv_strategy_
protected

Definition at line 82 of file ShmemDataLink.h.

Referenced by open(), and request_ack_received().

◆ send_strategy_

ShmemSendStrategy_rch OpenDDS::DCPS::ShmemDataLink::send_strategy_
protected

Definition at line 81 of file ShmemDataLink.h.

Referenced by open(), and send_association_msg().


The documentation for this class was generated from the following files: