OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Types | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::TransportClient Class Reference abstract

Mix-in class for DDS entities which directly use the transport layer. More...

#include <TransportClient.h>

Inheritance diagram for OpenDDS::DCPS::TransportClient:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportClient:
Collaboration graph
[legend]

Classes

struct  PendingAssoc
 
class  PendingAssocTimer
 

Public Types

enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 

Public Member Functions

void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeq & connection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
virtual GUID_t get_guid () const =0
 
virtual RcHandle< BitSubscriber > get_builtin_subscriber_proxy () const
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Protected Member Functions

void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Types

typedef ACE_Guard< ACE_Thread_Mutex > Guard
 
typedef ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
 
typedef RcHandle< PendingAssoc > PendingAssoc_rch
 

Private Member Functions

virtual bool check_transport_qos (const TransportInst &inst)=0
 
virtual DDS::DomainId_t domain_id () const =0
 
virtual Priority get_priority_value (const AssociationData &data) const =0
 
virtual void transport_assoc_done (int, const GUID_t &)
 
virtual SequenceNumber get_max_sn () const
 
virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle () const
 
void use_datalink_i (const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)
 
TransportSendListener_rch get_send_listener ()
 
TransportReceiveListener_rch get_receive_listener ()
 
bool initiate_connect_i (TransportImpl::AcceptConnectResult &result, TransportImpl_rch impl, const TransportImpl::RemoteTransport &remote, const TransportImpl::ConnectionAttribs &attribs_, Guard &guard)
 
void send_i (SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
 
typedef OPENDDS_MAP_CMP (GUID_t, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex
 
typedef OPENDDS_VECTOR (WeakRcHandle< TransportImpl >) ImplsType
 
typedef OPENDDS_MAP_CMP (GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap
 
typedef OPENDDS_MULTIMAP_CMP (GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PrevPendingMap
 
void clean_prev_pending ()
 

Private Attributes

RcHandle< PendingAssocTimer > pending_assoc_timer_
 
TransportConfig_rch config_
 
ImplsType impls_
 
PendingMap pending_
 
PrevPendingMap prev_pending_
 
DataLinkSet links_
 
DataLinkIndex data_link_index_
 
ACE_Thread_Mutex send_transaction_lock_
 
ACE_UINT64 expected_transaction_id_
 
ACE_UINT64 max_transaction_id_seen_
 
DataSampleElement * max_transaction_tail_
 
bool swap_bytes_
 
bool cdr_encapsulation_
 
bool reliable_
 
bool durable_
 
TimeDuration passive_connect_duration_
 
TransportLocatorSeq conn_info_
 
ACE_Thread_Mutex lock_
 Seems to protect accesses to impls_, pending_, links_, data_link_index_. More...
 
Reverse_Lock_t reverse_lock_
 
GUID_t repo_id_
 

Friends

class ::DDS_TEST
 

Detailed Description

Mix-in class for DDS entities which directly use the transport layer.

DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient class manages the TransportImpl objects that represent the available communication mechanisms and the DataLink objects that represent the currently active communication channels to peers.

Definition at line 47 of file TransportClient.h.

Member Typedef Documentation

◆ Guard

Definition at line 174 of file TransportClient.h.

◆ PendingAssoc_rch

Definition at line 222 of file TransportClient.h.

◆ Reverse_Lock_t

Definition at line 199 of file TransportClient.h.

Member Enumeration Documentation

◆ anonymous enum

anonymous enum
Enumerator
ASSOC_OK 
ASSOC_ACTIVE 

Definition at line 55 of file TransportClient.h.

Constructor & Destructor Documentation

◆ TransportClient()

OpenDDS::DCPS::TransportClient::TransportClient ( )

Definition at line 34 of file TransportClient.cpp.

35  : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
39  , swap_bytes_(false)
40  , cdr_encapsulation_(false)
41  , reliable_(false)
42  , durable_(false)
45 {
46 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
DataSampleElement * max_transaction_tail_
RcHandle< PendingAssocTimer > pending_assoc_timer_
#define TheServiceParticipant

◆ ~TransportClient()

OpenDDS::DCPS::TransportClient::~TransportClient ( )
virtual

Definition at line 48 of file TransportClient.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), impls_, LM_DEBUG, lock_, prev_pending_, repo_id_, OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting(), stop_associating(), and OpenDDS::DCPS::Transport_debug_level.

49 {
50  if (Transport_debug_level > 5) {
51  LogGuid logger(repo_id_);
52  ACE_DEBUG((LM_DEBUG,
53  ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
54  logger.c_str()));
55  }
56 
58 
60 
61  for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end(); ++it) {
62  for (size_t i = 0; i < impls_.size(); ++i) {
63  TransportImpl_rch impl = impls_[i].lock();
64  if (impl) {
65  impl->stop_accepting_or_connecting(it->second->client_, it->second->data_.remote_id_, false, false);
66  }
67  }
68  }
69 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_TEXT("TCP_Factory")

Member Function Documentation

◆ add_link()

void OpenDDS::DCPS::TransportClient::add_link ( const DataLink_rch & link,
const GUID_t & peer
)
virtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl.

Definition at line 619 of file TransportClient.cpp.

References data_link_index_, get_receive_listener(), get_send_listener(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataLinkSet::insert_link(), links_, OpenDDS::DCPS::DataLink::make_reservation(), OPENDDS_ASSERT, reliable_, and repo_id_.

Referenced by OpenDDS::DCPS::DataReaderImpl::add_link(), and use_datalink_i().

620 {
621  links_.insert_link(link);
622  data_link_index_[peer] = link;
623 
625 
626  TransportReceiveListener_rch trl = get_receive_listener();
627  if (trl) {
628  link->make_reservation(peer, repo_id_, trl, reliable_);
629  } else {
630  link->make_reservation(peer, repo_id_, get_send_listener(), reliable_);
631  }
632 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
TransportReceiveListener_rch get_receive_listener()
int insert_link(const DataLink_rch &link)
Definition: DataLinkSet.cpp:42
TransportSendListener_rch get_send_listener()

◆ associate()

bool OpenDDS::DCPS::TransportClient::associate ( const AssociationData & peer,
bool  active 
)

Definition at line 199 of file TransportClient.cpp.

References OpenDDS::DCPS::TransportImpl::accept_datalink(), ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::TransportClient::PendingAssoc::active_, OpenDDS::DCPS::TransportClient::PendingAssoc::attribs_, OpenDDS::DCPS::back_inserter(), OpenDDS::DCPS::TransportClient::PendingAssoc::blob_index_, OpenDDS::DCPS::LogGuid::c_str(), clean_prev_pending(), OpenDDS::DCPS::TransportLocator::data, OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::AssociationData::discovery_locator_, durable_, get_guid(), get_max_sn(), get_priority_value(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, impls_, OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::is_shut_down(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, lock_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::max_sn_, OpenDDS::DCPS::TransportClient::PendingAssoc::mutex_, OPENDDS_ASSERT, OPENDDS_STRING, OpenDDS::DCPS::AssociationData::participant_discovered_at_, pending_, pending_assoc_timer_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, OpenDDS::DCPS::rchandle_from(), reliable_, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::AssociationData::remote_transport_context_, repo_id(), repo_id_, reverse_lock_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, OpenDDS::DCPS::TransportImpl::transport_type(), use_datalink_i(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::RecorderImpl::add_association(), OpenDDS::DCPS::ReplayerImpl::add_association(), OpenDDS::DCPS::DataWriterImpl::add_association(), OpenDDS::DCPS::DataReaderImpl::add_association(), OpenDDS::RTPS::Sedp::Writer::assoc(), and OpenDDS::RTPS::Sedp::Reader::assoc().

200 {
201  GUID_t repo_id = get_guid();
202 
203  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
204 
205  repo_id_ = repo_id;
207 
208  if (impls_.empty()) {
209  if (DCPS_debug_level) {
210  LogGuid writer_log(repo_id_);
211  LogGuid reader_log(data.remote_id_);
212  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
213  ACE_TEXT("local %C remote %C no available impls\n"),
214  writer_log.c_str(),
215  reader_log.c_str()));
216  }
217  return false;
218  }
219 
220  bool all_impls_shut_down = true;
221  for (size_t i = 0; i < impls_.size(); ++i) {
222  TransportImpl_rch impl = impls_[i].lock();
223  if (impl && !impl->is_shut_down()) {
224  all_impls_shut_down = false;
225  break;
226  }
227  }
228 
229  if (all_impls_shut_down) {
230  if (DCPS_debug_level) {
231  LogGuid writer_log(repo_id_);
232  LogGuid reader_log(data.remote_id_);
233  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
234  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
235  writer_log.c_str(),
236  reader_log.c_str()));
237  }
238  return false;
239  }
240 
242 
243  PendingMap::iterator iter = pending_.find(data.remote_id_);
244 
245  if (iter == pending_.end()) {
246  GUID_t remote_copy(data.remote_id_);
247  PendingAssoc_rch pa = make_rch<PendingAssoc>(rchandle_from(this));
248  pa->active_ = active;
249  pa->impls_.clear();
250  pa->blob_index_ = 0;
251  pa->data_ = data;
253  pa->attribs_.local_id_ = repo_id_;
254  pa->attribs_.priority_ = get_priority_value(data);
255  pa->attribs_.local_reliable_ = reliable_;
256  pa->attribs_.local_durable_ = durable_;
257  pa->attribs_.max_sn_ = get_max_sn();
258  iter = pending_.insert(std::make_pair(remote_copy, pa)).first;
259 
260  LogGuid tc_assoc_log(repo_id_);
261  LogGuid remote_log(data.remote_id_);
262  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
263  "between %C and remote %C\n",
264  tc_assoc_log.c_str(),
265  remote_log.c_str()), 0);
266  } else {
267 
268  ACE_ERROR((LM_ERROR,
269  ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
270  ACE_TEXT("already associating with remote.\n")));
271 
272  return false;
273 
274  }
275 
276  PendingAssoc_rch pend = iter->second;
277 
278  if (active) {
279  ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
280  pend->impls_.reserve(impls_.size());
281  std::reverse_copy(impls_.begin(), impls_.end(),
282  std::back_inserter(pend->impls_));
283 
284  return pend->initiate_connect(this, guard);
285 
286  } else { // passive
287 
288  // call accept_datalink for each impl / blob pair of the same type
289  for (size_t i = 0; i < impls_.size(); ++i) {
290  // Release the PendingAssoc object's mutex_ since the nested for-loop does not access
291  // the PendingAssoc object directly and the functions called by the nested loop can
292  // lead to a PendingAssoc object's mutex_ being acquired, which will cause deadlock if
293  // it is not released here.
294  TransportImpl::ConnectionAttribs attribs;
295  TransportImpl_rch impl = impls_[i].lock();
296  {
297  ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
298  pend->impls_.push_back(impl);
299  attribs = pend->attribs_;
300  }
301  const OPENDDS_STRING type = impl->transport_type();
302 
303  for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
304  if (data.remote_data_[j].transport_type.in() == type) {
305  const TransportImpl::RemoteTransport remote = {
306  data.remote_id_, data.remote_data_[j].data, data.discovery_locator_.data, data.participant_discovered_at_, data.remote_transport_context_,
307  data.publication_transport_priority_,
308  data.remote_reliable_, data.remote_durable_};
309 
310  TransportImpl::AcceptConnectResult res;
311  {
312  // This thread acquired lock_ at the beginning of this method.
313  // Calling accept_datalink might require getting the lock for the transport's reactor.
314  // If the current thread is not an event handler for the transport's reactor, e.g.,
315  // the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
316  // Event handlers in the transport reactor may call passive_connection which calls use_datalink
317  // which acquires lock_. The locking order in this case is transport reactor lock -> lock_.
318  // To avoid deadlock, we must reverse the lock.
319  RcHandle<TransportClient> client = rchandle_from(this);
320  ACE_GUARD_RETURN(Reverse_Lock_t, rev_tc_guard, reverse_lock_, false);
321  res = impl->accept_datalink(remote, attribs, client);
322  }
323 
324  //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
325  iter = pending_.find(data.remote_id_);
326 
327  if (iter == pending_.end()) {
328  //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
329  //active side connection and completed, thus pend was removed from pending_. Can return true.
330  return true;
331  }
332  pend = iter->second;
333 
334  if (res.success_) {
335  if (res.link_.is_nil()) {
336  // In this case, it may be waiting for the TCP connection to be
337  // established. Just wait without trying other transports.
338  pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
339  } else {
340  use_datalink_i(data.remote_id_, res.link_, guard);
341  return true;
342  }
343  }
344  }
345  }
346  }
347 
348  pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
349  }
350 
351  return true;
352 }
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
virtual SequenceNumber get_max_sn() const
SequenceBackInsertIterator< Sequence > back_inserter(Sequence &seq)
virtual GUID_t get_guid() const =0
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
#define OPENDDS_STRING
virtual Priority get_priority_value(const AssociationData &data) const =0
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
RcHandle< PendingAssocTimer > pending_assoc_timer_
RcHandle< PendingAssoc > PendingAssoc_rch
ACE_TEXT("TCP_Factory")
ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define VDBG_LVL(DBG_ARGS, LEVEL)
void use_datalink_i(const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)

◆ associated_with()

bool OpenDDS::DCPS::TransportClient::associated_with ( const GUID_t & remote) const

Definition at line 1172 of file TransportClient.cpp.

References ACE_ERROR, data_link_index_, LM_ERROR, lock_, and ACE_Guard< ACE_LOCK >::locked().

1173 {
1175  if (!guard.locked()) {
1176  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::associated_with: "
1177  "lock failed\n"));
1178  return false;
1179  }
1180  return data_link_index_.count(remote);
1181 }
#define ACE_ERROR(X)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ cdr_encapsulation() [1/2]

bool OpenDDS::DCPS::TransportClient::cdr_encapsulation ( ) const
inline

◆ cdr_encapsulation() [2/2]

void OpenDDS::DCPS::TransportClient::cdr_encapsulation ( bool  encap)
inline protected

Definition at line 150 of file TransportClient.h.

151  {
152  cdr_encapsulation_ = encap;
153  }

◆ check_transport_qos()

virtual bool OpenDDS::DCPS::TransportClient::check_transport_qos ( const TransportInst & inst)
private pure virtual

◆ clean_prev_pending()

void OpenDDS::DCPS::TransportClient::clean_prev_pending ( )
private

Definition at line 72 of file TransportClient.cpp.

References prev_pending_.

Referenced by associate().

73 {
74  for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end();) {
75  if (it->second->safe_to_remove()) {
76  prev_pending_.erase(it++);
77  } else {
78  ++it;
79  }
80  }
81 }

◆ connection_info()

const TransportLocatorSeq& OpenDDS::DCPS::TransportClient::connection_info ( ) const
inline

◆ data_acked()

void OpenDDS::DCPS::TransportClient::data_acked ( const GUID_t & remote)

Definition at line 1194 of file TransportClient.cpp.

References ACE_ERROR, get_send_listener(), LM_ERROR, lock_, and ACE_Guard< ACE_LOCK >::locked().

1195 {
1196  TransportSendListener_rch send_listener;
1197  {
1199  if (!guard.locked()) {
1200  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::data_acked: "
1201  "lock failed\n"));
1202  return;
1203  }
1204  send_listener = get_send_listener();
1205  }
1206  send_listener->data_acked(remote);
1207 }
#define ACE_ERROR(X)
RcHandle< TransportSendListener > TransportSendListener_rch
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
TransportSendListener_rch get_send_listener()

◆ disassociate()

void OpenDDS::DCPS::TransportClient::disassociate ( const GUID_t & peerId)

Definition at line 693 of file TransportClient.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::in(), links_, LM_DEBUG, lock_, OPENDDS_ASSERT, pending_, pending_assoc_timer_, prev_pending_, OpenDDS::DCPS::DataLink::release_reservations(), OpenDDS::DCPS::DataLinkSet::remove_link(), OpenDDS::DCPS::DataLink::remove_listener(), repo_id_, OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting(), and VDBG_LVL.

Referenced by OpenDDS::RTPS::disassociate_helper_extended(), OpenDDS::RTPS::Sedp::Endpoint::get_priority_value(), OpenDDS::DCPS::ReplayerImpl::remove_associations(), OpenDDS::DCPS::DataWriterImpl::remove_associations(), OpenDDS::DCPS::RecorderImpl::remove_associations_i(), and OpenDDS::DCPS::DataReaderImpl::remove_associations_i().

694 {
695  LogGuid peerId_log(peerId);
696  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
697  "TransportClient(%@) disassociating from %C\n",
698  this,
699  peerId_log.c_str()), 5);
700 
702 
703  PendingMap::iterator iter = pending_.find(peerId);
704  if (iter != pending_.end()) {
705  {
706  // The transport impl may have resource for a pending connection.
707  ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
708  for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
709  TransportImpl_rch impl = iter->second->impls_[i].lock();
710  if (impl) {
711  impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
712  }
713  }
714  }
715  iter->second->reset_client();
716  pending_assoc_timer_->cancel_timer(iter->second);
717  prev_pending_.insert(std::make_pair(iter->first, iter->second));
718  pending_.erase(iter);
719  return;
720  }
721 
722  const DataLinkIndex::iterator found = data_link_index_.find(peerId);
723 
724  if (found == data_link_index_.end()) {
725  if (DCPS_debug_level > 4) {
726  const LogGuid log(peerId);
727  ACE_DEBUG((LM_DEBUG,
728  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
729  ACE_TEXT("no link for remote peer %C\n"),
730  log.c_str()));
731  }
732 
733  return;
734  }
735 
736  const DataLink_rch link = found->second;
737 
738  //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
739  //otherwise it could be removed in transport_detached()
740  data_link_index_.erase(found);
741  DataLinkSetMap released;
742 
743  if (DCPS_debug_level > 4) {
744  ACE_DEBUG((LM_DEBUG,
745  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
746  ACE_TEXT("about to release_reservations for link[%@]\n"),
747  link.in()));
748  }
749 
751  link->release_reservations(peerId, repo_id_, released);
752 
753  if (!released.empty()) {
754 
755  if (DCPS_debug_level > 4) {
756  ACE_DEBUG((LM_DEBUG,
757  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
758  ACE_TEXT("about to remove_link[%@] from links_\n"),
759  link.in()));
760  }
761  links_.remove_link(link);
762 
763  if (DCPS_debug_level > 4) {
764  LogGuid logger(repo_id_);
765  ACE_DEBUG((LM_DEBUG,
766  ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
767  logger.c_str(),
768  link.in()));
769  }
770  // Datalink is no longer used for any remote peer by this TransportClient
771  link->remove_listener(repo_id_);
772 
773  }
774 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void remove_link(const DataLink_rch &link)
Definition: DataLinkSet.cpp:50
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< DataLink > DataLink_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLink_rch.h:34
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
RcHandle< PendingAssocTimer > pending_assoc_timer_
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ domain_id()

virtual DDS::DomainId_t OpenDDS::DCPS::TransportClient::domain_id ( ) const
private pure virtual

◆ enable_transport()

void OpenDDS::DCPS::TransportClient::enable_transport ( bool  reliable,
bool  durable 
)

Definition at line 84 of file TransportClient.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::TransportRegistry::DEFAULT_CONFIG_NAME, OpenDDS::DCPS::TransportRegistry::domain_default_config(), domain_id(), enable_transport_using_config(), OpenDDS::DCPS::TransportRegistry::fix_empty_default(), OpenDDS::DCPS::TransportRegistry::global_config(), OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::TransportConfig::instances_, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_ERROR, OpenDDS::DCPS::TransportConfig::name(), and OpenDDS::DCPS::rchandle_from().

Referenced by OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().

85 {
86  // Search for a TransportConfig to use:
87  TransportConfig_rch tc;
88 
89  // 1. If this object is an Entity, check if a TransportConfig has been
90  // bound either directly to this entity or to a parent entity.
91  for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
92  ent && tc.is_nil(); ent = ent->parent()) {
93  tc = ent->transport_config();
94  }
95 
96  if (tc.is_nil()) {
97  TransportRegistry* const reg = TransportRegistry::instance();
98  // 2. Check for a TransportConfig that is the default for this Domain.
99  tc = reg->domain_default_config(domain_id());
100 
101  if (tc.is_nil()) {
102  // 3. Use the global_config if one has been set.
103  tc = reg->global_config();
104 
105  if (!tc.is_nil() && tc->instances_.empty()
106  && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
107  // 4. Set the "fallback option" if the global_config is empty.
108  // (only applies if the user hasn't changed the global config)
109  tc = reg->fix_empty_default();
110  }
111  }
112  }
113 
114  if (tc.is_nil()) {
115  ACE_ERROR((LM_ERROR,
116  ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
117  ACE_TEXT("No TransportConfig found.\n")));
118  throw Transport::NotConfigured();
119  }
120 
121  enable_transport_using_config(reliable, durable, tc);
122 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
static const char DEFAULT_CONFIG_NAME[]
RcHandle< TransportConfig > TransportConfig_rch
virtual DDS::DomainId_t domain_id() const =0
ACE_TEXT("TCP_Factory")
static TransportRegistry * instance()
Return a singleton instance of this class.
void enable_transport_using_config(bool reliable, bool durable, const TransportConfig_rch &tc)

◆ enable_transport_using_config()

void OpenDDS::DCPS::TransportClient::enable_transport_using_config ( bool  reliable,
bool  durable,
const TransportConfig_rch & tc
)

Definition at line 125 of file TransportClient.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), cdr_encapsulation_, check_transport_qos(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION, durable_, OpenDDS::DCPS::TimeDuration::from_msec(), get_crypto_handle(), OpenDDS::DCPS::TransportInst::get_or_create_impl(), impls_, OpenDDS::DCPS::TransportConfig::instances_, LM_ERROR, LM_WARNING, OpenDDS::DCPS::TransportImpl::local_crypto_handle(), OpenDDS::DCPS::TransportConfig::passive_connect_duration_, passive_connect_duration_, populate_connection_info(), reliable_, OpenDDS::DCPS::TransportInst::requires_cdr_encapsulation(), OpenDDS::DCPS::TransportConfig::swap_bytes_, and swap_bytes_.

Referenced by enable_transport(), OpenDDS::RTPS::Sedp::Endpoint::get_priority_value(), and OpenDDS::RTPS::Sedp::init().

127 {
128  config_ = tc;
129  swap_bytes_ = tc->swap_bytes_;
130  reliable_ = reliable;
131  durable_ = durable;
132  unsigned long duration = tc->passive_connect_duration_;
133  if (duration == 0) {
135  if (DCPS_debug_level) {
136  ACE_DEBUG((LM_WARNING,
137  ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
138  ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
139  ACE_TEXT("default value\n")));
140  }
141  }
143 
145 
146  const size_t n = tc->instances_.size();
147 
148  for (size_t i = 0; i < n; ++i) {
149  TransportInst_rch inst = tc->instances_[i];
150 
151  if (check_transport_qos(*inst)) {
152  TransportImpl_rch impl = inst->get_or_create_impl();
153 
154  if (impl) {
155  impls_.push_back(impl);
156 
157 #if defined(OPENDDS_SECURITY)
158  impl->local_crypto_handle(get_crypto_handle());
159 #endif
160 
161  cdr_encapsulation_ |= inst->requires_cdr_encapsulation();
162  }
163  }
164  }
165 
166  if (impls_.empty()) {
167  ACE_ERROR((LM_ERROR,
168  ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
169  ACE_TEXT("No TransportImpl could be created.\n")));
170  throw Transport::NotConfigured();
171  }
172 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
virtual bool check_transport_qos(const TransportInst &inst)=0
virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
static TimeDuration from_msec(const ACE_UINT64 &ms)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
static const unsigned long DEFAULT_PASSIVE_CONNECT_DURATION

◆ get_builtin_subscriber_proxy()

virtual RcHandle<BitSubscriber> OpenDDS::DCPS::TransportClient::get_builtin_subscriber_proxy ( ) const
inline virtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Definition at line 132 of file TransportClient.h.

132 { return RcHandle<BitSubscriber>(); }

◆ get_crypto_handle()

virtual DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::TransportClient::get_crypto_handle ( ) const
inline private virtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Definition at line 167 of file TransportClient.h.

References DDS::HANDLE_NIL.

Referenced by enable_transport_using_config().

168  {
169  return DDS::HANDLE_NIL;
170  }
const InstanceHandle_t HANDLE_NIL

◆ get_guid()

virtual GUID_t OpenDDS::DCPS::TransportClient::get_guid ( ) const
pure virtual

◆ get_ice_endpoint()

WeakRcHandle< ICE::Endpoint > OpenDDS::DCPS::TransportClient::get_ice_endpoint ( )

Definition at line 880 of file TransportClient.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::TransportImpl::get_ice_endpoint(), impls_, and lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::get_ice_endpoint(), OpenDDS::DCPS::DataReaderImpl::get_ice_endpoint(), and OpenDDS::RTPS::Sedp::DiscoveryWriter::write_dcps_participant_secure().

881 {
882  // The one-to-many relationship with impls implies that this should
883  // return a set of endpoints instead of a single endpoint or null.
884  // For now, we will assume a single impl.
885 
886  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, WeakRcHandle<ICE::Endpoint>());
887  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
888  pos != limit;
889  ++pos) {
890  TransportImpl_rch impl = pos->lock();
891  if (impl) {
892  WeakRcHandle<ICE::Endpoint> endpoint = impl->get_ice_endpoint();
893  if (endpoint) { return endpoint; }
894  }
895  }
896 
897  return WeakRcHandle<ICE::Endpoint>();
898 }
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()

◆ get_max_sn()

virtual SequenceNumber OpenDDS::DCPS::TransportClient::get_max_sn ( ) const
inlineprivatevirtual

◆ get_priority_value()

virtual Priority OpenDDS::DCPS::TransportClient::get_priority_value ( const AssociationData data) const
privatepure virtual

◆ get_receive_listener()

TransportReceiveListener_rch OpenDDS::DCPS::TransportClient::get_receive_listener ( )
private

Definition at line 1114 of file TransportClient.cpp.

References OpenDDS::DCPS::rchandle_from().

Referenced by add_link().

1115 {
1116  return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
1117 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ get_send_listener()

TransportSendListener_rch OpenDDS::DCPS::TransportClient::get_send_listener ( )
private

Definition at line 1108 of file TransportClient.cpp.

References OpenDDS::DCPS::rchandle_from().

Referenced by add_link(), data_acked(), send_control(), and send_control_to().

1109 {
1110  return rchandle_from(dynamic_cast<TransportSendListener*>(this));
1111 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ initiate_connect_i()

bool OpenDDS::DCPS::TransportClient::initiate_connect_i ( TransportImpl::AcceptConnectResult result,
TransportImpl_rch  impl,
const TransportImpl::RemoteTransport remote,
const TransportImpl::ConnectionAttribs attribs_,
Guard guard 
)
private

Definition at line 386 of file TransportClient.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::TransportImpl::connect_datalink(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, ACE_Guard< ACE_LOCK >::locked(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, repo_id_, reverse_lock_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect().

391 {
392  if (!guard.locked()) {
393  //don't own the lock_ so can't release it...shouldn't happen
394  LogGuid local_log(repo_id_);
395  LogGuid remote_log(remote.repo_id_);
396  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i ")
397  ACE_TEXT("between local %C and remote %C unsuccessful because ")
398  ACE_TEXT("guard was not locked\n"),
399  local_log.c_str(),
400  remote_log.c_str()), 0);
401  return false;
402  }
403 
404  {
405  //can't call connect while holding lock due to possible reactor deadlock
406  LogGuid local_log(repo_id_);
407  LogGuid remote_log(remote.repo_id_);
408  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
409  "attempt to connect_datalink between local %C and remote %C\n",
410  local_log.c_str(),
411  remote_log.c_str()), 0);
412  {
413  TransportImpl::ConnectionAttribs attribs = attribs_;
414  RcHandle<TransportClient> client = rchandle_from(this);
415  ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
416  result = impl->connect_datalink(remote, attribs, client);
417  }
418  if (!result.success_) {
419  if (DCPS_debug_level) {
420  LogGuid writer_log(repo_id_);
421  LogGuid reader_log(remote.repo_id_);
422  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i - ")
423  ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
424  writer_log.c_str(),
425  reader_log.c_str()));
426  }
427  return false;
428  }
429  }
430 
431  LogGuid local_log(repo_id_);
432  LogGuid remote_log(remote.repo_id_);
433  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
434  "connection between local %C and remote %C initiation successful\n",
435  local_log.c_str(),
436  remote_log.c_str()), 0);
437  return true;
438 }
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_TEXT("TCP_Factory")
ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ is_leading()

bool OpenDDS::DCPS::TransportClient::is_leading ( const GUID_t reader_id) const

Definition at line 1209 of file TransportClient.cpp.

References get_guid(), OpenDDS::DCPS::DataLinkSet::is_leading(), links_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

1210 {
1211  return links_.is_leading(get_guid(), reader_id);
1212 }
virtual GUID_t get_guid() const =0
bool is_leading(const GUID_t &writer_id, const GUID_t &reader_id) const

◆ is_reliable()

bool OpenDDS::DCPS::TransportClient::is_reliable ( ) const
inline

Definition at line 70 of file TransportClient.h.

References header, and send().

◆ OPENDDS_MAP_CMP() [1/2]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( GUID_t  ,
DataLink_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [2/2]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( GUID_t  ,
PendingAssoc_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP_CMP()

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MULTIMAP_CMP ( GUID_t  ,
PendingAssoc_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR ( WeakRcHandle< TransportImpl > )
private

◆ pending_association_with()

bool OpenDDS::DCPS::TransportClient::pending_association_with ( const GUID_t remote) const

Definition at line 1183 of file TransportClient.cpp.

References ACE_ERROR, LM_ERROR, lock_, ACE_Guard< ACE_LOCK >::locked(), and pending_.

1184 {
1186  if (!guard.locked()) {
1187  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::pending_association_with: "
1188  "lock failed\n"));
1189  return false;
1190  }
1191  return pending_.count(remote);
1192 }
#define ACE_ERROR(X)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ populate_connection_info()

void OpenDDS::DCPS::TransportClient::populate_connection_info ( )

Definition at line 175 of file TransportClient.cpp.

References ACE_ERROR, ACE_TEXT(), check_transport_qos(), config_, conn_info_, OpenDDS::DCPS::TransportImpl::connection_info(), OpenDDS::DCPS::CONNINFO_ALL, OpenDDS::DCPS::TransportInst::get_or_create_impl(), OpenDDS::DCPS::grow(), OpenDDS::DCPS::TransportConfig::instances_, and LM_ERROR.

Referenced by enable_transport_using_config(), and OpenDDS::DCPS::DataWriterImpl::transport_discovery_change().

176 {
177  conn_info_.length(0);
178 
179  const size_t n = config_->instances_.size();
180  for (size_t i = 0; i < n; ++i) {
182  if (check_transport_qos(*inst)) {
183  TransportImpl_rch impl = inst->get_or_create_impl();
184  if (impl) {
185  const CORBA::ULong idx = DCPS::grow(conn_info_) - 1;
186  impl->connection_info(conn_info_[idx], CONNINFO_ALL);
187  }
188  }
189  }
190 
191  if (conn_info_.length() == 0) {
192  ACE_ERROR((LM_ERROR,
193  ACE_TEXT("(%P|%t) TransportClient::populate_connection_info: ")
194  ACE_TEXT("No connection info\n")));
195  }
196 }
#define ACE_ERROR(X)
virtual bool check_transport_qos(const TransportInst &inst)=0
static const ConnectionInfoFlags CONNINFO_ALL
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_CDR::ULong ULong
TransportLocatorSeq conn_info_
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
ACE_TEXT("TCP_Factory")

◆ register_for_reader()

void OpenDDS::DCPS::TransportClient::register_for_reader ( const GUID_t & participant,
const GUID_t & writerid,
const GUID_t & readerid,
const TransportLocatorSeq & locators,
OpenDDS::DCPS::DiscoveryListener * listener 
)

Definition at line 797 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::register_for_reader().

Referenced by OpenDDS::DCPS::ReplayerImpl::register_for_reader(), and OpenDDS::DCPS::DataWriterImpl::register_for_reader().

802 {
804  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
805  pos != limit;
806  ++pos) {
807  TransportImpl_rch impl = pos->lock();
808  if (impl) {
809  impl->register_for_reader(participant, writerid, readerid, locators, listener);
810  }
811  }
812 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void register_for_reader(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, OpenDDS::DCPS::DiscoveryListener *)
Definition: TransportImpl.h:87

◆ register_for_writer()

void OpenDDS::DCPS::TransportClient::register_for_writer ( const GUID_t & participant,
const GUID_t & readerid,
const GUID_t & writerid,
const TransportLocatorSeq & locators,
DiscoveryListener * listener 
)

Definition at line 831 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::register_for_writer().

Referenced by OpenDDS::DCPS::RecorderImpl::register_for_writer(), and OpenDDS::DCPS::DataReaderImpl::register_for_writer().

836 {
838  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
839  pos != limit;
840  ++pos) {
841  TransportImpl_rch impl = pos->lock();
842  if (impl) {
843  impl->register_for_writer(participant, readerid, writerid, locators, listener);
844  }
845  }
846 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
Definition: TransportImpl.h:97

◆ remove_all_msgs()

bool OpenDDS::DCPS::TransportClient::remove_all_msgs ( )

Definition at line 1159 of file TransportClient.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, links_, OpenDDS::DCPS::DataLinkSet::remove_all_msgs(), and repo_id_.

Referenced by OpenDDS::DCPS::WriteDataContainer::unregister_all().

1160 {
1161  if (repo_id_ == GUID_UNKNOWN) {
1162  return true;
1163  }
1165 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool remove_all_msgs(const GUID_t &pub_id)

◆ remove_sample()

bool OpenDDS::DCPS::TransportClient::remove_sample ( const DataSampleElement * sample)

Definition at line 1153 of file TransportClient.cpp.

References links_, and OpenDDS::DCPS::DataLinkSet::remove_sample().

Referenced by OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample().

1154 {
1155  return links_.remove_sample(sample);
1156 }
bool remove_sample(const DataSampleElement *sample)

◆ repo_id()

GUID_t OpenDDS::DCPS::TransportClient::repo_id ( ) const
inline

Definition at line 139 of file TransportClient.h.

References lock_.

Referenced by associate(), OpenDDS::DCPS::DataReaderImpl::data_received(), send_i(), transport_stop(), and OpenDDS::DCPS::ReplayerImpl::write().

140  {
142  return repo_id_;
143  }
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ send()

void OpenDDS::DCPS::TransportClient::send ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id = 0 
)

Definition at line 927 of file TransportClient.cpp.

References ACE_GUARD, OpenDDS::DCPS::SendStateDataSampleList::head(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::RTPS::Sedp::Writer::send_sample_i(), OpenDDS::DCPS::DataWriterImpl::send_suspended_data(), OpenDDS::DCPS::ReplayerImpl::write(), and OpenDDS::DCPS::DataWriterImpl::write().

928 {
929  if (send_list.head() == 0) {
930  return;
931  }
932  ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
933  send_i(send_list, transaction_id);
934 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
ACE_Thread_Mutex send_transaction_lock_

◆ send_control()

SendControlStatus OpenDDS::DCPS::TransportClient::send_control ( const DataSampleHeader header,
Message_Block_Ptr  msg 
)

Definition at line 1120 of file TransportClient.cpp.

References get_send_listener(), OpenDDS::DCPS::GUID_UNKNOWN, links_, OpenDDS::DCPS::move(), repo_id_, OpenDDS::DCPS::DataLinkSet::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.

Referenced by OpenDDS::DCPS::DataWriterImpl::send_control(), and OpenDDS::RTPS::Sedp::Writer::write_control_msg().

1122 {
1123  if (repo_id_ == GUID_UNKNOWN) {
1124  return SEND_CONTROL_OK;
1125  }
1127 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
void send_control(DataSampleElement *sample)
Send a control message that is wrapped in a DataSampleElement.
Definition: DataLinkSet.inl:81
TransportSendListener_rch get_send_listener()

◆ send_control_to()

SendControlStatus OpenDDS::DCPS::TransportClient::send_control_to ( const DataSampleHeader & header,
Message_Block_Ptr  msg,
const GUID_t & destination 
)

Definition at line 1130 of file TransportClient.cpp.

References ACE_GUARD_RETURN, data_link_index_, get_send_listener(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataLinkSet::insert_link(), lock_, OpenDDS::DCPS::move(), repo_id_, OpenDDS::DCPS::DataLinkSet::send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, and OpenDDS::DCPS::SEND_CONTROL_OK.

Referenced by send_w_control().

1133 {
1134  if (repo_id_ == GUID_UNKNOWN) {
1135  return SEND_CONTROL_OK;
1136  }
1137 
1138  DataLinkSet singular;
1139  {
1141  DataLinkIndex::iterator found = data_link_index_.find(destination);
1142 
1143  if (found == data_link_index_.end()) {
1144  return SEND_CONTROL_ERROR;
1145  }
1146 
1147  singular.insert_link(found->second);
1148  }
1149  return singular.send_control(repo_id_, get_send_listener(), header, move(msg));
1150 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
TransportSendListener_rch get_send_listener()

◆ send_final_acks()

void OpenDDS::DCPS::TransportClient::send_final_acks ( )

Definition at line 687 of file TransportClient.cpp.

References get_guid(), links_, and OpenDDS::DCPS::DataLinkSet::send_final_acks().

688 {
690 }
virtual GUID_t get_guid() const =0
void send_final_acks(const GUID_t &readerid)

◆ send_i()

void OpenDDS::DCPS::TransportClient::send_i ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id 
)
private

Definition at line 951 of file TransportClient.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::TransportSendListener::data_delivered(), OpenDDS::DCPS::DCPS_debug_level, expected_transaction_id_, OpenDDS::DCPS::DataSampleElement::filter_out_, OpenDDS::DCPS::DataSampleElement::filter_per_link_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_next_send_sample(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_ids(), OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, LM_DEBUG, max_transaction_id_seen_, max_transaction_tail_, OpenDDS::DCPS::DataSampleHeader::message_id_, repo_id(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataLinkSet::select_links(), OpenDDS::DCPS::DataLinkSet::send_start(), OpenDDS::DCPS::DataLinkSet::send_stop(), OpenDDS::DCPS::SendStateDataSampleList::tail(), OpenDDS::DCPS::DataSampleElement::transaction_id(), VDBG, and VDBG_LVL.

Referenced by send(), and send_w_control().

952 {
953  if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
954  if (transaction_id > max_transaction_id_seen_) {
955  max_transaction_id_seen_ = transaction_id;
956  max_transaction_tail_ = send_list.tail();
957  }
958  return;
959  } else /* transaction_id == expected_transaction_id */ {
960 
961  DataSampleElement* cur = send_list.head();
962  if (max_transaction_tail_ == 0) {
963  //Means no future transaction beat this transaction into send
964  if (transaction_id != 0)
966  // Only send this current transaction
967  max_transaction_tail_ = send_list.tail();
968  }
969  DataLinkSet send_links;
970 
971  while (cur != 0) {
972  // VERY IMPORTANT NOTE:
973  //
974  // We have to be very careful in how we deal with the current
975  // DataSampleElement. The issue is that once we have invoked
976  // data_delivered() on the send_listener_ object, or we have invoked
977  // send() on the pub_links, we can no longer access the current
978  // DataSampleElement! Thus, we need to get the next
979  // DataSampleElement (pointer) from the current element now,
980  // while it is safe.
981  DataSampleElement* next_elem;
982  if (cur != max_transaction_tail_) {
983  next_elem = cur->get_next_send_sample();
984  } else {
985  next_elem = max_transaction_tail_;
986  }
987  DataLinkSet_rch pub_links =
988  (cur->get_num_subs() > 0)
989  ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs()))
990  : DataLinkSet_rch(&links_, inc_count());
991 
992  if (pub_links.is_nil() || pub_links->empty()) {
993  // NOTE: This is the "local publisher id is not currently
994  // associated with any remote subscriber ids" case.
995 
996  if (DCPS_debug_level > 4) {
997  LogGuid logger(cur->get_pub_id());
998  ACE_DEBUG((LM_DEBUG,
999  ACE_TEXT("(%P|%t) TransportClient::send_i: ")
1000  ACE_TEXT("no links for publication %C, ")
1001  ACE_TEXT("not sending element %@ for transaction: %d.\n"),
1002  logger.c_str(),
1003  cur,
1004  cur->transaction_id()));
1005  }
1006 
1007  // We tell the send_listener_ that all of the remote subscriber ids
1008  // that wanted the data (all zero of them) have indeed received
1009  // the data.
1010  cur->get_send_listener()->data_delivered(cur);
1011 
1012  } else {
1013  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
1014  , cur), 5);
1015 
1016 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
1017 
1018  // Content-Filtering adjustment to the pub_links:
1019  // - If the sample should be filtered out of all subscriptions on a given
1020  // DataLink, then exclude that link from the subset that we'll send to.
1021  // - If the sample should be filtered out of some (or none) of the subs,
1022  // then record that information in the DataSampleElement so that the
1023  // header's content_filter_entries_ can be marshaled before it's sent.
1024  if (cur->filter_out_.ptr()) {
1025  DataLinkSet_rch subset;
1026  DataLinkSet::GuardType guard(pub_links->lock());
1027  typedef DataLinkSet::MapType MapType;
1028  MapType& map = pub_links->map();
1029 
1030  for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
1031  size_t n_subs;
1032  GUIDSeq_var ti =
1033  itr->second->target_intersection(cur->get_pub_id(),
1034  cur->filter_out_.in(), n_subs);
1035 
1036  if (ti.ptr() == 0 || ti->length() != n_subs) {
1037  if (!subset.in()) {
1038  subset = make_rch<DataLinkSet>();
1039  }
1040 
1041  subset->insert_link(itr->second);
1042  cur->filter_per_link_[itr->first] = ti._retn();
1043 
1044  } else {
1045  VDBG((LM_DEBUG,
1046  "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
1047  itr->second.in()));
1048  }
1049  }
1050 
1051  if (!subset.in()) {
1052  guard.release();
1053  VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
1054  // similar to the "if (pub_links.is_nil())" case above, no links
1055  cur->get_send_listener()->data_delivered(cur);
1056  if (cur != max_transaction_tail_) {
1057  // Move on to the next DataSampleElement to send.
1058  cur = next_elem;
1059  continue;
1060  } else {
1061  break;
1062  }
1063  }
1064 
1065  pub_links = subset;
1066  }
1067 
1068 #endif
1069 
1070  // This will do several things, including adding to the membership
1071  // of the send_links set. Any DataLinks added to the send_links
1072  // set will be also told about the send_start() event. Those
1073  // DataLinks (in the pub_links set) that are already in the
1074  // send_links set will not be told about the send_start() event
1075  // since they heard about it when they were inserted into the
1076  // send_links set.
1077  send_links.send_start(pub_links.in());
1078  if (cur->get_header().message_id_ != SAMPLE_DATA) {
1079  pub_links->send_control(cur);
1080  } else {
1081  pub_links->send(cur);
1082  }
1083  }
1084  if (cur != max_transaction_tail_) {
1085  // Move on to the next DataSampleElement to send.
1086  cur = next_elem;
1087  } else {
1088  break;
1089  }
1090  }
1091 
1092  // This will inform each DataLink in the set about the stop_send() event.
1093  // It will then clear the send_links_ set.
1094  //
1095  // The reason that the send_links_ set is cleared is because we continually
1096  // reuse the same send_links_ object over and over for each call to this
1097  // send method.
1098  GUID_t pub_id = repo_id();
1099  send_links.send_stop(pub_id);
1100  if (transaction_id != 0) {
1102  }
1104  }
1105 }
#define ACE_DEBUG(X)
DataSampleElement * get_next_send_sample() const
DataSampleElement * max_transaction_tail_
RcHandle< DataLinkSet > DataLinkSet_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLinkSet.h:27
#define VDBG(DBG_ARGS)
DataLinkSet_rch select_links(const GUID_t *remoteIds, const CORBA::ULong num_targets)
Definition: DataLinkSet.cpp:64
ACE_Guard< LockType > GuardType
Definition: DataLinkSet.h:78
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ send_response()

bool OpenDDS::DCPS::TransportClient::send_response ( const GUID_t peer,
const DataSampleHeader header,
Message_Block_Ptr  payload 
)

Definition at line 901 of file TransportClient.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataLinkSet::insert_link(), LM_DEBUG, OpenDDS::DCPS::move(), and OpenDDS::DCPS::DataLinkSet::send_response().

904 {
905  DataLinkIndex::iterator found = data_link_index_.find(peer);
906 
907  if (found == data_link_index_.end()) {
908  if (DCPS_debug_level > 4) {
909  LogGuid logger(peer);
910  ACE_DEBUG((LM_DEBUG,
911  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
912  ACE_TEXT("no link for publication %C, ")
913  ACE_TEXT("not sending response.\n"),
914  logger.c_str()));
915  }
916 
917  return false;
918  }
919 
920  DataLinkSet singular;
921  singular.insert_link(found->second);
922  singular.send_response(peer, header, move(payload));
923  return true;
924 }
#define ACE_DEBUG(X)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ send_w_control()

SendControlStatus OpenDDS::DCPS::TransportClient::send_w_control ( SendStateDataSampleList  send_list,
const DataSampleHeader header,
Message_Block_Ptr  msg,
const GUID_t destination 
)

Definition at line 937 of file TransportClient.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::move(), OpenDDS::DCPS::SEND_CONTROL_ERROR, send_control_to(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), and OpenDDS::DCPS::DataWriterImpl::replay_durable_data_for().

941 {
942  ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
944  if (send_list.head()) {
945  send_i(send_list, 0);
946  }
947  return send_control_to(header, move(msg), destination);
948 }
SendControlStatus send_control_to(const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
ACE_Thread_Mutex send_transaction_lock_

◆ stop_associating() [1/2]

void OpenDDS::DCPS::TransportClient::stop_associating ( )

Definition at line 635 of file TransportClient.cpp.

References ACE_GUARD, lock_, pending_, pending_assoc_timer_, prev_pending_, and OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting().

Referenced by OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), OpenDDS::DCPS::ReplayerImpl::remove_all_associations(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), OpenDDS::DCPS::DataReaderImpl::remove_all_associations(), OpenDDS::DCPS::RecorderImpl::remove_associations(), OpenDDS::DCPS::ReplayerImpl::remove_associations(), OpenDDS::DCPS::DataWriterImpl::remove_associations(), OpenDDS::DCPS::DataReaderImpl::remove_associations(), and ~TransportClient().

636 {
638  for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
639  {
640  // The transport impl may hold resources for a pending connection.
641  ACE_Guard<ACE_Thread_Mutex> guard(it->second->mutex_);
642  for (size_t i = 0; i < it->second->impls_.size(); ++i) {
643  TransportImpl_rch impl = it->second->impls_[i].lock();
644  if (impl) {
645  impl->stop_accepting_or_connecting(*this, it->second->data_.remote_id_, true, true);
646  }
647  }
648  }
649  it->second->reset_client();
650  pending_assoc_timer_->cancel_timer(it->second);
651  prev_pending_.insert(std::make_pair(it->first, it->second));
652  }
653  pending_.clear();
654 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
RcHandle< PendingAssocTimer > pending_assoc_timer_

◆ stop_associating() [2/2]

void OpenDDS::DCPS::TransportClient::stop_associating ( const GUID_t * repos,
CORBA::ULong  length 
)

Definition at line 657 of file TransportClient.cpp.

References ACE_GUARD, lock_, pending_, pending_assoc_timer_, prev_pending_, and OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting().

658 {
660 
661  if (repos == 0 || length == 0) {
662  return;
663  } else {
664  for (CORBA::ULong i = 0; i < length; ++i) {
665  PendingMap::iterator iter = pending_.find(repos[i]);
666  if (iter != pending_.end()) {
667  {
668  // The transport impl may hold resources for a pending connection.
669  ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
670  for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
671  TransportImpl_rch impl = iter->second->impls_[i].lock();
672  if (impl) {
673  impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
674  }
675  }
676  }
677  iter->second->reset_client();
678  pending_assoc_timer_->cancel_timer(iter->second);
679  prev_pending_.insert(std::make_pair(iter->first, iter->second));
680  pending_.erase(iter);
681  }
682  }
683  }
684 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_CDR::ULong ULong
RcHandle< PendingAssocTimer > pending_assoc_timer_

◆ swap_bytes()

bool OpenDDS::DCPS::TransportClient::swap_bytes ( ) const
inline

◆ terminate_send_if_suspended()

void OpenDDS::DCPS::TransportClient::terminate_send_if_suspended ( )

◆ transport_assoc_done()

virtual void OpenDDS::DCPS::TransportClient::transport_assoc_done ( int  ,
const GUID_t  
)
inlineprivatevirtual

Reimplemented in OpenDDS::RTPS::Sedp::Writer, OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::DataWriterImpl.

Definition at line 161 of file TransportClient.h.

Referenced by use_datalink_i().

161 {}

◆ transport_stop()

void OpenDDS::DCPS::TransportClient::transport_stop ( )

Definition at line 776 of file TransportClient.cpp.

References ACE_GUARD, OpenDDS::DCPS::TransportImpl::client_stop(), OpenDDS::DCPS::GUID_UNKNOWN, impls_, lock_, repo_id(), and repo_id_.

Referenced by OpenDDS::DCPS::RecorderImpl::remove_all_associations(), OpenDDS::DCPS::ReplayerImpl::remove_all_associations(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), and OpenDDS::DCPS::DataReaderImpl::remove_all_associations().

777 {
779  const ImplsType impls = impls_;
780  const GUID_t repo_id = repo_id_;
781  guard.release();
782 
783  if (repo_id == GUID_UNKNOWN) {
784  // Not associated so nothing to stop.
785  return;
786  }
787 
788  for (size_t i = 0; i < impls.size(); ++i) {
789  const TransportImpl_rch impl = impls[i].lock();
790  if (impl) {
791  impl->client_stop(repo_id);
792  }
793  }
794 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual void client_stop(const GUID_t &)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.

◆ unregister_for_reader()

void OpenDDS::DCPS::TransportClient::unregister_for_reader ( const GUID_t & participant,
const GUID_t & writerid,
const GUID_t & readerid 
)

Definition at line 815 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::unregister_for_reader().

Referenced by OpenDDS::DCPS::ReplayerImpl::unregister_for_reader(), and OpenDDS::DCPS::DataWriterImpl::unregister_for_reader().

818 {
820  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
821  pos != limit;
822  ++pos) {
823  TransportImpl_rch impl = pos->lock();
824  if (impl) {
825  impl->unregister_for_reader(participant, writerid, readerid);
826  }
827  }
828 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void unregister_for_reader(const GUID_t &, const GUID_t &, const GUID_t &)
Definition: TransportImpl.h:93

◆ unregister_for_writer()

void OpenDDS::DCPS::TransportClient::unregister_for_writer ( const GUID_t & participant,
const GUID_t & readerid,
const GUID_t & writerid 
)

Definition at line 849 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::unregister_for_writer().

Referenced by OpenDDS::DCPS::RecorderImpl::unregister_for_writer(), and OpenDDS::DCPS::DataReaderImpl::unregister_for_writer().

852 {
854  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
855  pos != limit;
856  ++pos) {
857  TransportImpl_rch impl = pos->lock();
858  if (impl) {
859  impl->unregister_for_writer(participant, readerid, writerid);
860  }
861  }
862 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)

◆ update_locators()

void OpenDDS::DCPS::TransportClient::update_locators ( const GUID_t & remote,
const TransportLocatorSeq & locators 
)

Definition at line 865 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::update_locators().

Referenced by OpenDDS::DCPS::DataWriterImpl::update_locators(), and OpenDDS::DCPS::DataReaderImpl::update_locators().

867 {
869  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
870  pos != limit;
871  ++pos) {
872  TransportImpl_rch impl = pos->lock();
873  if (impl) {
874  impl->update_locators(remote, locators);
875  }
876  }
877 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)

◆ use_datalink()

void OpenDDS::DCPS::TransportClient::use_datalink ( const GUID_t & remote_id,
const DataLink_rch & link 
)

Definition at line 528 of file TransportClient.cpp.

References ACE_GUARD, lock_, and use_datalink_i().

530 {
532 
533  use_datalink_i(remote_id, link, guard);
534 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
void use_datalink_i(const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)

◆ use_datalink_i()

void OpenDDS::DCPS::TransportClient::use_datalink_i ( const GUID_t & remote_id,
const DataLink_rch & link,
Guard & guard 
)
private

Definition at line 537 of file TransportClient.cpp.

References ACE_GUARD, OpenDDS::DCPS::TransportClient::PendingAssoc::active_, add_link(), ASSOC_ACTIVE, ASSOC_OK, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::TransportClient::PendingAssoc::mutex_, pending_, pending_assoc_timer_, prev_pending_, ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::TransportClient::PendingAssoc::reset_client(), OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting(), transport_assoc_done(), and VDBG_LVL.

Referenced by associate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), and use_datalink().

540 {
541  // Try to make a local copy of remote_id to use in calls
542  // because the reference could be invalidated if the caller
543  // reference location is deleted (i.e. in stop_accepting_or_connecting
544  // if use_datalink_i was called from passive_connection)
545  // Does changing this from a reference to a local affect anything going forward?
546  GUID_t remote_id(remote_id_ref);
547 
548  LogGuid peerId_log(remote_id);
549  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
550  "TransportClient(%@) using datalink[%@] from %C\n",
551  this,
552  link.in(),
553  peerId_log.c_str()), 0);
554 
555  PendingMap::iterator iter = pending_.find(remote_id);
556 
557  if (iter == pending_.end()) {
558  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
559  "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
560  this,
561  link.in(),
562  peerId_log.c_str()), 0);
563  return;
564  }
565 
566  PendingAssoc_rch pend = iter->second;
567  ACE_GUARD(ACE_Thread_Mutex, pend_guard, pend->mutex_);
568  const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
569  bool ok = false;
570 
571  if (link.is_nil()) {
572 
573  if (pend->active_ && pend->initiate_connect(this, guard)) {
574  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
575  "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect to remote %C\n",
576  this,
577  link.in(),
578  peerId_log.c_str()), 0);
579  return;
580  }
581 
582  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
583  "TransportClient(%@) using datalink[%@] link is nil, since this is passive side, connection to remote %C timed out\n",
584  this,
585  link.in(),
586  peerId_log.c_str()), 0);
587  } else { // link is ready to use
588  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
589  "TransportClient(%@) about to add_link[%@] to remote: %C\n",
590  this,
591  link.in(),
592  peerId_log.c_str()), 0);
593 
594  add_link(link, remote_id);
595  ok = true;
596  }
597 
598  // either link is valid or assoc failed, clean up pending object
599  for (size_t i = 0; i < pend->impls_.size(); ++i) {
600  TransportImpl_rch impl = pend->impls_[i].lock();
601  if (impl) {
602  impl->stop_accepting_or_connecting(*this, pend->data_.remote_id_, false, !ok);
603  }
604  }
605 
606  pend_guard.release();
607  pend->reset_client();
608  pending_assoc_timer_->cancel_timer(pend);
609  prev_pending_.insert(std::make_pair(iter->first, iter->second));
610  pending_.erase(iter);
611 
612  // Release TransportClient's lock as we're done updating its data.
613  guard.release();
614 
615  transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
616 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
RcHandle< PendingAssocTimer > pending_assoc_timer_
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
virtual void transport_assoc_done(int, const GUID_t &)
RcHandle< PendingAssoc > PendingAssoc_rch
#define VDBG_LVL(DBG_ARGS, LEVEL)

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 194 of file TransportClient.h.

Member Data Documentation

◆ cdr_encapsulation_

bool OpenDDS::DCPS::TransportClient::cdr_encapsulation_
private

Definition at line 340 of file TransportClient.h.

Referenced by enable_transport_using_config().

◆ config_

TransportConfig_rch OpenDDS::DCPS::TransportClient::config_
private

Definition at line 316 of file TransportClient.h.

Referenced by enable_transport_using_config(), and populate_connection_info().

◆ conn_info_

TransportLocatorSeq OpenDDS::DCPS::TransportClient::conn_info_
private

Definition at line 344 of file TransportClient.h.

Referenced by populate_connection_info().

◆ data_link_index_

DataLinkIndex OpenDDS::DCPS::TransportClient::data_link_index_
private

◆ durable_

bool OpenDDS::DCPS::TransportClient::durable_
private

◆ expected_transaction_id_

ACE_UINT64 OpenDDS::DCPS::TransportClient::expected_transaction_id_
private

Definition at line 328 of file TransportClient.h.

Referenced by send_i().

◆ impls_

ImplsType OpenDDS::DCPS::TransportClient::impls_
private

◆ links_

DataLinkSet OpenDDS::DCPS::TransportClient::links_
private

◆ lock_

ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::lock_
mutable private

◆ max_transaction_id_seen_

ACE_UINT64 OpenDDS::DCPS::TransportClient::max_transaction_id_seen_
private

Definition at line 329 of file TransportClient.h.

Referenced by send_i().

◆ max_transaction_tail_

DataSampleElement* OpenDDS::DCPS::TransportClient::max_transaction_tail_
private

Definition at line 336 of file TransportClient.h.

Referenced by send_i().

◆ passive_connect_duration_

TimeDuration OpenDDS::DCPS::TransportClient::passive_connect_duration_
private

Definition at line 342 of file TransportClient.h.

Referenced by enable_transport_using_config().

◆ pending_

PendingMap OpenDDS::DCPS::TransportClient::pending_
private

◆ pending_assoc_timer_

RcHandle<PendingAssocTimer> OpenDDS::DCPS::TransportClient::pending_assoc_timer_
private

Definition at line 312 of file TransportClient.h.

Referenced by associate(), disassociate(), stop_associating(), and use_datalink_i().

◆ prev_pending_

PrevPendingMap OpenDDS::DCPS::TransportClient::prev_pending_
private

◆ reliable_

bool OpenDDS::DCPS::TransportClient::reliable_
private

Definition at line 340 of file TransportClient.h.

Referenced by add_link(), associate(), and enable_transport_using_config().

◆ repo_id_

GUID_t OpenDDS::DCPS::TransportClient::repo_id_
private

◆ reverse_lock_

Reverse_Lock_t OpenDDS::DCPS::TransportClient::reverse_lock_
private

Definition at line 349 of file TransportClient.h.

Referenced by associate(), and initiate_connect_i().

◆ send_transaction_lock_

ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::send_transaction_lock_
private

Definition at line 327 of file TransportClient.h.

Referenced by send(), and send_w_control().

◆ swap_bytes_

bool OpenDDS::DCPS::TransportClient::swap_bytes_
private

Definition at line 340 of file TransportClient.h.

Referenced by enable_transport_using_config().


The documentation for this class was generated from the following files: