OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Types | Public Member Functions | Protected Types | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::DataLink Class Reference

#include <DataLink.h>

Inheritance diagram for OpenDDS::DCPS::DataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataLink:
Collaboration graph
[legend]

Classes

class  ImmediateStart
 
class  Interceptor
 
struct  LocalAssociationInfo
 

Public Types

enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
 
typedef WeakRcHandle< TransportClientTransportClient_wrch
 
typedef std::pair< TransportClient_wrch, GUID_tOnStartCallback
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 

Public Member Functions

 DataLink (const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object. More...
 
virtual ~DataLink ()
 
int handle_exception (ACE_HANDLE)
 Reactor invokes this after being notified in schedule_stop or cancel_release. More...
 
void schedule_stop (const MonotonicTimePoint &schedule_to_stop_at)
 
void stop ()
 The stop method is used to stop the DataLink prior to shutdown. More...
 
void resume_send ()
 
virtual int make_reservation (const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
 
virtual int make_reservation (const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
 
void release_reservations (GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
 
void schedule_delayed_release ()
 
const TimeDurationdatalink_release_delay () const
 
void remove_listener (const GUID_t &local_id)
 
void send_start ()
 
void send (TransportQueueElement *element)
 
void send_stop (GUID_t repoId)
 
virtual RemoveResult remove_sample (const DataSampleElement *sample)
 
virtual void remove_all_msgs (const GUID_t &pub_id)
 
int data_received (ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
 
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
 
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object. More...
 
void transport_shutdown ()
 
void notify (ConnectionNotice notice)
 
virtual void pre_stop_i ()
 
void release_resources ()
 
void terminate_send ()
 
void terminate_send_if_suspended ()
 
bool is_target (const GUID_t &remote_id)
 
void clear_associations ()
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
 
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
 
Prioritytransport_priority ()
 
Priority transport_priority () const
 
bool & is_loopback ()
 
bool is_loopback () const
 
bool & is_active ()
 
bool is_active () const
 
bool cancel_release ()
 
ACE_Message_Blockcreate_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
 
GUIDSeqtarget_intersection (const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
 
TransportImpl_rch impl () const
 
void default_listener (const TransportReceiveListener_wrch &trl)
 
TransportReceiveListener_wrch default_listener () const
 
bool add_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void remove_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void invoke_on_start_callbacks (bool success)
 
bool invoke_on_start_callbacks (const GUID_t &local, const GUID_t &remote, bool success)
 
void remove_startup_callbacks (const GUID_t &local, const GUID_t &remote)
 
void set_scheduling_release (bool scheduling_release)
 
virtual void send_final_acks (const GUID_t &readerid)
 
virtual WeakRcHandle< ICE::Endpointget_ice_endpoint () const
 
virtual bool is_leading (const GUID_t &, const GUID_t &) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Protected Types

typedef ACE_Guard< LockTypeGuardType
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 

Protected Member Functions

int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
 
virtual void stop_i ()
 
void send_start_i ()
 
virtual void send_i (TransportQueueElement *element, bool relink=true)
 
void send_stop_i (GUID_t repoId)
 
GUIDSeqpeer_ids (const GUID_t &local_id) const
 
void network_change () const
 
void replay_durable_data (const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
 
TransportSendStrategy_rch get_send_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Static Protected Member Functions

static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods. More...
 

Protected Attributes

TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink. More...
 
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink. More...
 
LockType strategy_lock_
 
OnStartCallbackMap on_start_callbacks_
 
PendingOnStartsMap pending_on_starts_
 
TimeDuration datalink_release_delay_
 
unique_ptr< MessageBlockAllocatormb_allocator_
 
unique_ptr< DataBlockAllocatordb_allocator_
 
bool is_loopback_
 Is remote attached to same transport ? More...
 
bool is_active_
 Is pub or sub ? More...
 
bool started_
 
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control. More...
 
Interceptor interceptor_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Private Types

typedef ACE_SYNCH_MUTEX LockType
 

Private Member Functions

const char * connection_notice_as_str (ConnectionNotice notice)
 Helper function to output the enum as a string to help debugging. More...
 
TransportSendListener_rch send_listener_for (const GUID_t &pub_id) const
 
TransportReceiveListener_rch recv_listener_for (const GUID_t &sub_id) const
 
void prepare_release ()
 
virtual bool handle_send_request_ack (TransportQueueElement *element)
 
virtual TransportQueueElementcustomize_queue_element (TransportQueueElement *element)
 
virtual void release_remote_i (const GUID_t &)
 
virtual void release_reservations_i (const GUID_t &, const GUID_t &)
 
void data_received_i (ReceivedDataSample &sample, const GUID_t &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain)
 
void notify_reactor ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportSendListener_wrch, GUID_tKeyLessThan) IdToSendListenerMap
 Map publication Id value to TransportSendListener. More...
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportReceiveListener_wrch, GUID_tKeyLessThan) IdToRecvListenerMap
 Map subscription Id value to TransportReceieveListener. More...
 
typedef OPENDDS_MAP_CMP (GUID_t, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote
 
typedef OPENDDS_MAP_CMP (GUID_t, LocalAssociationInfo, GUID_tKeyLessThan) AssocByLocal
 

Private Attributes

bool stopped_
 
MonotonicTimePoint scheduled_to_stop_at_
 
IdToSendListenerMap send_listeners_
 
IdToRecvListenerMap recv_listeners_
 
TransportReceiveListener_wrch default_listener_
 
LockType pub_sub_maps_lock_
 
AssocByRemote assoc_by_remote_
 
AssocByLocal assoc_by_local_
 
WeakRcHandle< TransportImplimpl_
 A weak rchandle to the TransportImpl that created this DataLink. More...
 
ACE_UINT64 id_
 The id for this DataLink. More...
 
unique_ptr< ThreadPerConnectionSendTaskthr_per_con_send_task_
 
AssocByLocal assoc_releasing_
 
Priority transport_priority_
 TRANSPORT_PRIORITY value associated with the link. More...
 
bool scheduling_release_
 

Friends

class DataLinkCleanupTask
 
class ThreadPerConnectionSendTask
 
OpenDDS_Dcps_Export std::ostreamoperator<< (std::ostream &str, const DataLink &value)
 Convenience function for diagnostic information. More...
 

Additional Inherited Members

- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 

Detailed Description

This class manages the reservations based on the associations between datareader and datawriter. It basically delegate the samples to send strategy for sending and deliver the samples received by receive strategy to the listener.

Notes about object ownership: 1) Own the send strategy object and receive strategy object. 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection is enabled.

Definition at line 73 of file DataLink.h.

Member Typedef Documentation

◆ GuardType

Definition at line 437 of file DataLink.h.

◆ LockType

Definition at line 377 of file DataLink.h.

◆ OnStartCallback

Definition at line 259 of file DataLink.h.

◆ TransportClient_wrch

Definition at line 258 of file DataLink.h.

Member Enumeration Documentation

◆ ConnectionNotice

Enumerator
DISCONNECTED 
RECONNECTED 
LOST 

Definition at line 80 of file DataLink.h.

Constructor & Destructor Documentation

◆ DataLink()

OpenDDS::DCPS::DataLink::DataLink ( const TransportImpl_rch impl,
Priority  priority,
bool  is_loopback,
bool  is_active 
)

Only called by our TransportImpl object.

A DataLink object is always created by a TransportImpl object. Thus, the TransportImpl object passed-in here is the object that created this DataLink. The ability to specify a priority for individual links is included for construction so its value can be available for activating any threads.

Definition at line 42 of file DataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::TransportImpl::config(), OpenDDS::DCPS::TransportInst::datalink_control_chunks_, datalink_release_delay(), OpenDDS::DCPS::TransportInst::datalink_release_delay_, datalink_release_delay_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportInst::DEFAULT_DATALINK_CONTROL_CHUNKS, OpenDDS::DCPS::TransportInst::DEFAULT_DATALINK_RELEASE_DELAY, OpenDDS::DCPS::TimeDuration::from_msec(), get_next_datalink_id(), id_, LM_DEBUG, LM_ERROR, mb_allocator_, thr_per_con_send_task_, OpenDDS::DCPS::TransportInst::thread_per_connection_, and ThreadPerConnectionSendTask.

44  : stopped_(false),
45  impl_(impl),
47  scheduling_release_(false),
50  started_(false),
51  send_response_listener_("DataLink"),
53 {
54  DBG_ENTRY_LVL("DataLink", "DataLink", 6);
55 
57 
59  size_t control_chunks = TransportInst::DEFAULT_DATALINK_CONTROL_CHUNKS;
60 
61  TransportInst_rch cfg = impl->config();
62  if (cfg) {
63  datalink_release_delay = cfg->datalink_release_delay_;
64  if (cfg->thread_per_connection_) {
66 
67  if (thr_per_con_send_task_->open() == -1) {
68  ACE_ERROR((LM_ERROR,
69  ACE_TEXT("(%P|%t) DataLink::DataLink: ")
70  ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
71 
72  } else if (DCPS_debug_level > 4) {
73  ACE_DEBUG((LM_DEBUG,
74  ACE_TEXT("(%P|%t) DataLink::DataLink - ")
75  ACE_TEXT("started new thread to send data with.\n")));
76  }
77  }
78  control_chunks = cfg->datalink_control_chunks_;
79  }
80 
81  // Initialize transport control sample allocators:
82  datalink_release_delay_ = TimeDuration::from_msec(datalink_release_delay);
83 
84  this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks));
85  this->db_allocator_.reset(new DataBlockAllocator(control_chunks));
86 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
static ACE_UINT64 get_next_datalink_id()
Used to provide unique Ids to all DataLink methods.
Definition: DataLink.cpp:805
const TimeDuration & datalink_release_delay() const
Definition: DataLink.inl:62
bool is_loopback_
Is remote attached to same transport ?
Definition: DataLink.h:461
SendResponseListener send_response_listener_
Listener for TransportSendControlElements created in send_control.
Definition: DataLink.h:467
ACE_thread_t reactor_owner() const
ACE_Reactor * reactor() const
bool is_active_
Is pub or sub ?
Definition: DataLink.h:463
friend class ThreadPerConnectionSendTask
Definition: DataLink.h:326
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
static const long DEFAULT_DATALINK_RELEASE_DELAY
Definition: TransportInst.h:67
Priority transport_priority_
TRANSPORT_PRIORITY value associated with the link.
Definition: DataLink.h:431
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
static const size_t DEFAULT_DATALINK_CONTROL_CHUNKS
Definition: TransportInst.h:68
virtual int priority(void) const
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
TransportInst_rch config() const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
static TimeDuration from_msec(const ACE_UINT64 &ms)
ACE_TEXT("TCP_Factory")
Interceptor interceptor_
Definition: DataLink.h:469
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
TimeDuration datalink_release_delay_
Definition: DataLink.h:453
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: DataLink.h:457
unique_ptr< DataBlockAllocator > db_allocator_
Definition: DataLink.h:458
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
ACE_UINT64 id_
The id for this DataLink.
Definition: DataLink.h:420

◆ ~DataLink()

OpenDDS::DCPS::DataLink::~DataLink ( )
virtual

Definition at line 88 of file DataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), assoc_by_local_, DBG_ENTRY_LVL, LM_WARNING, and thr_per_con_send_task_.

89 {
90  DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
91 
92  if (!assoc_by_local_.empty()) {
93  ACE_DEBUG((LM_WARNING,
94  ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
95  ACE_TEXT("link still in use by %d entities when deleted!\n"),
96  this, assoc_by_local_.size()));
97  }
98 
99  if (this->thr_per_con_send_task_ != 0) {
100  this->thr_per_con_send_task_->close(1);
101  }
102 }
#define ACE_DEBUG(X)
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ add_on_start_callback()

bool OpenDDS::DCPS::DataLink::add_on_start_callback ( const TransportClient_wrch client,
const GUID_t remote 
)

Definition at line 111 of file DataLink.cpp.

References OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), OpenDDS::DCPS::GUID_UNKNOWN, interceptor_, OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), on_start_callbacks_, pending_on_starts_, ACE_Guard< ACE_LOCK >::release(), send_strategy_, started_, and strategy_lock_.

Referenced by OpenDDS::DCPS::TcpTransport::accept_datalink(), OpenDDS::DCPS::RtpsUdpDataLink::associated(), and OpenDDS::DCPS::TcpTransport::connect_datalink().

112 {
113  const DataLink_rch link(this, inc_count());
114 
115  TransportClient_rch client_lock = client.lock();
116  const GUID_t client_id = client_lock ? client_lock->get_guid() : GUID_UNKNOWN;
117 
118  GuardType guard(strategy_lock_);
119 
120  if (client_lock) {
121  PendingOnStartsMap::iterator it = pending_on_starts_.find(remote);
122  if (it != pending_on_starts_.end()) {
123  RepoIdSet::iterator it2 = it->second.find(client_id);
124  if (it2 != it->second.end()) {
125  it->second.erase(it2);
126  if (it->second.empty()) {
127  pending_on_starts_.erase(it);
128  }
129  guard.release();
130  interceptor_.execute_or_enqueue(make_rch<ImmediateStart>(link, client, remote));
131  } else {
132  on_start_callbacks_[remote][client_id] = client;
133  }
134  } else {
135  on_start_callbacks_[remote][client_id] = client;
136  }
137  }
138 
139  if (started_ && !send_strategy_.is_nil()) {
140  return false; // link already started
141  }
142  return true;
143 }
RcHandle< TransportClient > TransportClient_rch
RcHandle< DataLink > DataLink_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLink_rch.h:34
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
PendingOnStartsMap pending_on_starts_
Definition: DataLink.h:449
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
Interceptor interceptor_
Definition: DataLink.h:469
CommandPtr execute_or_enqueue(CommandPtr command)

◆ cancel_release()

bool OpenDDS::DCPS::DataLink::cancel_release ( )

Definition at line 601 of file DataLink.cpp.

References ACE_DEBUG, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, notify_reactor(), scheduled_to_stop_at_, scheduling_release_, set_scheduling_release(), stopped_, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

Referenced by OpenDDS::DCPS::TcpTransport::find_datalink_i().

602 {
603  DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
604  if (stopped_) {
605  if (DCPS_debug_level > 0) {
606  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
607  }
608  return false;
609  }
610  if (scheduling_release_) {
611  if (DCPS_debug_level > 0) {
612  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
613  }
614  this->set_scheduling_release(false);
616  notify_reactor();
617  }
618  return true;
619 }
#define ACE_DEBUG(X)
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void set_scheduling_release(bool scheduling_release)
Definition: DataLink.inl:161
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40

◆ clear_associations()

void OpenDDS::DCPS::DataLink::clear_associations ( )

This is called by DataLinkCleanupTask thread to remove the associations based on the snapshot in release_resources().

Definition at line 1059 of file DataLink.cpp.

References assoc_releasing_, recv_listener_for(), OpenDDS::DCPS::TransportReceiveListener::remove_associations(), send_listener_for(), and OpenDDS::DCPS::set_to_seq().

Referenced by OpenDDS::DCPS::TransportImpl::DoClear::handle_event().

1060 {
1061  for (AssocByLocal::iterator iter = assoc_releasing_.begin();
1062  iter != assoc_releasing_.end(); ++iter) {
1063  TransportSendListener_rch tsl = send_listener_for(iter->first);
1064  if (tsl) {
1065  ReaderIdSeq sub_ids;
1066  set_to_seq(iter->second.associated_, sub_ids);
1067  tsl->remove_associations(sub_ids, false);
1068  continue;
1069  }
1071  if (trl) {
1072  WriterIdSeq pub_ids;
1073  set_to_seq(iter->second.associated_, pub_ids);
1074  trl->remove_associations(pub_ids, false);
1075  }
1076  }
1077  assoc_releasing_.clear();
1078 }
RcHandle< TransportSendListener > TransportSendListener_rch
TransportSendListener_rch send_listener_for(const GUID_t &pub_id) const
Definition: DataLink.inl:324
void set_to_seq(const RepoIdSet &rids, Seq &seq)
Definition: DataLink.cpp:480
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
sequence< GUID_t > WriterIdSeq
sequence< GUID_t > ReaderIdSeq
TransportReceiveListener_rch recv_listener_for(const GUID_t &sub_id) const
Definition: DataLink.inl:338
AssocByLocal assoc_releasing_
Definition: DataLink.h:428

◆ connection_notice_as_str()

ACE_INLINE const char * OpenDDS::DCPS::DataLink::connection_notice_as_str ( ConnectionNotice  notice)
private

Helper function to output the enum as a string to help debugging.

Definition at line 267 of file DataLink.inl.

References ACE_INLINE.

Referenced by notify().

268 {
269  static const char* NoticeStr[] = { "DISCONNECTED",
270  "RECONNECTED",
271  "LOST"
272  };
273 
274  return NoticeStr [notice];
275 }

◆ create_control()

ACE_Message_Block * OpenDDS::DCPS::DataLink::create_control ( char  submessage_id,
DataSampleHeader header,
Message_Block_Ptr  data 
)

This allows a subclass to easily create a transport control sample to send via send_control.

Definition at line 628 of file DataLink.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_DES_FREE, ACE_ERROR, ACE_NEW_MALLOC_RETURN, ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), LM_ERROR, ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::DataSampleHeader::submessage_id_, ACE_Message_Block::total_length(), OpenDDS::DCPS::TRANSPORT_CONTROL, and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

631 {
632  DBG_ENTRY_LVL("DataLink", "create_control", 6);
633 
634  header.byte_order_ = ACE_CDR_BYTE_ORDER;
635  header.message_id_ = TRANSPORT_CONTROL;
636  header.submessage_id_ = submessage_id;
637  header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
638 
639  ACE_Message_Block* message = 0;
640  ACE_NEW_MALLOC_RETURN(message,
641  static_cast<ACE_Message_Block*>(
642  this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
643  ACE_Message_Block(header.get_max_serialized_size(),
645  data.release(),
646  0, // data
647  0, // allocator_strategy
648  0, // locking_strategy
652  this->db_allocator_.get(),
653  this->mb_allocator_.get()),
654  0);
655 
656  if (!(*message << header)) {
657  ACE_ERROR((LM_ERROR,
658  ACE_TEXT("(%P|%t) DataLink::create_control: ")
659  ACE_TEXT("cannot put header in message\n")));
660  ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block);
661  message = 0;
662  }
663 
664  return message;
665 }
#define ACE_ERROR(X)
static const ACE_Time_Value max_time
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_CDR_BYTE_ORDER
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
static const ACE_Time_Value zero
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: DataLink.h:457
unique_ptr< DataBlockAllocator > db_allocator_
Definition: DataLink.h:458
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY

◆ customize_queue_element()

virtual TransportQueueElement* OpenDDS::DCPS::DataLink::customize_queue_element ( TransportQueueElement element)
inlineprivatevirtual

Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 360 of file DataLink.h.

Referenced by send().

362  {
363  return element;
364  }

◆ data_received()

int OpenDDS::DCPS::DataLink::data_received ( ReceivedDataSample sample,
const GUID_t readerId = GUID_UNKNOWN 
)

This is called by our TransportReceiveStrategy object when it has received a complete data sample. This method will cause the appropriate TransportReceiveListener objects to be told that data_received(). If readerId is not GUID_UNKNOWN, only the TransportReceiveListener with that ID (if one exists) will receive the data.

This method will "deliver" the sample to all TransportReceiveListeners within this DataLink that are interested in the (remote) publisher id that sent the sample.

Definition at line 690 of file DataLink.cpp.

References data_received_i(), and OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED.

Referenced by OpenDDS::DCPS::ReliableSession::deliver_held_data(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::deliver_held_data(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), OpenDDS::DCPS::ShmemDataLink::request_ack_received(), and OpenDDS::DCPS::MulticastDataLink::sample_received().

692 {
694  return 0;
695 }
GuidSet RepoIdSet
Definition: GuidUtils.h:113
void data_received_i(ReceivedDataSample &sample, const GUID_t &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain)
Definition: DataLink.cpp:704

◆ data_received_i()

void OpenDDS::DCPS::DataLink::data_received_i ( ReceivedDataSample sample,
const GUID_t readerId,
const RepoIdSet incl_excl,
ReceiveListenerSet::ConstrainReceiveSet  constrain 
)
private

Definition at line 704 of file DataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), assoc_by_remote_, OpenDDS::DCPS::DataSampleHeader::content_filter_, OpenDDS::DCPS::DataSampleHeader::content_filter_entries_, OpenDDS::DCPS::TransportReceiveListener::data_received(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, default_listener_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_STRING, pub_sub_maps_lock_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceiveListenerSet::remove_all(), OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED, OpenDDS::DCPS::to_string(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by data_received(), and data_received_include().

708 {
709  DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
710  // Which remote publication sent this message?
711  const GUID_t& publication_id = sample.header_.publication_id_;
712 
713  // Locate the set of TransportReceiveListeners associated with this
714  // DataLink that are interested in hearing about any samples received
715  // from the remote publisher_id.
716  if (DCPS_debug_level > 9) {
717  const GuidConverter converter(publication_id);
718  const GuidConverter reader(readerId);
719  ACE_DEBUG((LM_DEBUG,
720  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
721  ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"),
722  OPENDDS_STRING(converter).c_str(),
723  to_string(sample.header_).c_str(),
724  OPENDDS_STRING(reader).c_str(),
725  constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
726  }
727 
728  if (Transport_debug_level > 9) {
729  const GuidConverter converter(publication_id);
730  ACE_DEBUG((LM_DEBUG,
731  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
732  ACE_TEXT("from publication %C received sample: %C.\n"),
733  OPENDDS_STRING(converter).c_str(),
734  to_string(sample.header_).c_str()));
735  }
736 
737  ReceiveListenerSet_rch listener_set;
739  {
740  GuardType guard(this->pub_sub_maps_lock_);
741  AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
742  if (iter != assoc_by_remote_.end()) {
743  listener_set = iter->second;
744  } else {
745  listener = this->default_listener_.lock();
746  }
747  }
748 
749  if (listener_set.is_nil()) {
750  if (listener) {
751  listener->data_received(sample);
752  } else {
753  // Nobody has any interest in this message. Drop it on the floor.
754  if (Transport_debug_level > 4) {
755  const GuidConverter converter(publication_id);
756  ACE_DEBUG((LM_DEBUG,
757  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
758  ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
759  OPENDDS_STRING(converter).c_str()));
760  }
761  }
762  return;
763  }
764 
765  if (readerId != GUID_UNKNOWN) {
766  listener_set->data_received(sample, readerId);
767  return;
768  }
769 
770 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
771 
772  if (sample.header_.content_filter_
773  && sample.header_.content_filter_entries_.length()) {
774  ReceiveListenerSet subset(*listener_set.in());
775  subset.remove_all(sample.header_.content_filter_entries_);
776  subset.data_received(sample, incl_excl, constrain);
777 
778  } else {
779 #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
780 
781  if (DCPS_debug_level > 9) {
782  // Just get the set to do our dirty work by having it iterate over its
783  // collection of TransportReceiveListeners, and invoke the data_received()
784  // method on each one.
785  OPENDDS_STRING included_ids;
786  bool first = true;
787  RepoIdSet::const_iterator iter = incl_excl.begin();
788  while(iter != incl_excl.end()) {
789  included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
790  first = false;
791  ++iter;
792  }
793  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
794  constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
795  }
796  listener_set->data_received(sample, incl_excl, constrain);
797 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
798  }
799 
800 #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
801 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
virtual void data_received(const ReceivedDataSample &sample)=0
RcHandle< ReceiveListenerSet > ReceiveListenerSet_rch
The type definition for the smart-pointer to the underlying type.
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< T > lock() const
Definition: RcObject.h:188
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406
#define OPENDDS_STRING
TransportReceiveListener_wrch default_listener_
Definition: DataLink.h:401
const char * to_string(MessageId value)
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ data_received_include()

void OpenDDS::DCPS::DataLink::data_received_include ( ReceivedDataSample sample,
const RepoIdSet incl 
)

Varation of data_received() that allows for excluding a subset of readers by specifying which readers specifically should receive. Any reader ID that does not appear in the include set will be skipped.

Definition at line 698 of file DataLink.cpp.

References data_received_i(), OpenDDS::DCPS::GUID_UNKNOWN, and OpenDDS::DCPS::ReceiveListenerSet::SET_INCLUDED.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

699 {
701 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void data_received_i(ReceivedDataSample &sample, const GUID_t &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain)
Definition: DataLink.cpp:704

◆ datalink_release_delay()

ACE_INLINE const TimeDuration & OpenDDS::DCPS::DataLink::datalink_release_delay ( ) const

Definition at line 62 of file DataLink.inl.

References ACE_INLINE, and datalink_release_delay_.

Referenced by DataLink(), OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

63 {
64  return this->datalink_release_delay_;
65 }
TimeDuration datalink_release_delay_
Definition: DataLink.h:453

◆ default_listener() [1/2]

ACE_INLINE void OpenDDS::DCPS::DataLink::default_listener ( const TransportReceiveListener_wrch trl)

Definition at line 352 of file DataLink.inl.

References ACE_INLINE, default_listener_, and pub_sub_maps_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::configure_i().

353 {
354  GuardType guard(this->pub_sub_maps_lock_);
355  this->default_listener_ = trl;
356 }
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
TransportReceiveListener_wrch default_listener_
Definition: DataLink.h:401

◆ default_listener() [2/2]

ACE_INLINE TransportReceiveListener_wrch OpenDDS::DCPS::DataLink::default_listener ( ) const

Definition at line 360 of file DataLink.inl.

References ACE_INLINE, default_listener_, and pub_sub_maps_lock_.

361 {
362  GuardType guard(this->pub_sub_maps_lock_);
363  return this->default_listener_;
364 }
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
TransportReceiveListener_wrch default_listener_
Definition: DataLink.h:401

◆ get_ice_endpoint()

virtual WeakRcHandle<ICE::Endpoint> OpenDDS::DCPS::DataLink::get_ice_endpoint ( ) const
inlinevirtual

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 287 of file DataLink.h.

287 { return WeakRcHandle<ICE::Endpoint>(); }

◆ get_next_datalink_id()

ACE_UINT64 OpenDDS::DCPS::DataLink::get_next_datalink_id ( )
staticprotected

Used to provide unique Ids to all DataLink methods.

Definition at line 805 of file DataLink.cpp.

References ACE_ERROR, ACE_TEXT(), id(), and LM_ERROR.

Referenced by DataLink().

806 {
807  static ACE_UINT64 next_id = 0;
808  static LockType lock;
809 
810  ACE_UINT64 id;
811  {
812  GuardType guard(lock);
813  id = next_id++;
814 
815  if (0 == next_id) {
816  ACE_ERROR((LM_ERROR,
817  ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
818  ACE_TEXT("has rolled over and is reusing ids!\n")));
819  }
820  }
821 
822  return id;
823 }
#define ACE_ERROR(X)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
DataLinkIdType id() const
Obtain a unique identifier for this DataLink object.
Definition: DataLink.inl:205
ACE_TEXT("TCP_Factory")
unsigned long long ACE_UINT64
ACE_SYNCH_MUTEX LockType
Definition: DataLink.h:377

◆ get_send_strategy()

ACE_INLINE TransportSendStrategy_rch OpenDDS::DCPS::DataLink::get_send_strategy ( )
protected

Definition at line 374 of file DataLink.inl.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, send_strategy_, and strategy_lock_.

Referenced by make_reservation(), remove_all_msgs(), remove_sample(), resume_send(), schedule_delayed_release(), send_i(), send_start_i(), send_stop_i(), terminate_send(), and terminate_send_if_suspended().

375 {
376  GuardType guard(strategy_lock_);
377  return send_strategy_;
378 }
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440

◆ handle_close()

int OpenDDS::DCPS::DataLink::handle_close ( ACE_HANDLE  h,
ACE_Reactor_Mask  m 
)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 1101 of file DataLink.cpp.

References handle_timeout(), TheServiceParticipant, ACE_Event_Handler::TIMER_MASK, and ACE_Time_Value::zero.

1102 {
1103  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1104 
1105  if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
1106  // Reactor is shutting down with this timer still pending.
1107  // Take the same cleanup actions as if the timeout had expired.
1109  }
1110 
1111  return 0;
1112 }
int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Definition: DataLink.cpp:1081
static const ACE_Time_Value zero
#define TheServiceParticipant

◆ handle_exception()

int OpenDDS::DCPS::DataLink::handle_exception ( ACE_HANDLE  )
virtual

Reactor invokes this after being notified in schedule_stop or cancel_release.

Reimplemented from ACE_Event_Handler.

Definition at line 270 of file DataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_Reactor_Timer_Interface::cancel_timer(), OpenDDS::DCPS::DCPS_debug_level, handle_timeout(), impl(), impl_, OpenDDS::DCPS::TimePoint_T< AceClock >::is_zero(), LM_DEBUG, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), ACE_Event_Handler::reactor(), ACE_Reactor_Timer_Interface::schedule_timer(), scheduled_to_stop_at_, scheduling_release_, stop(), TheServiceParticipant, OpenDDS::DCPS::TransportImpl::timer(), OpenDDS::DCPS::TimeDuration::value(), and ACE_Time_Value::zero.

271 {
272  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
273 
276  if (DCPS_debug_level > 0) {
277  ACE_DEBUG((LM_DEBUG,
278  ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
279  }
280  TransportImpl_rch impl = impl_.lock();
281  if (impl) {
282  ACE_Reactor_Timer_Interface* reactor = impl->timer();
283  if (reactor && reactor->cancel_timer(this) > 0) {
284  if (DCPS_debug_level > 0) {
285  ACE_DEBUG((LM_DEBUG,
286  ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
287  }
288  }
289  }
290  return 0;
291  } else if (scheduled_to_stop_at_ <= now) {
292  if (this->scheduling_release_) {
293  if (DCPS_debug_level > 0) {
294  ACE_DEBUG((LM_DEBUG,
295  ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
296  }
298  return 0;
299  }
300  if (DCPS_debug_level > 0) {
301  ACE_DEBUG((LM_DEBUG,
302  ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
303  }
304  this->stop();
305  return 0;
306  } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
307  if (DCPS_debug_level > 0) {
308  ACE_DEBUG((LM_DEBUG,
309  ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
310  }
311  TransportImpl_rch impl = impl_.lock();
312  if (impl) {
313  ACE_Reactor_Timer_Interface* reactor = impl->timer();
314  const TimeDuration future_release_time = scheduled_to_stop_at_ - now;
315  reactor->schedule_timer(this, 0, future_release_time.value());
316  }
317  }
318  return 0;
319 }
#define ACE_DEBUG(X)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
virtual ACE_Reactor * reactor(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
ACE_TEXT("TCP_Factory")
int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Definition: DataLink.cpp:1081
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417
static const ACE_Time_Value zero
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
#define TheServiceParticipant
void stop()
The stop method is used to stop the DataLink prior to shutdown.
Definition: DataLink.cpp:355

◆ handle_send_request_ack()

bool OpenDDS::DCPS::DataLink::handle_send_request_ack ( TransportQueueElement element)
privatevirtual

Reimplemented in OpenDDS::DCPS::TcpDataLink.

Definition at line 1191 of file DataLink.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered().

Referenced by send().

1192 {
1193  element->data_delivered();
1194  return true;
1195 }

◆ handle_timeout()

int OpenDDS::DCPS::DataLink::handle_timeout ( const ACE_Time_Value tv,
const void *  arg 
)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 1081 of file DataLink.cpp.

References assoc_by_local_, assoc_by_remote_, impl(), impl_, OpenDDS::DCPS::TimePoint_T< AceClock >::is_zero(), LM_DEBUG, scheduled_to_stop_at_, stop(), TheServiceParticipant, OpenDDS::DCPS::TransportImpl::unbind_link(), and VDBG_LVL.

Referenced by handle_close(), and handle_exception().

1082 {
1083  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1084 
1085  if (!scheduled_to_stop_at_.is_zero()) {
1086  VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
1087  {
1088  TransportImpl_rch impl = impl_.lock();
1089  if (impl) {
1090  impl->unbind_link(this);
1091  }
1092  }
1093  if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
1094  this->stop();
1095  }
1096  }
1097  return 0;
1098 }
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define TheServiceParticipant
void stop()
The stop method is used to stop the DataLink prior to shutdown.
Definition: DataLink.cpp:355

◆ id()

ACE_INLINE DataLinkIdType OpenDDS::DCPS::DataLink::id ( void  ) const

Obtain a unique identifier for this DataLink object.

Definition at line 205 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, and id_.

Referenced by get_next_datalink_id(), OpenDDS::DCPS::DataLinkSet::insert_link(), and OpenDDS::DCPS::DataLinkSet::remove_link().

206 {
207  DBG_ENTRY_LVL("DataLink","id",6);
208  return id_;
209 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_UINT64 id_
The id for this DataLink.
Definition: DataLink.h:420

◆ impl()

TransportImpl_rch OpenDDS::DCPS::DataLink::impl ( void  ) const

◆ invoke_on_start_callbacks() [1/2]

void OpenDDS::DCPS::DataLink::invoke_on_start_callbacks ( bool  success)

Definition at line 194 of file DataLink.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::WeakRcHandle< T >::lock(), on_start_callbacks_, ACE_Guard< ACE_LOCK >::release(), and strategy_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::associated(), OpenDDS::DCPS::TcpTransport::async_connect_failed(), OpenDDS::DCPS::TcpTransport::connect_datalink(), OpenDDS::DCPS::TcpDataLink::do_association_actions(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_i(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::ShmemDataLink::request_ack_received(), start(), and OpenDDS::DCPS::MulticastSession::synack_received().

195 {
196  const DataLink_rch link(success ? this : 0, inc_count());
197 
198  while (true) {
199  GuardType guard(strategy_lock_);
200 
201  if (on_start_callbacks_.empty()) {
202  break;
203  }
204 
205  GUID_t remote = GUID_UNKNOWN;
206  TransportClient_wrch client;
207  OnStartCallbackMap::iterator it = on_start_callbacks_.begin();
208  if (it != on_start_callbacks_.end()) {
209  remote = it->first;
210  RepoToClientMap::iterator it2 = it->second.begin();
211  if (it2 != it->second.end()) {
212  client = it2->second;
213  it->second.erase(it2);
214  if (it->second.empty()) {
215  on_start_callbacks_.erase(it);
216  }
217  }
218  }
219 
220  guard.release();
221  if (success) {
222  TransportClient_rch client_lock = client.lock();
223  if (client_lock) {
224  client_lock->use_datalink(remote, link);
225  }
226  }
227  }
228 }
WeakRcHandle< TransportClient > TransportClient_wrch
Definition: DataLink.h:258
RcHandle< TransportClient > TransportClient_rch
RcHandle< DataLink > DataLink_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLink_rch.h:34
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447

◆ invoke_on_start_callbacks() [2/2]

bool OpenDDS::DCPS::DataLink::invoke_on_start_callbacks ( const GUID_t local,
const GUID_t remote,
bool  success 
)

Definition at line 230 of file DataLink.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), on_start_callbacks_, pending_on_starts_, and strategy_lock_.

231 {
232  const DataLink_rch link(success ? this : 0, inc_count());
233 
234  TransportClient_wrch client;
235  bool made_callback = false;
236 
237  {
238  GuardType guard(strategy_lock_);
239 
240  OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
241  if (it != on_start_callbacks_.end()) {
242  RepoToClientMap::iterator it2 = it->second.find(local);
243  if (it2 != it->second.end()) {
244  client = it2->second;
245  it->second.erase(it2);
246  if (it->second.empty()) {
247  on_start_callbacks_.erase(it);
248  }
249  } else {
250  pending_on_starts_[remote].insert(local);
251  }
252  } else {
253  pending_on_starts_[remote].insert(local);
254  }
255  }
256 
257  if (success) {
258  TransportClient_rch client_lock = client.lock();
259  if (client_lock) {
260  client_lock->use_datalink(remote, link);
261  made_callback = true;
262  }
263  }
264 
265  return made_callback;
266 }
WeakRcHandle< TransportClient > TransportClient_wrch
Definition: DataLink.h:258
RcHandle< TransportClient > TransportClient_rch
RcHandle< DataLink > DataLink_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLink_rch.h:34
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
PendingOnStartsMap pending_on_starts_
Definition: DataLink.h:449
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447

◆ is_active() [1/2]

ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_active ( )

◆ is_active() [2/2]

ACE_INLINE bool OpenDDS::DCPS::DataLink::is_active ( ) const

Definition at line 56 of file DataLink.inl.

References ACE_INLINE, and is_active_.

57 {
58  return this->is_active_;
59 }
bool is_active_
Is pub or sub ?
Definition: DataLink.h:463

◆ is_leading()

virtual bool OpenDDS::DCPS::DataLink::is_leading ( const GUID_t ,
const GUID_t  
) const
inlinevirtual

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 289 of file DataLink.h.

290  { return false; }

◆ is_loopback() [1/2]

ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_loopback ( void  )

Definition at line 35 of file DataLink.inl.

References ACE_INLINE, and is_loopback_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

36 {
37  return this->is_loopback_;
38 }
bool is_loopback_
Is remote attached to same transport ?
Definition: DataLink.h:461

◆ is_loopback() [2/2]

ACE_INLINE bool OpenDDS::DCPS::DataLink::is_loopback ( void  ) const

Definition at line 42 of file DataLink.inl.

References ACE_INLINE, and is_loopback_.

43 {
44  return this->is_loopback_;
45 }
bool is_loopback_
Is remote attached to same transport ?
Definition: DataLink.h:461

◆ is_target()

bool OpenDDS::DCPS::DataLink::is_target ( const GUID_t remote_id)

This is called on publisher side to see if this link communicates with the provided sub or by the subscriber side to see if this link communicates with the provided pub

Definition at line 1013 of file DataLink.cpp.

References assoc_by_remote_, and pub_sub_maps_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble_i().

1014 {
1015  GuardType guard(this->pub_sub_maps_lock_);
1016  return assoc_by_remote_.count(remote_id);
1017 }
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406

◆ make_reservation() [1/2]

int OpenDDS::DCPS::DataLink::make_reservation ( const GUID_t remote_subscription_id,
const GUID_t local_publication_id,
const TransportSendListener_wrch send_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Reimplemented in OpenDDS::DCPS::TcpDataLink.

Definition at line 398 of file DataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), assoc_by_local_, assoc_by_remote_, OpenDDS::DCPS::DataLink::LocalAssociationInfo::associated_, OpenDDS::DCPS::LogGuid::c_str(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, get_send_strategy(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportSendStrategy::link_released(), LM_DEBUG, pub_sub_maps_lock_, OpenDDS::DCPS::DataLink::LocalAssociationInfo::reliable_, and send_listeners_.

Referenced by OpenDDS::DCPS::TransportClient::add_link(), OpenDDS::DCPS::ShmemDataLink::make_reservation(), OpenDDS::DCPS::TcpDataLink::make_reservation(), OpenDDS::DCPS::MulticastDataLink::make_reservation(), and OpenDDS::DCPS::RtpsUdpDataLink::make_reservation().

402 {
403  DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
404 
405  if (DCPS_debug_level > 9) {
406  LogGuid local_log(local_publication_id), remote_log(remote_subscription_id);
407  ACE_DEBUG((LM_DEBUG,
408  ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
409  ACE_TEXT("creating association local publication %C ")
410  ACE_TEXT("<--> with remote subscription %C.\n"),
411  local_log .c_str(),
412  remote_log.c_str()));
413  }
414 
416 
417  if (strategy) {
418  strategy->link_released(false);
419  }
420 
421  {
423 
424  LocalAssociationInfo& info = assoc_by_local_[local_publication_id];
425  info.reliable_ = reliable;
426  info.associated_.insert(remote_subscription_id);
427  ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
428 
429  if (rls.is_nil())
430  rls = make_rch<ReceiveListenerSet>();
431  rls->insert(local_publication_id, TransportReceiveListener_rch());
432 
433  send_listeners_.insert(std::make_pair(local_publication_id, send_listener));
434  }
435  return 0;
436 }
#define ACE_DEBUG(X)
RcHandle< ReceiveListenerSet > ReceiveListenerSet_rch
The type definition for the smart-pointer to the underlying type.
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392

◆ make_reservation() [2/2]

int OpenDDS::DCPS::DataLink::make_reservation ( const GUID_t remote_publication_id,
const GUID_t local_subscription_id,
const TransportReceiveListener_wrch receive_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::TcpDataLink, and OpenDDS::DCPS::ShmemDataLink.

Definition at line 439 of file DataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), assoc_by_local_, assoc_by_remote_, OpenDDS::DCPS::DataLink::LocalAssociationInfo::associated_, OpenDDS::DCPS::LogGuid::c_str(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, get_send_strategy(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportSendStrategy::link_released(), LM_DEBUG, pub_sub_maps_lock_, recv_listeners_, and OpenDDS::DCPS::DataLink::LocalAssociationInfo::reliable_.

443 {
444  DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
445 
446  if (DCPS_debug_level > 9) {
447  LogGuid local(local_subscription_id), remote(remote_publication_id);
448  ACE_DEBUG((LM_DEBUG,
449  ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
450  ACE_TEXT("creating association local subscription %C ")
451  ACE_TEXT("<--> with remote publication %C.\n"),
452  local.c_str(), remote.c_str()));
453  }
454 
456 
457  if (strategy) {
458  strategy->link_released(false);
459  }
460 
461  {
463 
464  LocalAssociationInfo& info = assoc_by_local_[local_subscription_id];
465  info.reliable_ = reliable;
466  info.associated_.insert(remote_publication_id);
467  ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
468 
469  if (rls.is_nil())
470  rls = make_rch<ReceiveListenerSet>();
471  rls->insert(local_subscription_id, receive_listener);
472 
473  recv_listeners_.insert(std::make_pair(local_subscription_id,
474  receive_listener));
475  }
476  return 0;
477 }
#define ACE_DEBUG(X)
RcHandle< ReceiveListenerSet > ReceiveListenerSet_rch
The type definition for the smart-pointer to the underlying type.
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396

◆ network_change()

void OpenDDS::DCPS::DataLink::network_change ( ) const
protected

Definition at line 1212 of file DataLink.cpp.

References pub_sub_maps_lock_, recv_listeners_, send_listeners_, and OpenDDS::DCPS::TransportReceiveListener::transport_discovery_change().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::on_data_available().

1213 {
1214  IdToSendListenerMap send_listeners;
1215  IdToRecvListenerMap recv_listeners;
1216  {
1218  send_listeners = send_listeners_;
1219  recv_listeners = recv_listeners_;
1220  }
1221  for (IdToSendListenerMap::const_iterator itr = send_listeners.begin();
1222  itr != send_listeners.end(); ++itr) {
1223  TransportSendListener_rch tsl = itr->second.lock();
1224  if (tsl) {
1225  tsl->transport_discovery_change();
1226  }
1227  }
1228 
1229  for (IdToRecvListenerMap::const_iterator itr = recv_listeners.begin();
1230  itr != recv_listeners.end(); ++itr) {
1231  TransportReceiveListener_rch trl = itr->second.lock();
1232  if (trl) {
1234  }
1235  }
1236 }
RcHandle< TransportSendListener > TransportSendListener_rch
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392

◆ notify()

void OpenDDS::DCPS::DataLink::notify ( ConnectionNotice  notice)

Notify the datawriters and datareaders that the connection is disconnected, lost, or reconnected. The datareader/datawriter will notify the corresponding listener.

Definition at line 848 of file DataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), assoc_by_local_, connection_notice_as_str(), DBG_ENTRY_LVL, DISCONNECTED, LM_DEBUG, LM_ERROR, LOST, OpenDDS::DCPS::TransportReceiveListener::notify_subscription_disconnected(), OpenDDS::DCPS::TransportReceiveListener::notify_subscription_lost(), OpenDDS::DCPS::TransportReceiveListener::notify_subscription_reconnected(), OPENDDS_STRING, pub_sub_maps_lock_, RECONNECTED, recv_listeners_, send_listeners_, OpenDDS::DCPS::set_to_seq(), OpenDDS::DCPS::Transport_debug_level, and VDBG.

Referenced by OpenDDS::DCPS::TcpConnection::active_reconnect_i(), OpenDDS::DCPS::TcpConnection::active_reconnect_open(), OpenDDS::DCPS::TcpConnection::handle_close(), OpenDDS::DCPS::TcpConnection::notify_connection_lost(), OpenDDS::DCPS::TcpConnection::passive_reconnect_i(), and OpenDDS::DCPS::TcpConnection::transfer().

849 {
850  DBG_ENTRY_LVL("DataLink", "notify", 6);
851 
852  VDBG((LM_DEBUG,
853  ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
854  this,
855  connection_notice_as_str(notice)));
856 
857  GuardType guard(this->pub_sub_maps_lock_);
858 
859  // Notify the datawriters
860  // the lost publications due to a connection problem.
861  for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
862  itr != send_listeners_.end(); ++itr) {
863 
864  TransportSendListener_rch tsl = itr->second.lock();
865 
866  if (tsl) {
867  if (Transport_debug_level > 0) {
868  GuidConverter converter(itr->first);
869  ACE_DEBUG((LM_DEBUG,
870  ACE_TEXT("(%P|%t) DataLink::notify: ")
871  ACE_TEXT("notify pub %C %C.\n"),
872  OPENDDS_STRING(converter).c_str(),
873  connection_notice_as_str(notice)));
874  }
875  AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
876  if (local_it == assoc_by_local_.end()) {
877  if (Transport_debug_level) {
878  GuidConverter converter(itr->first);
879  ACE_DEBUG((LM_DEBUG,
880  ACE_TEXT("(%P|%t) DataLink::notify: ")
881  ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
882  OPENDDS_STRING(converter).c_str(),
883  connection_notice_as_str(notice)));
884  }
885  break;
886  }
887  const RepoIdSet& rids = local_it->second.associated_;
888 
889  ReaderIdSeq subids;
890  set_to_seq(rids, subids);
891 
892  switch (notice) {
893  case DISCONNECTED:
894  tsl->notify_publication_disconnected(subids);
895  break;
896 
897  case RECONNECTED:
898  tsl->notify_publication_reconnected(subids);
899  break;
900 
901  case LOST:
902  tsl->notify_publication_lost(subids);
903  break;
904 
905  default:
906  ACE_ERROR((LM_ERROR,
907  ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
908  ACE_TEXT("unknown notice to TransportSendListener\n")));
909  break;
910  }
911 
912  } else {
913  if (Transport_debug_level > 0) {
914  GuidConverter converter(itr->first);
915  ACE_DEBUG((LM_DEBUG,
916  ACE_TEXT("(%P|%t) DataLink::notify: ")
917  ACE_TEXT("not notify pub %C %C\n"),
918  OPENDDS_STRING(converter).c_str(),
919  connection_notice_as_str(notice)));
920  }
921  }
922  }
923 
924  // Notify the datareaders registered with TransportImpl
925  // the lost subscriptions due to a connection problem.
926  for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
927  itr != recv_listeners_.end(); ++itr) {
928 
929  TransportReceiveListener_rch trl = itr->second.lock();
930 
931  if (trl) {
932  if (Transport_debug_level > 0) {
933  GuidConverter converter(itr->first);
934  ACE_DEBUG((LM_DEBUG,
935  ACE_TEXT("(%P|%t) DataLink::notify: ")
936  ACE_TEXT("notify sub %C %C.\n"),
937  OPENDDS_STRING(converter).c_str(),
938  connection_notice_as_str(notice)));
939  }
940  AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
941  if (local_it == assoc_by_local_.end()) {
942  if (Transport_debug_level) {
943  GuidConverter converter(itr->first);
944  ACE_DEBUG((LM_DEBUG,
945  ACE_TEXT("(%P|%t) DataLink::notify: ")
946  ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
947  OPENDDS_STRING(converter).c_str(),
948  connection_notice_as_str(notice)));
949  }
950  break;
951  }
952  const RepoIdSet& rids = local_it->second.associated_;
953 
954  WriterIdSeq pubids;
955  set_to_seq(rids, pubids);
956 
957  switch (notice) {
958  case DISCONNECTED:
959  trl->notify_subscription_disconnected(pubids);
960  break;
961 
962  case RECONNECTED:
963  trl->notify_subscription_reconnected(pubids);
964  break;
965 
966  case LOST:
967  trl->notify_subscription_lost(pubids);
968  break;
969 
970  default:
971  ACE_ERROR((LM_ERROR,
972  ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
973  ACE_TEXT("unknown notice to datareader.\n")));
974  break;
975  }
976 
977  } else {
978  if (Transport_debug_level > 0) {
979  GuidConverter converter(itr->first);
980  ACE_DEBUG((LM_DEBUG,
981  ACE_TEXT("(%P|%t) DataLink::notify: ")
982  ACE_TEXT("not notify sub %C subscription lost.\n"),
983  OPENDDS_STRING(converter).c_str()));
984  }
985 
986  }
987  }
988 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
RcHandle< TransportSendListener > TransportSendListener_rch
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
GuidSet RepoIdSet
Definition: GuidUtils.h:113
const char * connection_notice_as_str(ConnectionNotice notice)
Helper function to output the enum as a string to help debugging.
Definition: DataLink.inl:267
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
#define OPENDDS_STRING
#define VDBG(DBG_ARGS)
void set_to_seq(const RepoIdSet &rids, Seq &seq)
Definition: DataLink.cpp:480
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
sequence< GUID_t > WriterIdSeq
ACE_TEXT("TCP_Factory")
sequence< GUID_t > ReaderIdSeq
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392

◆ notify_reactor()

void OpenDDS::DCPS::DataLink::notify_reactor ( void  )
private

Definition at line 340 of file DataLink.cpp.

References impl(), impl_, ACE_Reactor::notify(), ACE_Event_Handler::reactor(), and OpenDDS::DCPS::TransportImpl::reactor_task().

Referenced by cancel_release(), and schedule_stop().

341 {
342  TransportImpl_rch impl = impl_.lock();
343  if (impl) {
344  ReactorTask_rch rt(impl->reactor_task());
345  if (rt) {
346  ACE_Reactor* reactor = rt->get_reactor();
347  if (reactor) {
348  reactor->notify(this);
349  }
350  }
351  }
352 }
RcHandle< ReactorTask > ReactorTask_rch
The type definition for the smart-pointer to the underlying type.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
int notify(ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask masks=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual ACE_Reactor * reactor(void) const
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417

◆ OPENDDS_MAP_CMP() [1/7]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( GUID_t  ,
TransportSendListener_wrch  ,
GUID_tKeyLessThan   
)
private

Map publication Id value to TransportSendListener.

◆ OPENDDS_MAP_CMP() [2/7]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( GUID_t  ,
TransportReceiveListener_wrch  ,
GUID_tKeyLessThan   
)
private

Map subscription Id value to TransportReceieveListener.

◆ OPENDDS_MAP_CMP() [3/7]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( GUID_t  ,
ReceiveListenerSet_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [4/7]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( GUID_t  ,
LocalAssociationInfo  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [5/7]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( GUID_t  ,
TransportClient_wrch  ,
GUID_tKeyLessThan   
)
protected

◆ OPENDDS_MAP_CMP() [6/7]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RepoToClientMap  ,
GUID_tKeyLessThan   
)
protected

◆ OPENDDS_MAP_CMP() [7/7]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RepoIdSet  ,
GUID_tKeyLessThan   
)
protected

◆ peer_ids()

GUIDSeq * OpenDDS::DCPS::DataLink::peer_ids ( const GUID_t local_id) const
protected

For a given local GUID_t (publication or subscription), return the list of remote peer GUID_ts (subscriptions or publications) that this link knows about due to make_reservation().

Definition at line 490 of file DataLink.cpp.

References assoc_by_local_, pub_sub_maps_lock_, and OpenDDS::DCPS::set_to_seq().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), and OpenDDS::DCPS::RtpsUdpDataLink::get_addresses_i().

491 {
493 
494  const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
495 
496  if (iter == assoc_by_local_.end())
497  return 0;
498 
499  GUIDSeq_var result = new GUIDSeq;
500  set_to_seq(iter->second.associated_, static_cast<GUIDSeq&>(result));
501  return result._retn();
502 }
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
void set_to_seq(const RepoIdSet &rids, Seq &seq)
Definition: DataLink.cpp:480
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62

◆ pre_stop_i()

void OpenDDS::DCPS::DataLink::pre_stop_i ( )
virtual

Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, and OpenDDS::DCPS::TcpDataLink.

Definition at line 993 of file DataLink.cpp.

References thr_per_con_send_task_.

Referenced by OpenDDS::DCPS::TcpDataLink::pre_stop_i(), OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i(), and stop().

994 {
995  if (this->thr_per_con_send_task_ != 0) {
996  this->thr_per_con_send_task_->close(1);
997  }
998 }
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425

◆ prepare_release()

void OpenDDS::DCPS::DataLink::prepare_release ( )
private

Save current sub and pub association maps for releasing and create empty maps for new associations.

Definition at line 1045 of file DataLink.cpp.

References ACE_ERROR, ACE_TEXT(), assoc_by_local_, assoc_releasing_, LM_ERROR, and pub_sub_maps_lock_.

Referenced by release_resources().

1046 {
1047  GuardType guard(this->pub_sub_maps_lock_);
1048 
1049  if (!assoc_releasing_.empty()) {
1050  ACE_ERROR((LM_ERROR,
1051  ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
1052  ACE_TEXT("already prepared for release.\n")));
1053  return;
1054  }
1055 
1057 }
#define ACE_ERROR(X)
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
ACE_TEXT("TCP_Factory")
AssocByLocal assoc_releasing_
Definition: DataLink.h:428

◆ recv_listener_for()

ACE_INLINE TransportReceiveListener_rch OpenDDS::DCPS::DataLink::recv_listener_for ( const GUID_t sub_id) const
private

Definition at line 338 of file DataLink.inl.

References ACE_INLINE, and recv_listeners_.

Referenced by clear_associations().

339 {
340  // sub_map_ (and recv_listeners_) are already locked when entering this
341  // private method.
342  IdToRecvListenerMap::const_iterator found =
343  this->recv_listeners_.find(sub_id);
344  if (found == this->recv_listeners_.end()) {
346  }
347  return found->second.lock();
348 }
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396

◆ release_remote_i()

virtual void OpenDDS::DCPS::DataLink::release_remote_i ( const GUID_t )
inlineprivatevirtual

Reimplemented in OpenDDS::DCPS::MulticastDataLink.

Definition at line 366 of file DataLink.h.

Referenced by release_reservations().

366 {}

◆ release_reservations()

void OpenDDS::DCPS::DataLink::release_reservations ( GUID_t  remote_id,
GUID_t  local_id,
DataLinkSetMap &  released_locals 
)

This will release reservations that were made by one of the make_reservation() methods. All we know is that the supplied GUID_t is considered to be a remote id. It could be a remote subscriber or a remote publisher.

This gets invoked when a TransportClient::remove_associations() call has been made. Because this DataLink can be shared amongst different TransportClient objects, and different threads could be "managing" the different TransportClient objects, we need to make sure that this release_reservations() works in conjunction with a simultaneous call (in another thread) to one of this DataLink's make_reservation() methods.

Definition at line 512 of file DataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, impl(), impl_, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, OpenDDS::DCPS::rchandle_from(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::release_datalink(), release_remote_i(), release_reservations_i(), remove_startup_callbacks(), stopped_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportClient::disassociate().

514 {
515  DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
516 
517  if (DCPS_debug_level > 9) {
518  GuidConverter local(local_id);
519  GuidConverter remote(remote_id);
520  ACE_DEBUG((LM_DEBUG,
521  ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
522  ACE_TEXT("releasing association local: %C ")
523  ACE_TEXT("<--> with remote %C.\n"),
524  OPENDDS_STRING(local).c_str(),
525  OPENDDS_STRING(remote).c_str()));
526  }
527 
528  remove_startup_callbacks(local_id, remote_id);
529 
530  //let the specific class release its reservations
531  //done this way to prevent deadlock of holding pub_sub_maps_lock_
532  //then obtaining a specific class lock in release_reservations_i
533  //which reverses lock ordering of the active send logic of needing
534  //the specific class lock before obtaining the over arching DataLink
535  //pub_sub_maps_lock_
536  this->release_reservations_i(remote_id, local_id);
537 
538  bool release_remote_required = false;
539  {
540  GuardType guard(this->pub_sub_maps_lock_);
541 
542  if (this->stopped_) return;
543 
544  ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
545  if (rls->size() == 1) {
546  assoc_by_remote_.erase(remote_id);
547  release_remote_required = true;
548  } else {
549  rls->remove(local_id);
550  }
551  RepoIdSet& ris = assoc_by_local_[local_id].associated_;
552  if (ris.size() == 1) {
553  DataLinkSet_rch& links = released_locals[local_id];
554  if (links.is_nil()) {
555  links = make_rch<DataLinkSet>();
556  }
557  links->insert_link(rchandle_from(this));
558  assoc_by_local_.erase(local_id);
559  } else {
560  ris.erase(remote_id);
561  }
562 
563  if (assoc_by_local_.empty()) {
564  VDBG_LVL((LM_DEBUG,
565  ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
566  ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
567 
568  guard.release();
569  TransportImpl_rch impl = impl_.lock();
570  if (impl) {
571  impl->release_datalink(this);
572  }
573  }
574  }
575  if (release_remote_required) {
576  release_remote_i(remote_id);
577  }
578 }
#define ACE_DEBUG(X)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
GuidSet RepoIdSet
Definition: GuidUtils.h:113
RcHandle< ReceiveListenerSet > ReceiveListenerSet_rch
The type definition for the smart-pointer to the underlying type.
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
virtual void release_remote_i(const GUID_t &)
Definition: DataLink.h:366
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406
#define OPENDDS_STRING
RcHandle< DataLinkSet > DataLinkSet_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLinkSet.h:27
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417
#define VDBG_LVL(DBG_ARGS, LEVEL)
virtual void release_reservations_i(const GUID_t &, const GUID_t &)
Definition: DataLink.h:367
void remove_startup_callbacks(const GUID_t &local, const GUID_t &remote)
Definition: DataLink.cpp:146

◆ release_reservations_i()

virtual void OpenDDS::DCPS::DataLink::release_reservations_i ( const GUID_t ,
const GUID_t  
)
inlineprivatevirtual

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, and OpenDDS::DCPS::MulticastDataLink.

Definition at line 367 of file DataLink.h.

Referenced by release_reservations().

368  {}

◆ release_resources()

void OpenDDS::DCPS::DataLink::release_resources ( )

Definition at line 1001 of file DataLink.cpp.

References DBG_ENTRY_LVL, impl(), impl_, prepare_release(), and OpenDDS::DCPS::TransportImpl::release_link_resources().

Referenced by OpenDDS::DCPS::TcpConnection::tear_link().

1002 {
1003  DBG_ENTRY_LVL("DataLink", "release_resources", 6);
1004 
1005  this->prepare_release();
1006  TransportImpl_rch impl = impl_.lock();
1007  if (impl) {
1008  impl->release_link_resources(this);
1009  }
1010 }
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417

◆ remove_all_msgs()

ACE_INLINE void OpenDDS::DCPS::DataLink::remove_all_msgs ( const GUID_t pub_id)
virtual

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 190 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, get_send_strategy(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs().

191 {
192  DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
193 
194  // This one is easy. Simply delegate to our TransportSendStrategy
195  // data member.
196 
198 
199  if (!strategy.is_nil()) {
200  strategy->remove_all_msgs(pub_id);
201  }
202 }
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ remove_listener()

ACE_INLINE void OpenDDS::DCPS::DataLink::remove_listener ( const GUID_t local_id)

Either send or receive listener for this local_id should be removed from internal DataLink structures so it no longer receives events.

Definition at line 289 of file DataLink.inl.

References ACE_DEBUG, ACE_INLINE, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), LM_DEBUG, pub_sub_maps_lock_, recv_listeners_, send_listeners_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by OpenDDS::DCPS::TransportClient::disassociate().

290 {
292  {
293  IdToSendListenerMap::iterator pos = send_listeners_.find(local_id);
294  if (pos != send_listeners_.end()) {
295  send_listeners_.erase(pos);
296  if (Transport_debug_level > 5) {
297  LogGuid logger(local_id);
298  ACE_DEBUG((LM_DEBUG,
299  ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
300  ACE_TEXT("removed %C from send_listeners\n"),
301  logger.c_str()));
302  }
303  return;
304  }
305  }
306  {
307  IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id);
308  if (pos != recv_listeners_.end()) {
309  recv_listeners_.erase(pos);
310  if (Transport_debug_level > 5) {
311  LogGuid logger(local_id);
312  ACE_DEBUG((LM_DEBUG,
313  ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
314  ACE_TEXT("removed %C from recv_listeners\n"),
315  logger.c_str()));
316  }
317  return;
318  }
319  }
320 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
ACE_TEXT("TCP_Factory")
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392

◆ remove_on_start_callback()

void OpenDDS::DCPS::DataLink::remove_on_start_callback ( const TransportClient_wrch client,
const GUID_t remote 
)

Definition at line 173 of file DataLink.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), on_start_callbacks_, and strategy_lock_.

174 {
175  TransportClient_rch client_lock = client.lock();
176  if (client_lock) {
177  const GUID_t id = client_lock->get_guid();
178 
179  GuardType guard(strategy_lock_);
180  OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
181  if (it != on_start_callbacks_.end()) {
182  RepoToClientMap::iterator it2 = it->second.find(id);
183  if (it2 != it->second.end()) {
184  it->second.erase(it2);
185  if (it->second.empty()) {
186  on_start_callbacks_.erase(it);
187  }
188  }
189  }
190  }
191 }
RcHandle< TransportClient > TransportClient_rch
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447

◆ remove_sample()

ACE_INLINE RemoveResult OpenDDS::DCPS::DataLink::remove_sample ( const DataSampleElement sample)
virtual

This method is essentially an "undo_send()" method. It's goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 167 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, get_send_strategy(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::TransportSendStrategy::remove_sample(), thr_per_con_send_task_, and VDBG.

168 {
169  DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
170 
171  if (this->thr_per_con_send_task_ != 0) {
172  const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
173  if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
174  VDBG((LM_DEBUG, "(%P|%t) DBG: "
175  "Removed sample from ThreadPerConnection queue.\n"));
176  return rr;
177  }
178  }
179 
181 
182  if (!strategy.is_nil()) {
183  return strategy->remove_sample(sample);
184  }
185 
186  return REMOVE_NOT_FOUND;
187 }
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
#define VDBG(DBG_ARGS)
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ remove_startup_callbacks()

void OpenDDS::DCPS::DataLink::remove_startup_callbacks ( const GUID_t local,
const GUID_t remote 
)

Definition at line 146 of file DataLink.cpp.

References on_start_callbacks_, pending_on_starts_, and strategy_lock_.

Referenced by release_reservations().

147 {
148  GuardType guard(strategy_lock_);
149 
150  OnStartCallbackMap::iterator oit = on_start_callbacks_.find(remote);
151  if (oit != on_start_callbacks_.end()) {
152  RepoToClientMap::iterator oit2 = oit->second.find(local);
153  if (oit2 != oit->second.end()) {
154  oit->second.erase(oit2);
155  if (oit->second.empty()) {
156  on_start_callbacks_.erase(oit);
157  }
158  }
159  }
160  PendingOnStartsMap::iterator pit = pending_on_starts_.find(remote);
161  if (pit != pending_on_starts_.end()) {
162  RepoIdSet::iterator pit2 = pit->second.find(local);
163  if (pit2 != pit->second.end()) {
164  pit->second.erase(pit2);
165  if (pit->second.empty()) {
166  pending_on_starts_.erase(pit);
167  }
168  }
169  }
170 }
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
PendingOnStartsMap pending_on_starts_
Definition: DataLink.h:449
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447

◆ replay_durable_data()

void OpenDDS::DCPS::DataLink::replay_durable_data ( const GUID_t local_pub_id,
const GUID_t remote_sub_id 
) const
protected

Definition at line 1239 of file DataLink.cpp.

References send_listener_for().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::ReplayDurableData::handle_event().

1240 {
1241  GuidConverter local(local_pub_id);
1242  GuidConverter remote(remote_sub_id);
1243  TransportSendListener_rch send_listener = send_listener_for(local_pub_id);
1244  if (send_listener) {
1245  send_listener->replay_durable_data_for(remote_sub_id);
1246  }
1247 }
RcHandle< TransportSendListener > TransportSendListener_rch
TransportSendListener_rch send_listener_for(const GUID_t &pub_id) const
Definition: DataLink.inl:324

◆ resume_send()

void OpenDDS::DCPS::DataLink::resume_send ( )

The resume_send is used in the case of reconnection on the subscriber's side.

Definition at line 388 of file DataLink.cpp.

References get_send_strategy(), OpenDDS::DCPS::TransportSendStrategy::isDirectMode(), and OpenDDS::DCPS::TransportSendStrategy::resume_send().

389 {
391 
392  if (strategy && strategy->isDirectMode()) {
393  strategy->resume_send();
394  }
395 }
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.

◆ schedule_delayed_release()

void OpenDDS::DCPS::DataLink::schedule_delayed_release ( )

Definition at line 581 of file DataLink.cpp.

References OpenDDS::DCPS::TransportSendStrategy::clear(), datalink_release_delay_, DBG_ENTRY_LVL, get_send_strategy(), LM_DEBUG, OpenDDS::DCPS::TransportSendStrategy::MODE_DIRECT, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), schedule_stop(), and VDBG.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink().

582 {
583  DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
584 
585  VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
586 
587  // The samples have to be removed at this point, otherwise the samples
588  // can not be delivered when new association is added and still use
589  // this connection/datalink.
591 
592  if (strategy) {
593  strategy->clear(TransportSendStrategy::MODE_DIRECT);
594  }
595 
597  schedule_stop(future_release_time);
598 }
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define VDBG(DBG_ARGS)
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
void schedule_stop(const MonotonicTimePoint &schedule_to_stop_at)
Definition: DataLink.cpp:325
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
TimeDuration datalink_release_delay_
Definition: DataLink.h:453

◆ schedule_stop()

void OpenDDS::DCPS::DataLink::schedule_stop ( const MonotonicTimePoint schedule_to_stop_at)

Allows DataLink::stop to be done on the reactor thread so that this thread avoids possibly deadlocking trying to access reactor to stop strategies or schedule timers

Definition at line 325 of file DataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TimePoint_T< AceClock >::is_zero(), LM_DEBUG, notify_reactor(), scheduled_to_stop_at_, and stopped_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and schedule_delayed_release().

326 {
328  this->scheduled_to_stop_at_ = schedule_to_stop_at;
329  notify_reactor();
330  // reactor will invoke our DataLink::handle_exception()
331  } else {
332  if (DCPS_debug_level > 0) {
333  ACE_DEBUG((LM_DEBUG,
334  ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
335  }
336  }
337 }
#define ACE_DEBUG(X)
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ send()

ACE_INLINE void OpenDDS::DCPS::DataLink::send ( TransportQueueElement element)

Definition at line 94 of file DataLink.inl.

References ACE_INLINE, customize_queue_element(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, handle_send_request_ack(), OpenDDS::DCPS::TransportQueueElement::is_request_ack(), OpenDDS::DCPS::SEND, send_i(), and thr_per_con_send_task_.

Referenced by send_control().

95 {
96  DBG_ENTRY_LVL("DataLink","send",6);
97 
98  if (element->is_request_ack() && handle_send_request_ack(element)) {
99  return;
100  }
101 
102  element = this->customize_queue_element(element);
103  if (!element) {
104  return;
105  }
106 
107  if (this->thr_per_con_send_task_ != 0) {
108  if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) {
109  element->data_dropped(true);
110  }
111 
112  } else {
113  this->send_i(element);
114 
115  }
116 }
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
virtual bool handle_send_request_ack(TransportQueueElement *element)
Definition: DataLink.cpp:1191
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual TransportQueueElement * customize_queue_element(TransportQueueElement *element)
Definition: DataLink.h:360

◆ send_control()

SendControlStatus OpenDDS::DCPS::DataLink::send_control ( const DataSampleHeader header,
Message_Block_Ptr  data 
)

This allows a subclass to send transport control samples over this DataLink. This is useful for sending transport-specific control messages between one or more endpoints under this DataLink's control.

Definition at line 668 of file DataLink.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, send(), OpenDDS::DCPS::SEND_CONTROL_OK, send_response_listener_, send_start(), send_stop(), and OpenDDS::DCPS::SendResponseListener::track_message().

Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

669 {
670  DBG_ENTRY_LVL("DataLink", "send_control", 6);
671 
672  TransportSendControlElement* const elem = new TransportSendControlElement(1, // initial_count
674  header, move(message));
675 
677 
678  GUID_t senderId(header.publication_id_);
679  send_start();
680  send(elem);
681  send_stop(senderId);
682 
683  return SEND_CONTROL_OK;
684 }
SendResponseListener send_response_listener_
Listener for TransportSendControlElements created in send_control.
Definition: DataLink.h:467
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void send(TransportQueueElement *element)
Definition: DataLink.inl:94
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void send_stop(GUID_t repoId)
Definition: DataLink.inl:135

◆ send_final_acks()

ACE_INLINE void OpenDDS::DCPS::DataLink::send_final_acks ( const GUID_t readerid)
virtual

Definition at line 368 of file DataLink.inl.

References ACE_INLINE.

369 {
370 }

◆ send_i()

ACE_INLINE void OpenDDS::DCPS::DataLink::send_i ( TransportQueueElement element,
bool  relink = true 
)
protectedvirtual

Reimplemented in OpenDDS::DCPS::TcpDataLink.

Definition at line 119 of file DataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, get_send_strategy(), and OpenDDS::DCPS::TransportSendStrategy::send().

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), send(), OpenDDS::DCPS::ShmemDataLink::send_association_msg(), and OpenDDS::DCPS::TcpDataLink::send_i().

120 {
121  DBG_ENTRY_LVL("DataLink","send_i",6);
122  // This one is easy. Simply delegate to our TransportSendStrategy
123  // data member.
124 
126 
127  if (strategy) {
128  strategy->send(element, relink);
129  } else {
130  element->data_dropped(true);
131  }
132 }
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ send_listener_for()

ACE_INLINE TransportSendListener_rch OpenDDS::DCPS::DataLink::send_listener_for ( const GUID_t pub_id) const
private

Definition at line 324 of file DataLink.inl.

References ACE_INLINE, and send_listeners_.

Referenced by clear_associations(), and replay_durable_data().

325 {
326  // pub_map_ (and send_listeners_) are already locked when entering this
327  // private method.
328  IdToSendListenerMap::const_iterator found =
329  this->send_listeners_.find(pub_id);
330  if (found == this->send_listeners_.end()) {
331  return TransportSendListener_rch();
332  }
333  return found->second.lock();
334 }
RcHandle< TransportSendListener > TransportSendListener_rch
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392

◆ send_start()

ACE_INLINE void OpenDDS::DCPS::DataLink::send_start ( )

Called by the TransportClient objects that reference this DataLink. Used by the TransportClient to send a sample, or to send a control message. These functions either give the request to the PerThreadConnectionSendTask when thread_per_connection configuration is true or just simply delegate to the send strategy.

Definition at line 68 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_START, send_start_i(), and thr_per_con_send_task_.

Referenced by send_control().

69 {
70  DBG_ENTRY_LVL("DataLink","send_start",6);
71 
72  if (this->thr_per_con_send_task_ != 0) {
73  this->thr_per_con_send_task_->add_request(SEND_START);
74 
75  } else
76  this->send_start_i();
77 }
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ send_start_i()

ACE_INLINE void OpenDDS::DCPS::DataLink::send_start_i ( )
protected

The implementation of the functions that accomplish the sample or control message delivery. They just simply delegate to the send strategy.

Definition at line 80 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, get_send_strategy(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and OpenDDS::DCPS::TransportSendStrategy::send_start().

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_start().

81 {
82  DBG_ENTRY_LVL("DataLink","send_start_i",6);
83  // This one is easy. Simply delegate to our TransportSendStrategy
84  // data member.
85 
87 
88  if (!strategy.is_nil()) {
89  strategy->send_start();
90  }
91 }
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ send_stop()

ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop ( GUID_t  repoId)

Definition at line 135 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_STOP, send_stop_i(), and thr_per_con_send_task_.

Referenced by send_control().

136 {
137  DBG_ENTRY_LVL("DataLink","send_stop",6);
138 
139  if (this->thr_per_con_send_task_ != 0) {
140  this->thr_per_con_send_task_->add_request(SEND_STOP);
141 
142  } else
143  this->send_stop_i(repoId);
144 }
void send_stop_i(GUID_t repoId)
Definition: DataLink.inl:147
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ send_stop_i()

ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop_i ( GUID_t  repoId)
protected

Definition at line 147 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, get_send_strategy(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and OpenDDS::DCPS::TransportSendStrategy::send_stop().

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), send_stop(), and OpenDDS::DCPS::TcpDataLink::send_stop_i().

148 {
149  DBG_ENTRY_LVL("DataLink","send_stop_i",6);
150  // This one is easy. Simply delegate to our TransportSendStrategy
151  // data member.
152 
154 
155  if (!strategy.is_nil()) {
156  strategy->send_stop(repoId);
157  }
158 }
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ set_dscp_codepoint()

void OpenDDS::DCPS::DataLink::set_dscp_codepoint ( int  cp,
ACE_SOCK socket 
)

The following IPV6 code was lifted in spirit from the RTCORBA implementation of setting the DiffServ codepoint.

Definition at line 1115 of file DataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ENOTSUP, ACE_SOCK::get_local_addr(), ACE_Addr::get_type(), IPPROTO_IP, LM_DEBUG, LM_ERROR, and ACE_SOCK::set_option().

Referenced by OpenDDS::DCPS::TcpConnection::on_active_connection_established(), and OpenDDS::DCPS::UdpDataLink::open().

1116 {
1117  /**
1118  * The following IPV6 code was lifted in spirit from the RTCORBA
1119  * implementation of setting the DiffServ codepoint.
1120  */
1121  int result = 0;
1122 
1123  // Shift the code point up to bits, so that we only use the DS field
1124  int tos = cp << 2;
1125 
1126  const char* which = "IPV4 TOS";
1127 #if defined (ACE_HAS_IPV6)
1128  ACE_INET_Addr local_address;
1129 
1130  if (socket.get_local_addr(local_address) == -1) {
1131  return;
1132 
1133  } else if (local_address.get_type() == AF_INET6)
1134 #if !defined (IPV6_TCLASS)
1135  {
1136  if (DCPS_debug_level > 0) {
1137  ACE_ERROR((LM_ERROR,
1138  ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
1139  ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
1140  cp));
1141  }
1142 
1143  return;
1144  }
1145 
1146 #else /* IPV6_TCLASS */
1147  {
1148  which = "IPV6 TCLASS";
1149  result = socket.set_option(
1150  IPPROTO_IPV6,
1151  IPV6_TCLASS,
1152  &tos,
1153  sizeof(tos));
1154 
1155  } else // This is a bit tricky and might be hard to follow...
1156 
1157 #endif /* IPV6_TCLASS */
1158 #endif /* ACE_HAS_IPV6 */
1159 
1160 #ifdef IP_TOS
1161  result = socket.set_option(
1162  IPPROTO_IP,
1163  IP_TOS,
1164  &tos,
1165  sizeof(tos));
1166 
1167  if ((result == -1) && (errno != ENOTSUP)
1168 #ifdef WSAEINVAL
1169  && (errno != WSAEINVAL)
1170 #endif /* WSAINVAL */
1171  ) {
1172 #endif /* IP_TOS */
1173  ACE_DEBUG((LM_DEBUG,
1174  ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
1175  ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
1176  ACE_TEXT("try running as superuser.\n"),
1177  which,
1178  cp));
1179 #ifdef IP_TOS
1180  } else if (DCPS_debug_level > 4) {
1181  ACE_DEBUG((LM_DEBUG,
1182  ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
1183  ACE_TEXT("set %C codepoint to %d.\n"),
1184  which,
1185  cp));
1186  }
1187 #endif /* IP_TOS */
1188 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int set_option(int level, int option, void *optval, int optlen) const
int get_type(void) const
int get_local_addr(ACE_Addr &) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ set_scheduling_release()

ACE_INLINE void OpenDDS::DCPS::DataLink::set_scheduling_release ( bool  scheduling_release)

Definition at line 161 of file DataLink.inl.

References ACE_INLINE, and scheduling_release_.

Referenced by cancel_release(), OpenDDS::DCPS::TcpTransport::release_datalink(), and transport_shutdown().

162 {
163  this->scheduling_release_ = scheduling_release;
164 }

◆ start()

ACE_INLINE int OpenDDS::DCPS::DataLink::start ( const TransportSendStrategy_rch send_strategy,
const TransportStrategy_rch receive_strategy,
bool  invoke_all = true 
)
protected

This is how the subclass "announces" to this DataLink base class that this DataLink has now been "connected" and should start the supplied strategy objects. This start method is also going to keep a "copy" of the references to the strategy objects. Also note that it is acceptable to pass-in a NULL (0) TransportReceiveStrategy*, but it is assumed that the TransportSendStrategy* argument is not NULL.

If the start() method fails to start either strategy, then a -1 is returned. Otherwise, a 0 is returned. In the failure case, if one of the strategy objects was started successfully, then it will be stopped before the start() method returns -1.

Definition at line 212 of file DataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, invoke_on_start_callbacks(), OpenDDS::DCPS::RcHandle< T >::is_nil(), receive_strategy_, send_strategy_, OpenDDS::DCPS::TransportStrategy::start(), OpenDDS::DCPS::TransportSendStrategy::start(), started_, OpenDDS::DCPS::TransportSendStrategy::stop(), and strategy_lock_.

Referenced by OpenDDS::DCPS::TcpDataLink::connect(), OpenDDS::DCPS::MulticastDataLink::join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ShmemDataLink::open(), OpenDDS::DCPS::RtpsUdpDataLink::open(), and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_gap_i().

214 {
215  DBG_ENTRY_LVL("DataLink","start",6);
216 
217  // We assume that the send_strategy is not NULL, but the receive_strategy
218  // is allowed to be NULL.
219 
220  // Attempt to start the strategies, and if there is a start() failure,
221  // make sure to stop() any strategy that was already start()'ed.
222  if (send_strategy->start() != 0) {
223  // Failed to start the TransportSendStrategy.
225  return -1;
226  }
227 
228  if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
229  // Failed to start the TransportReceiveStrategy.
230 
231  // Remember to stop() the TransportSendStrategy since we did start it,
232  // and now need to "undo" that action.
233  send_strategy->stop();
235  return -1;
236  }
237 
238  // We started both strategy objects. Save them to data members since
239  // we will now take ownership of them.
240  {
241  GuardType guard(this->strategy_lock_);
242 
243  this->send_strategy_ = send_strategy;
244  this->receive_strategy_ = receive_strategy;
245  }
246  if (invoke_all) {
248  }
249  {
250  //catch any associations added during initial invoke_on_start_callbacks
251  //only after first use_datalink has resolved does datalink's state truly
252  //change to started, thus can't let pending associations proceed normally yet
253  GuardType guard(this->strategy_lock_);
254  this->started_ = true;
255  }
256  //Now state transitioned to started so no new on_start_callbacks will be added
257  //so resolve any added during transition to started.
258  if (invoke_all) {
260  }
261  return 0;
262 }
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ stop()

void OpenDDS::DCPS::DataLink::stop ( void  )

The stop method is used to stop the DataLink prior to shutdown.

Definition at line 355 of file DataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::is_nil(), pre_stop_i(), receive_strategy_, OpenDDS::DCPS::RcHandle< T >::reset(), scheduled_to_stop_at_, send_strategy_, OpenDDS::DCPS::TransportStrategy::stop(), OpenDDS::DCPS::TransportSendStrategy::stop(), stop_i(), stopped_, strategy_lock_, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

Referenced by handle_exception(), handle_timeout(), OpenDDS::DCPS::UdpTransport::release_datalink(), OpenDDS::DCPS::ShmemTransport::release_datalink(), and transport_shutdown().

356 {
357  pre_stop_i();
358 
359  TransportSendStrategy_rch send_strategy;
360  TransportStrategy_rch recv_strategy;
361 
362  {
363  GuardType guard(strategy_lock_);
364 
365  if (stopped_) return;
366 
367  send_strategy = send_strategy_;
369 
370  recv_strategy = receive_strategy_;
372  }
373 
374  if (!send_strategy.is_nil()) {
375  send_strategy->stop();
376  }
377 
378  if (!recv_strategy.is_nil()) {
379  recv_strategy->stop();
380  }
381 
382  stop_i();
383  stopped_ = true;
385 }
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
virtual void pre_stop_i()
Definition: DataLink.cpp:993
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
RcHandle< TransportStrategy > TransportStrategy_rch
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40
virtual void stop_i()
Definition: DataLink.cpp:622

◆ stop_i()

void OpenDDS::DCPS::DataLink::stop_i ( )
protectedvirtual

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::TcpDataLink, OpenDDS::DCPS::ShmemDataLink, and OpenDDS::DCPS::UdpDataLink.

Definition at line 622 of file DataLink.cpp.

References DBG_ENTRY_LVL.

Referenced by stop().

623 {
624  DBG_ENTRY_LVL("DataLink", "stop_i", 6);
625 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ target_intersection()

GUIDSeq * OpenDDS::DCPS::DataLink::target_intersection ( const GUID_t pub_id,
const GUIDSeq in,
size_t &  n_subs 
)

For a given publication "pub_id", store the total number of corresponding subscriptions in "n_subs" and given a set of subscriptions (the "in" sequence), return the subset of the input set "in" which are targets of this DataLink (see is_target()).

Definition at line 1020 of file DataLink.cpp.

References assoc_by_local_, pub_sub_maps_lock_, and OpenDDS::DCPS::push_back().

1022 {
1023  GUIDSeq_var res;
1024  GuardType guard(this->pub_sub_maps_lock_);
1025  AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
1026 
1027  if (iter != assoc_by_local_.end()) {
1028  n_subs = iter->second.associated_.size();
1029  const CORBA::ULong len = in.length();
1030 
1031  for (CORBA::ULong i(0); i < len; ++i) {
1032  if (iter->second.associated_.count(in[i])) {
1033  if (res.ptr() == 0) {
1034  res = new GUIDSeq;
1035  }
1036 
1037  push_back(res.inout(), in[i]);
1038  }
1039  }
1040  }
1041 
1042  return res._retn();
1043 }
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
ACE_CDR::ULong ULong
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138

◆ terminate_send()

ACE_INLINE void OpenDDS::DCPS::DataLink::terminate_send ( )

Definition at line 279 of file DataLink.inl.

References ACE_INLINE, get_send_strategy(), and OpenDDS::DCPS::TransportSendStrategy::terminate_send().

280 {
282  if (strategy) {
283  strategy->terminate_send(false);
284  }
285 }
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.

◆ terminate_send_if_suspended()

void OpenDDS::DCPS::DataLink::terminate_send_if_suspended ( )

Definition at line 1275 of file DataLink.cpp.

References get_send_strategy(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, and OpenDDS::DCPS::TransportSendStrategy::terminate_send_if_suspended().

1276 {
1278 
1279  if (strategy) {
1280  strategy->terminate_send_if_suspended();
1281  }
1282 }
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.

◆ transport_priority() [1/2]

ACE_INLINE Priority & OpenDDS::DCPS::DataLink::transport_priority ( )

Accessors for the TRANSPORT_PRIORITY value associated with this link.

Definition at line 21 of file DataLink.inl.

References ACE_INLINE, and transport_priority_.

Referenced by OpenDDS::DCPS::TcpTransport::connect_datalink(), OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ThreadPerConnectionSendTask::open(), OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

22 {
23  return this->transport_priority_;
24 }
Priority transport_priority_
TRANSPORT_PRIORITY value associated with the link.
Definition: DataLink.h:431

◆ transport_priority() [2/2]

ACE_INLINE Priority OpenDDS::DCPS::DataLink::transport_priority ( ) const

Definition at line 28 of file DataLink.inl.

References ACE_INLINE, and transport_priority_.

29 {
30  return this->transport_priority_;
31 }
Priority transport_priority_
TRANSPORT_PRIORITY value associated with the link.
Definition: DataLink.h:431

◆ transport_shutdown()

void OpenDDS::DCPS::DataLink::transport_shutdown ( )

Our TransportImpl will inform us if it is being shutdown() by calling this method.

Definition at line 826 of file DataLink.cpp.

References ACE_Reactor_Timer_Interface::cancel_timer(), DBG_ENTRY_LVL, impl(), impl_, ACE_Event_Handler::reactor(), scheduled_to_stop_at_, set_scheduling_release(), stop(), OpenDDS::DCPS::TransportImpl::timer(), and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

Referenced by OpenDDS::DCPS::UdpTransport::shutdown_i(), and OpenDDS::DCPS::RtpsUdpTransport::shutdown_i().

827 {
828  DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
829 
830  //this->cancel_release();
831  this->set_scheduling_release(false);
833 
834  {
835  TransportImpl_rch impl = impl_.lock();
836  if (impl) {
837  ACE_Reactor_Timer_Interface* reactor = impl->timer();
838  reactor->cancel_timer(this);
839  }
840  }
841  this->stop();
842  // this->send_listeners_.clear();
843  // this->recv_listeners_.clear();
844  // Drop our reference to the TransportImpl object
845 }
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
virtual ACE_Reactor * reactor(void) const
void set_scheduling_release(bool scheduling_release)
Definition: DataLink.inl:161
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417
void stop()
The stop method is used to stop the DataLink prior to shutdown.
Definition: DataLink.cpp:355
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40

Friends And Related Function Documentation

◆ DataLinkCleanupTask

friend class DataLinkCleanupTask
friend

Definition at line 76 of file DataLink.h.

◆ operator<<

OpenDDS_Dcps_Export std::ostream& operator<< ( std::ostream str,
const DataLink value 
)
friend

Convenience function for diagnostic information.

Definition at line 1251 of file DataLink.cpp.

1252 {
1253  str << " There are " << value.assoc_by_local_.size()
1254  << " local entities currently using this link";
1255 
1256  if (!value.assoc_by_local_.empty()) {
1257  str << " comprising following associations:";
1258  }
1259  str << std::endl;
1260 
1261  typedef DataLink::AssocByLocal::const_iterator assoc_iter_t;
1262  const DataLink::AssocByLocal& abl = value.assoc_by_local_;
1263  for (assoc_iter_t ait = abl.begin(); ait != abl.end(); ++ait) {
1264  const RepoIdSet& set = ait->second.associated_;
1265  for (RepoIdSet::const_iterator rit = set.begin(); rit != set.end(); ++rit) {
1266  str << GuidConverter(ait->first) << " --> "
1267  << GuidConverter(*rit) << " " << std::endl;
1268  }
1269  }
1270  return str;
1271 }
const LogLevel::Value value
Definition: debug.cpp:61
GuidSet RepoIdSet
Definition: GuidUtils.h:113

◆ ThreadPerConnectionSendTask

friend class ThreadPerConnectionSendTask
friend

Definition at line 326 of file DataLink.h.

Referenced by DataLink().

Member Data Documentation

◆ assoc_by_local_

AssocByLocal OpenDDS::DCPS::DataLink::assoc_by_local_
private

◆ assoc_by_remote_

AssocByRemote OpenDDS::DCPS::DataLink::assoc_by_remote_
private

◆ assoc_releasing_

AssocByLocal OpenDDS::DCPS::DataLink::assoc_releasing_
private

Definition at line 428 of file DataLink.h.

Referenced by clear_associations(), and prepare_release().

◆ datalink_release_delay_

TimeDuration OpenDDS::DCPS::DataLink::datalink_release_delay_
protected

Configurable delay in milliseconds that the datalink should be released after all associations are removed.

Definition at line 453 of file DataLink.h.

Referenced by DataLink(), datalink_release_delay(), and schedule_delayed_release().

◆ db_allocator_

unique_ptr<DataBlockAllocator> OpenDDS::DCPS::DataLink::db_allocator_
protected

Definition at line 458 of file DataLink.h.

Referenced by create_control(), and DataLink().

◆ default_listener_

TransportReceiveListener_wrch OpenDDS::DCPS::DataLink::default_listener_
private

If default_listener_ is not null and this DataLink receives a sample from a publication GUID that's not in pub_map_, it will call data_received() on the default_listener_.

Definition at line 401 of file DataLink.h.

Referenced by data_received_i(), and default_listener().

◆ id_

ACE_UINT64 OpenDDS::DCPS::DataLink::id_
private

◆ impl_

WeakRcHandle<TransportImpl> OpenDDS::DCPS::DataLink::impl_
private

A weak rchandle to the TransportImpl that created this DataLink.

Definition at line 417 of file DataLink.h.

Referenced by handle_exception(), handle_timeout(), impl(), notify_reactor(), release_reservations(), release_resources(), and transport_shutdown().

◆ interceptor_

Interceptor OpenDDS::DCPS::DataLink::interceptor_
protected

Definition at line 469 of file DataLink.h.

Referenced by add_on_start_callback().

◆ is_active_

bool OpenDDS::DCPS::DataLink::is_active_
protected

Is pub or sub ?

Definition at line 463 of file DataLink.h.

Referenced by is_active(), and OpenDDS::DCPS::TcpDataLink::reuse_existing_connection().

◆ is_loopback_

bool OpenDDS::DCPS::DataLink::is_loopback_
protected

Is remote attached to same transport ?

Definition at line 461 of file DataLink.h.

Referenced by is_loopback(), and OpenDDS::DCPS::UdpDataLink::open().

◆ mb_allocator_

unique_ptr<MessageBlockAllocator> OpenDDS::DCPS::DataLink::mb_allocator_
protected

Allocators for data and message blocks used by transport control samples when send_control is called.

Definition at line 457 of file DataLink.h.

Referenced by create_control(), and DataLink().

◆ on_start_callbacks_

OnStartCallbackMap OpenDDS::DCPS::DataLink::on_start_callbacks_
protected

◆ pending_on_starts_

PendingOnStartsMap OpenDDS::DCPS::DataLink::pending_on_starts_
protected

◆ pub_sub_maps_lock_

LockType OpenDDS::DCPS::DataLink::pub_sub_maps_lock_
mutableprivate

◆ receive_strategy_

TransportStrategy_rch OpenDDS::DCPS::DataLink::receive_strategy_
protected

◆ recv_listeners_

IdToRecvListenerMap OpenDDS::DCPS::DataLink::recv_listeners_
private

◆ scheduled_to_stop_at_

MonotonicTimePoint OpenDDS::DCPS::DataLink::scheduled_to_stop_at_
private

◆ scheduling_release_

bool OpenDDS::DCPS::DataLink::scheduling_release_
private

Definition at line 433 of file DataLink.h.

Referenced by cancel_release(), handle_exception(), and set_scheduling_release().

◆ send_listeners_

IdToSendListenerMap OpenDDS::DCPS::DataLink::send_listeners_
private

◆ send_response_listener_

SendResponseListener OpenDDS::DCPS::DataLink::send_response_listener_
protected

Listener for TransportSendControlElements created in send_control.

Definition at line 467 of file DataLink.h.

Referenced by send_control().

◆ send_strategy_

TransportSendStrategy_rch OpenDDS::DCPS::DataLink::send_strategy_
protected

◆ started_

bool OpenDDS::DCPS::DataLink::started_
protected

Definition at line 464 of file DataLink.h.

Referenced by add_on_start_callback(), and start().

◆ stopped_

bool OpenDDS::DCPS::DataLink::stopped_
private

A boolean indicating if the DataLink has been stopped. This value is protected by the strategy_lock_.

Definition at line 387 of file DataLink.h.

Referenced by cancel_release(), release_reservations(), schedule_stop(), and stop().

◆ strategy_lock_

LockType OpenDDS::DCPS::DataLink::strategy_lock_
protected

◆ thr_per_con_send_task_

unique_ptr<ThreadPerConnectionSendTask> OpenDDS::DCPS::DataLink::thr_per_con_send_task_
private

The task used to do the sending. This ThreadPerConnectionSendTask object is created when the thread_per_connection configuration is true. It only dedicate to this datalink.

Definition at line 425 of file DataLink.h.

Referenced by DataLink(), pre_stop_i(), remove_sample(), send(), send_start(), send_stop(), and ~DataLink().

◆ transport_priority_

Priority OpenDDS::DCPS::DataLink::transport_priority_
private

TRANSPORT_PRIORITY value associated with the link.

Definition at line 431 of file DataLink.h.

Referenced by transport_priority().


The documentation for this class was generated from the following files: