OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::TcpDataLink Class Reference

#include <TcpDataLink.h>

Inheritance diagram for OpenDDS::DCPS::TcpDataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TcpDataLink:
Collaboration graph
[legend]

Public Member Functions

 TcpDataLink (const TcpTransport_rch &transport_impl, const ACE_INET_Addr &remote_address, Priority priority, bool is_loopback, bool is_active)
 
virtual ~TcpDataLink ()
 
const ACE_INET_Addr & remote_address () const
 Accessor for the remote address. More...
 
int connect (const TcpConnection_rch &connection, const RcHandle< TcpSendStrategy > &send_strategy, const RcHandle< TcpReceiveStrategy > &receive_strategy)
 
int reuse_existing_connection (const TcpConnection_rch &connection)
 
int reconnect (const TcpConnection_rch &connection)
 
TcpConnection_rch get_connection ()
 
bool check_active_client (const GUID_t &local_id)
 
void client_stop (const GUID_t &local_id)
 
virtual void pre_stop_i ()
 
void set_release_pending (bool flag)
 Set release pending flag. More...
 
bool is_release_pending () const
 Get release pending flag. More...
 
void ack_received (const ReceivedDataSample &sample)
 
void request_ack_received (const ReceivedDataSample &sample)
 
void drop_pending_request_acks ()
 
TcpSendStrategy_rch send_strategy ()
 
TcpReceiveStrategy_rch receive_strategy ()
 
int make_reservation (const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
 
int make_reservation (const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
 
void do_association_actions ()
 
- Public Member Functions inherited from OpenDDS::DCPS::DataLink
 DataLink (const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object. More...
 
virtual ~DataLink ()
 
int handle_exception (ACE_HANDLE)
 Reactor invokes this after being notified in schedule_stop or cancel_release. More...
 
void schedule_stop (const MonotonicTimePoint &schedule_to_stop_at)
 
void stop ()
 The stop method is used to stop the DataLink prior to shutdown. More...
 
void resume_send ()
 
void release_reservations (GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
 
void schedule_delayed_release ()
 
const TimeDuration & datalink_release_delay () const
 
void remove_listener (const GUID_t &local_id)
 
void send_start ()
 
void send (TransportQueueElement *element)
 
void send_stop (GUID_t repoId)
 
virtual RemoveResult remove_sample (const DataSampleElement *sample)
 
virtual void remove_all_msgs (const GUID_t &pub_id)
 
int data_received (ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
 
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
 
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object. More...
 
void transport_shutdown ()
 
void notify (ConnectionNotice notice)
 
void release_resources ()
 
void terminate_send ()
 
void terminate_send_if_suspended ()
 
bool is_target (const GUID_t &remote_id)
 
void clear_associations ()
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
 
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
 
Priority & transport_priority ()
 
Priority transport_priority () const
 
bool & is_loopback ()
 
bool is_loopback () const
 
bool & is_active ()
 
bool is_active () const
 
bool cancel_release ()
 
ACE_Message_Block * create_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
 
GUIDSeq * target_intersection (const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
 
TransportImpl_rch impl () const
 
void default_listener (const TransportReceiveListener_wrch &trl)
 
TransportReceiveListener_wrch default_listener () const
 
bool add_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void remove_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void invoke_on_start_callbacks (bool success)
 
bool invoke_on_start_callbacks (const GUID_t &local, const GUID_t &remote, bool success)
 
void remove_startup_callbacks (const GUID_t &local, const GUID_t &remote)
 
void set_scheduling_release (bool scheduling_release)
 
virtual void send_final_acks (const GUID_t &readerid)
 
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint () const
 
virtual bool is_leading (const GUID_t &, const GUID_t &) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Protected Member Functions

virtual void stop_i ()
 
virtual void send_i (TransportQueueElement *element, bool relink=true)
 
virtual void send_stop_i (GUID_t repoId)
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataLink
int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
 
void send_start_i ()
 
void send_stop_i (GUID_t repoId)
 
GUIDSeq * peer_ids (const GUID_t &local_id) const
 
void network_change () const
 
void replay_durable_data (const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
 
TransportSendStrategy_rch get_send_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Member Functions

bool handle_send_request_ack (TransportQueueElement *element)
 
void send_graceful_disconnect_message ()
 
void send_association_msg (const GUID_t &local, const GUID_t &remote)
 
typedef OPENDDS_VECTOR (TransportQueueElement *) PendingRequestAcks
 
typedef OPENDDS_SET_CMP (GUID_t, GUID_tKeyLessThan) RepoIdSetType
 

Private Attributes

ACE_INET_Addr remote_address_
 
WeakRcHandle< TcpConnection > connection_
 
bool graceful_disconnect_sent_
 
AtomicBool release_is_pending_
 
ACE_SYNCH_MUTEX pending_request_acks_lock_
 
PendingRequestAcks pending_request_acks_
 
RepoIdSetType stopped_clients_
 
ACE_Thread_Mutex stopped_clients_mutex_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::DataLink
enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
 
typedef WeakRcHandle< TransportClient > TransportClient_wrch
 
typedef std::pair< TransportClient_wrch, GUID_t > OnStartCallback
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::DataLink
typedef ACE_Guard< LockType > GuardType
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataLink
static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods. More...
 
- Protected Attributes inherited from OpenDDS::DCPS::DataLink
TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink. More...
 
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink. More...
 
LockType strategy_lock_
 
OnStartCallbackMap on_start_callbacks_
 
PendingOnStartsMap pending_on_starts_
 
TimeDuration datalink_release_delay_
 
unique_ptr< MessageBlockAllocator > mb_allocator_
 
unique_ptr< DataBlockAllocator > db_allocator_
 
bool is_loopback_
 Is the remote attached to the same transport? More...
 
bool is_active_
 Is this link on the pub or sub side? More...
 
bool started_
 
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control. More...
 
Interceptor interceptor_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 25 of file TcpDataLink.h.

Constructor & Destructor Documentation

◆ TcpDataLink()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL OpenDDS::DCPS::TcpDataLink::TcpDataLink ( const TcpTransport_rch & transport_impl,
const ACE_INET_Addr & remote_address,
Priority  priority,
bool  is_loopback,
bool  is_active 
)

Definition at line 30 of file TcpDataLink.cpp.

References DBG_ENTRY_LVL.

36  : DataLink(transport_impl, priority, is_loopback, is_active)
37  , remote_address_(remote_address)
39  , release_is_pending_(false)
40 {
41  DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6);
42 }
ACE_INET_Addr remote_address_
Definition: TcpDataLink.h:95
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
Definition: DataLink.cpp:42
virtual int priority(void) const
AtomicBool release_is_pending_
Definition: TcpDataLink.h:98
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ ~TcpDataLink()

OpenDDS::DCPS::TcpDataLink::~TcpDataLink ( )
virtual

Definition at line 44 of file TcpDataLink.cpp.

References DBG_ENTRY_LVL.

45 {
46  DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6);
47 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ ack_received()

void OpenDDS::DCPS::TcpDataLink::ack_received ( const ReceivedDataSample & sample)

Definition at line 408 of file TcpDataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::header_, LM_DEBUG, pending_request_acks_, pending_request_acks_lock_, OpenDDS::DCPS::DataSampleHeader::publication_id_, send_strategy(), OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample().

409 {
410  const SequenceNumber sequence = sample.header_.sequence_;
411 
412  if (sequence == -1) {
413  return;
414  }
415 
416  if (Transport_debug_level >= 1) {
417  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received sequence number %q, publiction_id=%C\n"),
418  sequence.getValue(), LogGuid(sample.header_.publication_id_).c_str()));
419  }
420 
421  TransportQueueElement* elem=0;
422  {
423  // find the pending request with the same sequence number.
425  PendingRequestAcks::iterator it;
426  for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it) {
427  if ((*it)->sequence() == sequence && (*it)->publication_id() == sample.header_.publication_id_) {
428  elem = *it;
429  pending_request_acks_.erase(it);
430  break;
431  }
432  }
433  }
434 
435  if (elem) {
436  if (Transport_debug_level >= 1) {
437  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() found matching element %@\n"),
438  elem));
439  }
440  send_strategy()->deliver_ack_request(elem);
441  }
442  else {
443  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received unknown sequence number %q\n"),
444  sequence.getValue()));
445  }
446 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
PendingRequestAcks pending_request_acks_
Definition: TcpDataLink.h:101
ACE_SYNCH_MUTEX pending_request_acks_lock_
Definition: TcpDataLink.h:100
TcpSendStrategy_rch send_strategy()
ACE_TEXT("TCP_Factory")

◆ check_active_client()

bool OpenDDS::DCPS::TcpDataLink::check_active_client ( const GUID_t & local_id)

Definition at line 88 of file TcpDataLink.cpp.

References stopped_clients_, and stopped_clients_mutex_.

89 {
91  return stopped_clients_.count(local_id) == 0;
92 }
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103

◆ client_stop()

void OpenDDS::DCPS::TcpDataLink::client_stop ( const GUID_t & local_id)

Definition at line 95 of file TcpDataLink.cpp.

References send_strategy(), stopped_clients_, and stopped_clients_mutex_.

96 {
98  stopped_clients_.insert(local_id);
99 
100  TcpSendStrategy_rch strategy = send_strategy();
101  if (strategy) {
102  strategy->remove_all_msgs(local_id);
103  }
104 }
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
TcpSendStrategy_rch send_strategy()
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ connect()

int OpenDDS::DCPS::TcpDataLink::connect ( const TcpConnection_rch & connection,
const RcHandle< TcpSendStrategy > &  send_strategy,
const RcHandle< TcpReceiveStrategy > &  receive_strategy 
)

Called when an established connection object is available for this TcpDataLink. Called by the TcpTransport's connect_datalink() method.

The TcpTransport calls this method when it has an established connection object for us. This call puts this TcpDataLink into the "connected" state.

Definition at line 138 of file TcpDataLink.cpp.

References ACE_ERROR_RETURN, ACE_NONBLOCK, ACE_TEXT(), connection_, DBG_ENTRY_LVL, do_association_actions(), LM_ERROR, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::DataLink::start(), and OpenDDS::DCPS::DataLink::strategy_lock_.

Referenced by OpenDDS::DCPS::TcpTransport::connect_tcp_datalink().

142 {
143  DBG_ENTRY_LVL("TcpDataLink","connect",6);
144 
145  {
146  GuardType guard(strategy_lock_);
147  this->connection_ = connection;
148  }
149 
150  if (connection->peer().enable(ACE_NONBLOCK) == -1) {
151  ACE_ERROR_RETURN((LM_ERROR,
152  "(%P|%t) ERROR: TcpDataLink::connect failed to set "
153  "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1);
154  }
155 
156  // Let connection know the datalink for callbacks upon reconnect failure.
157  connection->set_datalink(rchandle_from(this));
158 
159  // And lastly, inform our base class (DataLink) that we are now "connected",
160  // and it should start the strategy objects.
161  if (this->start(send_strategy, receive_strategy, false) != 0) {
162  // Our base (DataLink) class failed to start the strategy objects.
163  // We need to "undo" some things here before we return -1 to indicate
164  // that an error has taken place.
165 
166  // Drop our reference to the connection object.
167  this->connection_.reset();
168 
169  return -1;
170  }
171 
173  return 0;
174 }
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
TcpSendStrategy_rch send_strategy()
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
TcpReceiveStrategy_rch receive_strategy()
#define ACE_ERROR_RETURN(X, Y)

◆ do_association_actions()

void OpenDDS::DCPS::TcpDataLink::do_association_actions ( )

Definition at line 498 of file TcpDataLink.cpp.

References connection_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::TransportSendStrategy::link_released(), OpenDDS::DCPS::DataLink::on_start_callbacks_, send_association_msg(), OpenDDS::DCPS::DataLink::send_strategy_, and OpenDDS::DCPS::DataLink::strategy_lock_.

Referenced by OpenDDS::DCPS::TcpTransport::accept_datalink(), connect(), OpenDDS::DCPS::TcpTransport::connect_datalink(), reconnect(), and reuse_existing_connection().

499 {
500  // We have a connection.
501  // Invoke callbacks for readers so we can receive messages and let writers know we are ready.
502  typedef std::vector<std::pair<GUID_t, GUID_t> > PairVec;
503  PairVec to_call_and_send;
504 
505  {
506  GuardType guard(strategy_lock_);
507 
508  if (!connection_ || !send_strategy_) {
509  return;
510  }
511 
512  for (OnStartCallbackMap::const_iterator it = on_start_callbacks_.begin(); it != on_start_callbacks_.end(); ++it) {
513  for (RepoToClientMap::const_iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) {
514  if (GuidConverter(it2->first).isReader()) {
515  to_call_and_send.push_back(std::make_pair(it2->first, it->first));
516  }
517  }
518  }
519  }
520 
522 
523  for (PairVec::const_iterator it = to_call_and_send.begin(); it != to_call_and_send.end(); ++it) {
524  invoke_on_start_callbacks(it->first, it->second, true);
525  send_association_msg(it->first, it->second);
526  }
527 }
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
void send_association_msg(const GUID_t &local, const GUID_t &remote)

◆ drop_pending_request_acks()

void OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks ( )

Definition at line 563 of file TcpDataLink.cpp.

References pending_request_acks_, and pending_request_acks_lock_.

Referenced by OpenDDS::DCPS::TcpConnection::disconnect(), OpenDDS::DCPS::TcpConnection::notify_connection_lost(), OpenDDS::DCPS::TcpReceiveStrategy::reset(), and OpenDDS::DCPS::TcpReceiveStrategy::stop_i().

564 {
566  PendingRequestAcks::iterator it;
567  for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it) {
568  (*it)->data_dropped(true);
569  }
570  pending_request_acks_.clear();
571 }
PendingRequestAcks pending_request_acks_
Definition: TcpDataLink.h:101
ACE_SYNCH_MUTEX pending_request_acks_lock_
Definition: TcpDataLink.h:100

◆ get_connection()

ACE_INLINE OpenDDS::DCPS::TcpConnection_rch OpenDDS::DCPS::TcpDataLink::get_connection ( void  )

◆ handle_send_request_ack()

bool OpenDDS::DCPS::TcpDataLink::handle_send_request_ack ( TransportQueueElement * element)
private virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 389 of file TcpDataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), connection_, OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, pending_request_acks_, pending_request_acks_lock_, OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::TransportQueueElement::sequence(), and OpenDDS::DCPS::Transport_debug_level.

390 {
391  if (Transport_debug_level >= 1) {
392  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::handle_send_request_ack(%@) sequence number %q, publication_id=%C\n"),
393  element, element->sequence().getValue(), LogGuid(element->publication_id()).c_str()));
394  }
395  bool result = false;
396  TcpConnection_rch connection(connection_.lock());
397  if (connection) {
399  pending_request_acks_.push_back(element);
400  } else {
401  element->data_dropped(true);
402  result = true;
403  }
404  return result;
405 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
PendingRequestAcks pending_request_acks_
Definition: TcpDataLink.h:101
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
ACE_SYNCH_MUTEX pending_request_acks_lock_
Definition: TcpDataLink.h:100
RcHandle< TcpConnection > TcpConnection_rch
ACE_TEXT("TCP_Factory")

◆ is_release_pending()

bool OpenDDS::DCPS::TcpDataLink::is_release_pending ( ) const

Get release pending flag.

Definition at line 383 of file TcpDataLink.cpp.

References release_is_pending_.

Referenced by OpenDDS::DCPS::TcpConnection::active_reconnect_i().

384 {
385  return release_is_pending_;
386 }
AtomicBool release_is_pending_
Definition: TcpDataLink.h:98

◆ make_reservation() [1/2]

int OpenDDS::DCPS::TcpDataLink::make_reservation ( const GUID_t & remote_subscription_id,
const GUID_t & local_publication_id,
const TransportSendListener_wrch & send_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 588 of file TcpDataLink.cpp.

References OpenDDS::DCPS::DataLink::make_reservation(), send_association_msg(), stopped_clients_, and stopped_clients_mutex_.

592 {
593  {
595  stopped_clients_.erase(local_publication_id);
596  }
597  const int result = DataLink::make_reservation(remote_subscription_id, local_publication_id, send_listener, reliable);
598  send_association_msg(local_publication_id, remote_subscription_id);
599  return result;
600 }
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
void send_association_msg(const GUID_t &local, const GUID_t &remote)

◆ make_reservation() [2/2]

int OpenDDS::DCPS::TcpDataLink::make_reservation ( const GUID_t & remote_publication_id,
const GUID_t & local_subscription_id,
const TransportReceiveListener_wrch & receive_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 603 of file TcpDataLink.cpp.

References OpenDDS::DCPS::DataLink::make_reservation(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, send_association_msg(), stopped_clients_, and stopped_clients_mutex_.

607 {
608  {
610  stopped_clients_.erase(local_subscription_id);
611  }
612  const int result = DataLink::make_reservation(remote_publication_id, local_subscription_id, receive_listener, reliable);
613  send_association_msg(local_subscription_id, remote_publication_id);
614 
615  return result;
616 }
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
void send_association_msg(const GUID_t &local, const GUID_t &remote)

◆ OPENDDS_SET_CMP()

typedef OpenDDS::DCPS::TcpDataLink::OPENDDS_SET_CMP ( GUID_t  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::TcpDataLink::OPENDDS_VECTOR ( TransportQueueElement * )
private

◆ pre_stop_i()

void OpenDDS::DCPS::TcpDataLink::pre_stop_i ( )
virtual

Called before the datalink is released, or before shutdown, to let the concrete DataLink do anything necessary.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 107 of file TcpDataLink.cpp.

References connection_, DBG_ENTRY_LVL, graceful_disconnect_sent_, OpenDDS::DCPS::DataLink::pre_stop_i(), receive_strategy(), and send_graceful_disconnect_message().

108 {
109  DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6);
110 
112 
114 
115  TcpConnection_rch connection(connection_.lock());
116 
117  if (rs) {
118  // If we received the GRACEFUL_DISCONNECT message from peer before we
119  // initiate the disconnecting of the datalink, then we will not send
120  // the GRACEFUL_DISCONNECT message to the peer.
121  bool disconnected = rs->gracefully_disconnected();
122 
123  if (connection && !graceful_disconnect_sent_ && !disconnected) {
126  }
127  }
128 
129  if (connection) {
130  connection->shutdown();
131  }
132 }
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
RcHandle< TcpConnection > TcpConnection_rch
virtual void pre_stop_i()
Definition: DataLink.cpp:993
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
TcpReceiveStrategy_rch receive_strategy()
RcHandle< TcpReceiveStrategy > TcpReceiveStrategy_rch

◆ receive_strategy()

OpenDDS::DCPS::TcpReceiveStrategy_rch OpenDDS::DCPS::TcpDataLink::receive_strategy ( )

◆ reconnect()

int OpenDDS::DCPS::TcpDataLink::reconnect ( const TcpConnection_rch & connection)

Associate the new connection object with this datalink object. The states of the "old" connection object are copied to the new connection object and the "old" connection object is replaced by the new connection object.

Definition at line 240 of file TcpDataLink.cpp.

References OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(), connection_, DBG_ENTRY_LVL, do_association_actions(), OpenDDS::DCPS::dynamic_rchandle_cast(), OpenDDS::DCPS::DataLink::impl(), OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::DataLink::strategy_lock_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TcpTransport::fresh_link().

241 {
242  DBG_ENTRY_LVL("TcpDataLink","reconnect",6);
243 
244  TcpConnection_rch existing_connection(connection_.lock());
245  // Sanity check - the connection should exist already since we are reconnecting.
246  if (!existing_connection) {
247  VDBG_LVL((LM_ERROR,
248  "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
249  , 1);
250  return -1;
251  }
252 
253  existing_connection->transfer(connection.in());
254 
255  bool released = false;
258 
259  {
260  GuardType strategy_guard(strategy_lock_);
261 
262  trs = dynamic_rchandle_cast<TcpReceiveStrategy>(receive_strategy_);
263  tss = dynamic_rchandle_cast<TcpSendStrategy>(send_strategy_);
264 
265  if (!trs || !tss) {
266  // if either are invalid, both should be
269  released = true;
270  }
271  }
272 
273  if (released) {
274  RcHandle<TcpTransport> transport = dynamic_rchandle_cast<TcpTransport>(impl());
275  if (transport) {
276  const int result = transport->connect_tcp_datalink(*this, connection);
277  if (result == 0) {
279  }
280  return result;
281  }
282  return -1;
283  }
284 
285  connection_ = connection;
286 
287  // Associate the new connection object with the receiveing strategy and disassociate
288  // the old connection object with the receiveing strategy.
289  int rs_result = trs->reset(existing_connection.in(), connection.in());
290 
291  // Associate the new connection object with the sending strategy and disassociate
292  // the old connection object with the sending strategy.
293  int ss_result = tss->reset();
294 
295  if (rs_result == 0 && ss_result == 0) {
297  return 0;
298  }
299 
300  return -1;
301 }
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
RcHandle< TcpConnection > TcpConnection_rch
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define VDBG_LVL(DBG_ARGS, LEVEL)
RcHandle< TcpReceiveStrategy > TcpReceiveStrategy_rch
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ remote_address()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE const ACE_INET_Addr & OpenDDS::DCPS::TcpDataLink::remote_address ( ) const

Accessor for the remote address.

Definition at line 15 of file TcpDataLink.inl.

References ACE_INLINE, DBG_ENTRY_LVL, and remote_address_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

16 {
17  DBG_ENTRY_LVL("TcpDataLink","remote_address",6);
18  return this->remote_address_;
19 }
ACE_INET_Addr remote_address_
Definition: TcpDataLink.h:95
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ request_ack_received()

void OpenDDS::DCPS::TcpDataLink::request_ack_received ( const ReceivedDataSample & sample)

Definition at line 449 of file TcpDataLink.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), OpenDDS::DCPS::guid_cdr_size, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, receive_strategy(), OpenDDS::DCPS::SAMPLE_ACK, send_i(), OpenDDS::DCPS::DataSampleHeader::sequence_, and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample().

450 {
451  if (sample.header_.sequence_ == -1 && sample.header_.message_length_ == guid_cdr_size) {
452  GUID_t local;
453  Message_Block_Ptr payload(receive_strategy()->to_msgblock(sample));
454  Serializer ser(payload.get(), encoding_unaligned_native);
455  if (ser >> local) {
456  invoke_on_start_callbacks(local, sample.header_.publication_id_, true);
457  }
458  return;
459  }
460 
461  DataSampleHeader header_data;
462  // The message_id_ is the most important value for the DataSampleHeader.
463  header_data.message_id_ = SAMPLE_ACK;
464 
465  // Other data in the DataSampleHeader are not necessary set. The bogus values
466  // can be used.
467 
468  header_data.byte_order_ = ACE_CDR_BYTE_ORDER;
469  header_data.message_length_ = 0;
470  header_data.sequence_ = sample.header_.sequence_;
471  header_data.publication_id_ = sample.header_.publication_id_;
472  header_data.publisher_id_ = sample.header_.publisher_id_;
473 
474  Message_Block_Ptr message(
475  new ACE_Message_Block(header_data.get_max_serialized_size(),
477  0, //cont
478  0, //data
479  0, //allocator_strategy
480  0, //locking_strategy
484  0,
485  0));
486 
487  *message << header_data;
488 
489  TransportControlElement* send_element = new TransportControlElement(move(message));
490 
491 
492  // I don't want to rebuild a connection in order to send
493  // a sample ack message
494  this->send_i(send_element, false);
495 }
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194
static const ACE_Time_Value max_time
const size_t guid_cdr_size
Definition: GuidUtils.h:115
#define ACE_CDR_BYTE_ORDER
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
static const ACE_Time_Value zero
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
TcpReceiveStrategy_rch receive_strategy()
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: TcpDataLink.cpp:68

◆ reuse_existing_connection()

int OpenDDS::DCPS::TcpDataLink::reuse_existing_connection ( const TcpConnection_rch & connection)

Definition at line 183 of file TcpDataLink.cpp.

References connection_, DBG_ENTRY_LVL, do_association_actions(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::is_active_, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::TcpReceiveStrategy::reset(), OpenDDS::DCPS::TcpSendStrategy::reset(), OpenDDS::DCPS::DataLink::send_strategy_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TcpTransport::connect_tcp_datalink().

184 {
185  DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6);
186 
187  if (this->is_active_) {
188  return -1;
189  }
190  //Need to check if connection is nil. If connection is not nil, then connection
191  //has previously gone through connection phase so this is a reuse of the connection
192  //proceed to determine if we can reuse/reset existing mechanisms or need to start from
193  //scratch.
194 
195  TcpConnection_rch old_connection(this->connection_.lock());
196 
197  if (old_connection) {
198  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - "
199  "trying to reuse existing connection\n"), 0);
200  old_connection->transfer(connection.in());
201 
202  //Connection already exists.
205 
206  if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
207  return -1;
208  } else {
209  brs = this->receive_strategy_;
210  bss = this->send_strategy_;
211 
212  this->connection_ = connection;
213 
214  TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
215 
216  TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
217 
218  // Associate the new connection object with the receiving strategy and disassociate
219  // the old connection object with the receiving strategy.
220  int rs_result = rs->reset(0, connection.in());
221 
222  // Associate the new connection object with the sending strategy and disassociate
223  // the old connection object with the sending strategy.
224  int ss_result = ss->reset(true);
225 
226  if (rs_result == 0 && ss_result == 0) {
228  return 0;
229  }
230  }
231  }
232  return -1;
233 }
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
bool is_active_
Is pub or sub ?
Definition: DataLink.h:463
RcHandle< TcpConnection > TcpConnection_rch
RcHandle< TransportStrategy > TransportStrategy_rch
RcHandle< TransportSendStrategy > TransportSendStrategy_rch
The type definition for the smart-pointer to the underlying type.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ send_association_msg()

void OpenDDS::DCPS::TcpDataLink::send_association_msg ( const GUID_t & local,
const GUID_t & remote 
)
private

Definition at line 530 of file TcpDataLink.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), OpenDDS::DCPS::guid_cdr_size, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, OpenDDS::DCPS::REQUEST_ACK, send_i(), OpenDDS::DCPS::DataSampleHeader::sequence_, and ACE_Time_Value::zero.

Referenced by do_association_actions(), and make_reservation().

531 {
532  DataSampleHeader header_data;
533  header_data.message_id_ = REQUEST_ACK;
534  header_data.byte_order_ = ACE_CDR_BYTE_ORDER;
535  header_data.message_length_ = guid_cdr_size;
536  header_data.sequence_ = -1;
537  header_data.publication_id_ = local;
538  header_data.publisher_id_ = remote;
539 
540  Message_Block_Ptr message(
541  new ACE_Message_Block(header_data.get_max_serialized_size(),
543  0, //cont
544  0, //data
545  0, //allocator_strategy
546  0, //locking_strategy
550  0,
551  0));
552 
553  *message << header_data;
554  Serializer ser(message.get(), encoding_unaligned_native);
555  ser << remote;
556 
557  TransportControlElement* send_element = new TransportControlElement(move(message));
558 
559  this->send_i(send_element, false);
560 }
static const ACE_Time_Value max_time
const size_t guid_cdr_size
Definition: GuidUtils.h:115
#define ACE_CDR_BYTE_ORDER
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
static const ACE_Time_Value zero
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: TcpDataLink.cpp:68

◆ send_graceful_disconnect_message()

void OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message ( )
private

Definition at line 304 of file TcpDataLink.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), OpenDDS::DCPS::GRACEFUL_DISCONNECT, ACE_Message_Block::length(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::move(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), send_i(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::TransportSendStrategy::terminate_send(), ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.

Referenced by pre_stop_i().

305 {
306  DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6);
307 
308  // Will clear all queued messages but still let the disconnect message
309  // sent.
310  this->send_strategy_->terminate_send(true);
311 
312  DataSampleHeader header_data;
313  // The message_id_ is the most important value for the DataSampleHeader.
314  header_data.message_id_ = GRACEFUL_DISCONNECT;
315 
316  // Other data in the DataSampleHeader are not necessary set. The bogus values
317  // can be used.
318 
319  //header_data.byte_order_
320  // = this->transport_->config()->swap_bytes() ? !TAO_ENCAP_BYTE_ORDER : TAO_ENCAP_BYTE_ORDER;
321  //header_data.message_length_ = 0;
322  //header_data.sequence_ = 0;
323  //DDS::Time_t source_timestamp
324  // = SystemTimePoint::now().to_dds_time();
325  //header_data.source_timestamp_sec_ = source_timestamp.sec;
326  //header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
327  //header_data.coherency_group_ = 0;
328  //header_data.publication_id_ = 0;
329 
330  // TODO:
331  // It seems a bug in the transport implementation that the receiving side can
332  // not receive the message when the message has no sample data and is sent
333  // in a single packet.
334 
335  // To work around this problem, I have to add bogus data to chain with the
336  // DataSampleHeader to make the receiving work.
337 
338  Message_Block_Ptr data(
339  new ACE_Message_Block(20,
341  0, //cont
342  0, //data
343  0, //allocator_strategy
344  0, //locking_strategy
345  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
348  0,
349  0));
350  data->wr_ptr(20);
351 
352  header_data.message_length_ = static_cast<ACE_UINT32>(data->length());
353 
354  Message_Block_Ptr message(
355  new ACE_Message_Block(header_data.get_max_serialized_size(),
357  data.release(), //cont
358  0, //data
359  0, //allocator_strategy
360  0, //locking_strategy
364  0,
365  0));
366 
367  *message << header_data;
368 
369  TransportControlElement* send_element = new TransportControlElement(move(message));
370 
371  // I don't want to rebuild a connection in order to send
372  // a graceful disconnect message.
373  this->send_i(send_element, false);
374 }
static const ACE_Time_Value max_time
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void terminate_send(bool graceful_disconnecting=false)
Remove all samples in the backpressure queue and packet queue.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
static const ACE_Time_Value zero
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: TcpDataLink.cpp:68

◆ send_i()

void OpenDDS::DCPS::TcpDataLink::send_i ( TransportQueueElement * element,
bool  relink = true 
)
protected virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 68 of file TcpDataLink.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::DataLink::send_i(), stopped_clients_, and stopped_clients_mutex_.

Referenced by request_ack_received(), send_association_msg(), and send_graceful_disconnect_message().

69 {
71  if (stopped_clients_.count(element->publication_id())) {
72  element->data_dropped(true);
73  } else {
74  DCPS::DataLink::send_i(element, relink);
75  }
76 }
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103

◆ send_stop_i()

void OpenDDS::DCPS::TcpDataLink::send_stop_i ( GUID_t  repoId)
protected virtual

Definition at line 79 of file TcpDataLink.cpp.

References OpenDDS::DCPS::DataLink::send_stop_i(), stopped_clients_, and stopped_clients_mutex_.

80 {
82  if (!stopped_clients_.count(repoId)) {
84  }
85 }
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
void send_stop_i(GUID_t repoId)
Definition: DataLink.inl:147
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103

◆ send_strategy()

OpenDDS::DCPS::TcpSendStrategy_rch OpenDDS::DCPS::TcpDataLink::send_strategy ( )

◆ set_release_pending()

void OpenDDS::DCPS::TcpDataLink::set_release_pending ( bool  flag)

Set release pending flag.

Definition at line 377 of file TcpDataLink.cpp.

References release_is_pending_.

Referenced by OpenDDS::DCPS::TcpTransport::find_datalink_i(), and OpenDDS::DCPS::TcpTransport::release_datalink().

378 {
379  this->release_is_pending_ = flag;
380 }
AtomicBool release_is_pending_
Definition: TcpDataLink.h:98

◆ stop_i()

void OpenDDS::DCPS::TcpDataLink::stop_i ( )
protected virtual

Called when the DataLink is self-releasing because all of its reservations have been released, or when the TransportImpl is handling a shutdown() call.

Called when the DataLink has been "stopped" for some reason. It could be called from the DataLink::transport_shutdown() method (when the TransportImpl is handling a shutdown() call). Or, it could be called from the DataLink::release_reservations() method, when it discovers that it has just released the last remaining reservations from the DataLink, and the DataLink is in the process of "releasing" itself.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 56 of file TcpDataLink.cpp.

References connection_, and DBG_ENTRY_LVL.

57 {
58  DBG_ENTRY_LVL("TcpDataLink","stop_i",6);
59 
60  TcpConnection_rch connection(this->connection_.lock());
61  if (connection) {
62  // Tell the connection object to disconnect.
63  connection->disconnect();
64  }
65 }
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
RcHandle< TcpConnection > TcpConnection_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Data Documentation

◆ connection_

WeakRcHandle<TcpConnection> OpenDDS::DCPS::TcpDataLink::connection_
private

◆ graceful_disconnect_sent_

bool OpenDDS::DCPS::TcpDataLink::graceful_disconnect_sent_
private

Definition at line 97 of file TcpDataLink.h.

Referenced by pre_stop_i().

◆ pending_request_acks_

PendingRequestAcks OpenDDS::DCPS::TcpDataLink::pending_request_acks_
private

Definition at line 101 of file TcpDataLink.h.

Referenced by ack_received(), drop_pending_request_acks(), and handle_send_request_ack().

◆ pending_request_acks_lock_

ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpDataLink::pending_request_acks_lock_
private

Definition at line 100 of file TcpDataLink.h.

Referenced by ack_received(), drop_pending_request_acks(), and handle_send_request_ack().

◆ release_is_pending_

AtomicBool OpenDDS::DCPS::TcpDataLink::release_is_pending_
private

Definition at line 98 of file TcpDataLink.h.

Referenced by is_release_pending(), and set_release_pending().

◆ remote_address_

ACE_INET_Addr OpenDDS::DCPS::TcpDataLink::remote_address_
private

Definition at line 95 of file TcpDataLink.h.

Referenced by remote_address().

◆ stopped_clients_

RepoIdSetType OpenDDS::DCPS::TcpDataLink::stopped_clients_
private

◆ stopped_clients_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::TcpDataLink::stopped_clients_mutex_
mutable private

The documentation for this class was generated from the following files: