OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Types | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::TransportClient Class Referenceabstract

Mix-in class for DDS entities which directly use the transport layer. More...

#include <TransportClient.h>

Inheritance diagram for OpenDDS::DCPS::TransportClient:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportClient:
Collaboration graph
[legend]

Classes

struct  PendingAssoc
 
class  PendingAssocTimer
 

Public Types

enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 

Public Member Functions

void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeqconnection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
virtual GUID_t get_guid () const =0
 
virtual RcHandle< BitSubscriberget_builtin_subscriber_proxy () const
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Protected Member Functions

void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Types

typedef ACE_Guard< ACE_Thread_MutexGuard
 
typedef ACE_Reverse_Lock< ACE_Thread_MutexReverse_Lock_t
 
typedef RcHandle< PendingAssocPendingAssoc_rch
 

Private Member Functions

virtual bool check_transport_qos (const TransportInst &inst)=0
 
virtual DDS::DomainId_t domain_id () const =0
 
virtual Priority get_priority_value (const AssociationData &data) const =0
 
virtual void transport_assoc_done (int, const GUID_t &)
 
virtual SequenceNumber get_max_sn () const
 
virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle () const
 
void use_datalink_i (const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)
 
TransportSendListener_rch get_send_listener ()
 
TransportReceiveListener_rch get_receive_listener ()
 
bool initiate_connect_i (TransportImpl::AcceptConnectResult &result, TransportImpl_rch impl, const TransportImpl::RemoteTransport &remote, const TransportImpl::ConnectionAttribs &attribs_, Guard &guard)
 
void send_i (SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
 
typedef OPENDDS_MAP_CMP (GUID_t, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex
 
typedef OPENDDS_VECTOR (WeakRcHandle< TransportImpl >) ImplsType
 
typedef OPENDDS_MAP_CMP (GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap
 
typedef OPENDDS_MULTIMAP_CMP (GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PrevPendingMap
 
void clean_prev_pending ()
 

Private Attributes

RcHandle< PendingAssocTimerpending_assoc_timer_
 
TransportConfig_rch config_
 
ImplsType impls_
 
PendingMap pending_
 
PrevPendingMap prev_pending_
 
DataLinkSet links_
 
DataLinkIndex data_link_index_
 
ACE_Thread_Mutex send_transaction_lock_
 
ACE_UINT64 expected_transaction_id_
 
ACE_UINT64 max_transaction_id_seen_
 
DataSampleElementmax_transaction_tail_
 
bool swap_bytes_
 
bool cdr_encapsulation_
 
bool reliable_
 
bool durable_
 
TimeDuration passive_connect_duration_
 
TransportLocatorSeq conn_info_
 
ACE_Thread_Mutex lock_
 Seems to protect accesses to impls_, pending_, links_, data_link_index_. More...
 
Reverse_Lock_t reverse_lock_
 
GUID_t repo_id_
 

Friends

class ::DDS_TEST
 

Detailed Description

Mix-in class for DDS entities which directly use the transport layer.

DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient class manages the TransportImpl objects that represent the available communication mechanisms and the DataLink objects that represent the currently active communication channels to peers.

Definition at line 47 of file TransportClient.h.

Member Typedef Documentation

◆ Guard

Definition at line 174 of file TransportClient.h.

◆ PendingAssoc_rch

Definition at line 222 of file TransportClient.h.

◆ Reverse_Lock_t

Definition at line 199 of file TransportClient.h.

Member Enumeration Documentation

◆ anonymous enum

anonymous enum
Enumerator
ASSOC_OK 
ASSOC_ACTIVE 

Definition at line 55 of file TransportClient.h.

Constructor & Destructor Documentation

◆ TransportClient()

OpenDDS::DCPS::TransportClient::TransportClient ( )

Definition at line 34 of file TransportClient.cpp.

35  : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
39  , swap_bytes_(false)
40  , cdr_encapsulation_(false)
41  , reliable_(false)
42  , durable_(false)
45 {
46 }
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RcHandle< PendingAssocTimer > pending_assoc_timer_
DataSampleElement * max_transaction_tail_
#define TheServiceParticipant

◆ ~TransportClient()

OpenDDS::DCPS::TransportClient::~TransportClient ( )
virtual

Definition at line 48 of file TransportClient.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), impls_, LM_DEBUG, lock_, prev_pending_, repo_id_, OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting(), stop_associating(), and OpenDDS::DCPS::Transport_debug_level.

49 {
50  if (Transport_debug_level > 5) {
51  LogGuid logger(repo_id_);
52  ACE_DEBUG((LM_DEBUG,
53  ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
54  logger.c_str()));
55  }
56 
58 
60 
61  for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end(); ++it) {
62  for (size_t i = 0; i < impls_.size(); ++i) {
63  TransportImpl_rch impl = impls_[i].lock();
64  if (impl) {
65  impl->stop_accepting_or_connecting(it->second->client_, it->second->data_.remote_id_, false, false);
66  }
67  }
68  }
69 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
ACE_TEXT("TCP_Factory")

Member Function Documentation

◆ add_link()

void OpenDDS::DCPS::TransportClient::add_link ( const DataLink_rch link,
const GUID_t peer 
)
virtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl.

Definition at line 617 of file TransportClient.cpp.

References data_link_index_, get_receive_listener(), get_send_listener(), OpenDDS::DCPS::DataLinkSet::insert_link(), links_, OpenDDS::DCPS::DataLink::make_reservation(), reliable_, and repo_id_.

Referenced by OpenDDS::DCPS::DataReaderImpl::add_link(), and use_datalink_i().

618 {
619  links_.insert_link(link);
620  data_link_index_[peer] = link;
621 
623 
624  if (trl) {
625  link->make_reservation(peer, repo_id_, trl, reliable_);
626 
627  } else {
628  link->make_reservation(peer, repo_id_, get_send_listener(), reliable_);
629  }
630 }
TransportSendListener_rch get_send_listener()
int insert_link(const DataLink_rch &link)
Definition: DataLinkSet.cpp:42
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
TransportReceiveListener_rch get_receive_listener()

◆ associate()

bool OpenDDS::DCPS::TransportClient::associate ( const AssociationData peer,
bool  active 
)

Definition at line 199 of file TransportClient.cpp.

References OpenDDS::DCPS::TransportImpl::accept_datalink(), ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::TransportClient::PendingAssoc::active_, OpenDDS::DCPS::TransportClient::PendingAssoc::attribs_, OpenDDS::DCPS::back_inserter(), OpenDDS::DCPS::TransportClient::PendingAssoc::blob_index_, OpenDDS::DCPS::LogGuid::c_str(), clean_prev_pending(), OpenDDS::DCPS::TransportLocator::data, OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::AssociationData::discovery_locator_, durable_, get_guid(), get_max_sn(), get_priority_value(), OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, impls_, OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::is_shut_down(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, lock_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::max_sn_, OpenDDS::DCPS::TransportClient::PendingAssoc::mutex_, OPENDDS_STRING, OpenDDS::DCPS::AssociationData::participant_discovered_at_, pending_, pending_assoc_timer_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, OpenDDS::DCPS::rchandle_from(), reliable_, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::AssociationData::remote_transport_context_, repo_id(), repo_id_, reverse_lock_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, OpenDDS::DCPS::TransportImpl::transport_type(), use_datalink_i(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::RecorderImpl::add_association(), OpenDDS::DCPS::ReplayerImpl::add_association(), OpenDDS::DCPS::DataWriterImpl::add_association(), OpenDDS::DCPS::DataReaderImpl::add_association(), OpenDDS::RTPS::Sedp::Writer::assoc(), and OpenDDS::RTPS::Sedp::Reader::assoc().

200 {
201  GUID_t repo_id = get_guid();
202 
203  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
204 
205  repo_id_ = repo_id;
206 
207  if (impls_.empty()) {
208  if (DCPS_debug_level) {
209  LogGuid writer_log(repo_id_);
210  LogGuid reader_log(data.remote_id_);
211  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
212  ACE_TEXT("local %C remote %C no available impls\n"),
213  writer_log.c_str(),
214  reader_log.c_str()));
215  }
216  return false;
217  }
218 
219  bool all_impls_shut_down = true;
220  for (size_t i = 0; i < impls_.size(); ++i) {
221  TransportImpl_rch impl = impls_[i].lock();
222  if (impl && !impl->is_shut_down()) {
223  all_impls_shut_down = false;
224  break;
225  }
226  }
227 
228  if (all_impls_shut_down) {
229  if (DCPS_debug_level) {
230  LogGuid writer_log(repo_id_);
231  LogGuid reader_log(data.remote_id_);
232  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
233  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
234  writer_log.c_str(),
235  reader_log.c_str()));
236  }
237  return false;
238  }
239 
241 
242  PendingMap::iterator iter = pending_.find(data.remote_id_);
243 
244  if (iter == pending_.end()) {
245  GUID_t remote_copy(data.remote_id_);
246  PendingAssoc_rch pa = make_rch<PendingAssoc>(rchandle_from(this));
247  pa->active_ = active;
248  pa->impls_.clear();
249  pa->blob_index_ = 0;
250  pa->data_ = data;
251  pa->attribs_.local_id_ = repo_id_;
252  pa->attribs_.priority_ = get_priority_value(data);
253  pa->attribs_.local_reliable_ = reliable_;
254  pa->attribs_.local_durable_ = durable_;
255  pa->attribs_.max_sn_ = get_max_sn();
256  iter = pending_.insert(std::make_pair(remote_copy, pa)).first;
257 
258  LogGuid tc_assoc_log(repo_id_);
259  LogGuid remote_log(data.remote_id_);
260  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
261  "between %C and remote %C\n",
262  tc_assoc_log.c_str(),
263  remote_log.c_str()), 0);
264  } else {
265 
266  ACE_ERROR((LM_ERROR,
267  ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
268  ACE_TEXT("already associating with remote.\n")));
269 
270  return false;
271 
272  }
273 
274  PendingAssoc_rch pend = iter->second;
275 
276  if (active) {
277  ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
278  pend->impls_.reserve(impls_.size());
279  std::reverse_copy(impls_.begin(), impls_.end(),
280  std::back_inserter(pend->impls_));
281 
282  return pend->initiate_connect(this, guard);
283 
284  } else { // passive
285 
286  // call accept_datalink for each impl / blob pair of the same type
287  for (size_t i = 0; i < impls_.size(); ++i) {
288  // Release the PendingAssoc object's mutex_ since the nested for-loop does not access
289  // the PendingAssoc object directly and the functions called by the nested loop can
290  // lead to a PendingAssoc object's mutex_ being acquired, which will cause deadlock if
291  // it is not released here.
292  TransportImpl::ConnectionAttribs attribs;
293  TransportImpl_rch impl = impls_[i].lock();
294  {
295  ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
296  pend->impls_.push_back(impl);
297  attribs = pend->attribs_;
298  }
299  const OPENDDS_STRING type = impl->transport_type();
300 
301  for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
302  if (data.remote_data_[j].transport_type.in() == type) {
303  const TransportImpl::RemoteTransport remote = {
304  data.remote_id_, data.remote_data_[j].data, data.discovery_locator_.data, data.participant_discovered_at_, data.remote_transport_context_,
305  data.publication_transport_priority_,
306  data.remote_reliable_, data.remote_durable_};
307 
308  TransportImpl::AcceptConnectResult res;
309  {
310  // This thread acquired lock_ at the beginning of this method.
311  // Calling accept_datalink might require getting the lock for the transport's reactor.
312  // If the current thread is not an event handler for the transport's reactor, e.g.,
313  // the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
314  // Event handlers in the transport reactor may call passive_connection which calls use_datalink
315  // which acquires lock_. The locking order in this case is transport reactor lock -> lock_.
316  // To avoid deadlock, we must reverse the lock.
317  RcHandle<TransportClient> client = rchandle_from(this);
318  ACE_GUARD_RETURN(Reverse_Lock_t, rev_tc_guard, reverse_lock_, false);
319  res = impl->accept_datalink(remote, attribs, client);
320  }
321 
322  //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
323  iter = pending_.find(data.remote_id_);
324 
325  if (iter == pending_.end()) {
326  //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
327  //active side connection and completed, thus pend was removed from pending_. Can return true.
328  return true;
329  }
330  pend = iter->second;
331 
332  if (res.success_) {
333  if (res.link_.is_nil()) {
334  // In this case, it may be waiting for the TCP connection to be
335  // established. Just wait without trying other transports.
336  pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
337  } else {
338  use_datalink_i(data.remote_id_, res.link_, guard);
339  return true;
340  }
341  }
342  }
343  }
344  }
345 
346  pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
347  }
348 
349  return true;
350 }
SequenceBackInsertIterator< Sequence > back_inserter(Sequence &seq)
#define ACE_DEBUG(X)
ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
#define ACE_ERROR(X)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< PendingAssocTimer > pending_assoc_timer_
#define OPENDDS_STRING
virtual Priority get_priority_value(const AssociationData &data) const =0
ACE_CDR::ULong ULong
void use_datalink_i(const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual SequenceNumber get_max_sn() const
virtual GUID_t get_guid() const =0
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define VDBG_LVL(DBG_ARGS, LEVEL)
RcHandle< PendingAssoc > PendingAssoc_rch

◆ associated_with()

bool OpenDDS::DCPS::TransportClient::associated_with ( const GUID_t remote) const

Definition at line 1154 of file TransportClient.cpp.

References ACE_ERROR, data_link_index_, LM_ERROR, lock_, and ACE_Guard< ACE_LOCK >::locked().

1155 {
1157  if (!guard.locked()) {
1158  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::associated_with: "
1159  "lock failed\n"));
1160  return false;
1161  }
1162  return data_link_index_.count(remote);
1163 }
#define ACE_ERROR(X)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ cdr_encapsulation() [1/2]

bool OpenDDS::DCPS::TransportClient::cdr_encapsulation ( ) const
inline

◆ cdr_encapsulation() [2/2]

void OpenDDS::DCPS::TransportClient::cdr_encapsulation ( bool  encap)
inlineprotected

Definition at line 150 of file TransportClient.h.

151  {
152  cdr_encapsulation_ = encap;
153  }

◆ check_transport_qos()

virtual bool OpenDDS::DCPS::TransportClient::check_transport_qos ( const TransportInst inst)
privatepure virtual

◆ clean_prev_pending()

void OpenDDS::DCPS::TransportClient::clean_prev_pending ( )
private

Definition at line 72 of file TransportClient.cpp.

References prev_pending_.

Referenced by associate().

73 {
74  for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end();) {
75  if (it->second->safe_to_remove()) {
76  prev_pending_.erase(it++);
77  } else {
78  ++it;
79  }
80  }
81 }

◆ connection_info()

const TransportLocatorSeq& OpenDDS::DCPS::TransportClient::connection_info ( ) const
inline

◆ data_acked()

void OpenDDS::DCPS::TransportClient::data_acked ( const GUID_t remote)

Definition at line 1176 of file TransportClient.cpp.

References ACE_ERROR, get_send_listener(), LM_ERROR, lock_, and ACE_Guard< ACE_LOCK >::locked().

1177 {
1178  TransportSendListener_rch send_listener;
1179  {
1181  if (!guard.locked()) {
1182  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::data_acked: "
1183  "lock failed\n"));
1184  return;
1185  }
1186  send_listener = get_send_listener();
1187  }
1188  send_listener->data_acked(remote);
1189 }
#define ACE_ERROR(X)
RcHandle< TransportSendListener > TransportSendListener_rch
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
TransportSendListener_rch get_send_listener()

◆ disassociate()

void OpenDDS::DCPS::TransportClient::disassociate ( const GUID_t peerId)

Definition at line 691 of file TransportClient.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), links_, LM_DEBUG, lock_, pending_, pending_assoc_timer_, prev_pending_, OpenDDS::DCPS::DataLink::release_reservations(), OpenDDS::DCPS::DataLinkSet::remove_link(), OpenDDS::DCPS::DataLink::remove_listener(), repo_id_, OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting(), and VDBG_LVL.

Referenced by OpenDDS::RTPS::disassociate_helper_extended(), OpenDDS::RTPS::Sedp::Endpoint::get_priority_value(), OpenDDS::DCPS::ReplayerImpl::remove_associations(), OpenDDS::DCPS::DataWriterImpl::remove_associations(), OpenDDS::DCPS::RecorderImpl::remove_associations_i(), and OpenDDS::DCPS::DataReaderImpl::remove_associations_i().

692 {
693  LogGuid peerId_log(peerId);
694  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
695  "TransportClient(%@) disassociating from %C\n",
696  this,
697  peerId_log.c_str()), 5);
698 
700 
701  PendingMap::iterator iter = pending_.find(peerId);
702  if (iter != pending_.end()) {
703  {
704  // The transport impl may have resource for a pending connection.
705  ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
706  for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
707  TransportImpl_rch impl = iter->second->impls_[i].lock();
708  if (impl) {
709  impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
710  }
711  }
712  }
713  iter->second->reset_client();
714  pending_assoc_timer_->cancel_timer(iter->second);
715  prev_pending_.insert(std::make_pair(iter->first, iter->second));
716  pending_.erase(iter);
717  return;
718  }
719 
720  const DataLinkIndex::iterator found = data_link_index_.find(peerId);
721 
722  if (found == data_link_index_.end()) {
723  if (DCPS_debug_level > 4) {
724  const LogGuid log(peerId);
725  ACE_DEBUG((LM_DEBUG,
726  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
727  ACE_TEXT("no link for remote peer %C\n"),
728  log.c_str()));
729  }
730 
731  return;
732  }
733 
734  const DataLink_rch link = found->second;
735 
736  //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
737  //otherwise it could be removed in transport_detached()
738  data_link_index_.erase(found);
739  DataLinkSetMap released;
740 
741  if (DCPS_debug_level > 4) {
742  ACE_DEBUG((LM_DEBUG,
743  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
744  ACE_TEXT("about to release_reservations for link[%@]\n"),
745  link.in()));
746  }
747 
748  link->release_reservations(peerId, repo_id_, released);
749 
750  if (!released.empty()) {
751 
752  if (DCPS_debug_level > 4) {
753  ACE_DEBUG((LM_DEBUG,
754  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
755  ACE_TEXT("about to remove_link[%@] from links_\n"),
756  link.in()));
757  }
758  links_.remove_link(link);
759 
760  if (DCPS_debug_level > 4) {
761  LogGuid logger(repo_id_);
762  ACE_DEBUG((LM_DEBUG,
763  ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
764  logger.c_str(),
765  link.in()));
766  }
767  // Datalink is no longer used for any remote peer by this TransportClient
768  link->remove_listener(repo_id_);
769 
770  }
771 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
void remove_link(const DataLink_rch &link)
Definition: DataLinkSet.cpp:50
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< DataLink > DataLink_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLink_rch.h:34
RcHandle< PendingAssocTimer > pending_assoc_timer_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define VDBG_LVL(DBG_ARGS, LEVEL)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0

◆ domain_id()

virtual DDS::DomainId_t OpenDDS::DCPS::TransportClient::domain_id ( ) const
privatepure virtual

◆ enable_transport()

void OpenDDS::DCPS::TransportClient::enable_transport ( bool  reliable,
bool  durable 
)

Definition at line 84 of file TransportClient.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::TransportRegistry::DEFAULT_CONFIG_NAME, OpenDDS::DCPS::TransportRegistry::domain_default_config(), domain_id(), enable_transport_using_config(), OpenDDS::DCPS::TransportRegistry::fix_empty_default(), OpenDDS::DCPS::TransportRegistry::global_config(), OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::TransportConfig::instances_, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_ERROR, OpenDDS::DCPS::TransportConfig::name(), and OpenDDS::DCPS::rchandle_from().

Referenced by OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().

85 {
86  // Search for a TransportConfig to use:
88 
89  // 1. If this object is an Entity, check if a TransportConfig has been
90  // bound either directly to this entity or to a parent entity.
91  for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
92  ent && tc.is_nil(); ent = ent->parent()) {
93  tc = ent->transport_config();
94  }
95 
96  if (tc.is_nil()) {
97  TransportRegistry* const reg = TransportRegistry::instance();
98  // 2. Check for a TransportConfig that is the default for this Domain.
99  tc = reg->domain_default_config(domain_id());
100 
101  if (tc.is_nil()) {
102  // 3. Use the global_config if one has been set.
103  tc = reg->global_config();
104 
105  if (!tc.is_nil() && tc->instances_.empty()
106  && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
107  // 4. Set the "fallback option" if the global_config is empty.
108  // (only applies if the user hasn't changed the global config)
109  tc = reg->fix_empty_default();
110  }
111  }
112  }
113 
114  if (tc.is_nil()) {
115  ACE_ERROR((LM_ERROR,
116  ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
117  ACE_TEXT("No TransportConfig found.\n")));
118  throw Transport::NotConfigured();
119  }
120 
121  enable_transport_using_config(reliable, durable, tc);
122 }
#define ACE_ERROR(X)
void enable_transport_using_config(bool reliable, bool durable, const TransportConfig_rch &tc)
static const char DEFAULT_CONFIG_NAME[]
static TransportRegistry * instance()
Return a singleton instance of this class.
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
virtual DDS::DomainId_t domain_id() const =0
RcHandle< TransportConfig > TransportConfig_rch

◆ enable_transport_using_config()

void OpenDDS::DCPS::TransportClient::enable_transport_using_config ( bool  reliable,
bool  durable,
const TransportConfig_rch tc 
)

Definition at line 125 of file TransportClient.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), cdr_encapsulation_, check_transport_qos(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION, durable_, OpenDDS::DCPS::TimeDuration::from_msec(), get_crypto_handle(), OpenDDS::DCPS::TransportInst::get_or_create_impl(), impls_, OpenDDS::DCPS::TransportConfig::instances_, LM_ERROR, LM_WARNING, OpenDDS::DCPS::TransportImpl::local_crypto_handle(), OpenDDS::DCPS::TransportConfig::passive_connect_duration_, passive_connect_duration_, populate_connection_info(), reliable_, OpenDDS::DCPS::TransportInst::requires_cdr_encapsulation(), OpenDDS::DCPS::TransportConfig::swap_bytes_, and swap_bytes_.

Referenced by enable_transport(), OpenDDS::RTPS::Sedp::Endpoint::get_priority_value(), and OpenDDS::RTPS::Sedp::init().

127 {
128  config_ = tc;
129  swap_bytes_ = tc->swap_bytes_;
130  reliable_ = reliable;
131  durable_ = durable;
132  unsigned long duration = tc->passive_connect_duration_;
133  if (duration == 0) {
135  if (DCPS_debug_level) {
136  ACE_DEBUG((LM_WARNING,
137  ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
138  ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
139  ACE_TEXT("default value\n")));
140  }
141  }
143 
145 
146  const size_t n = tc->instances_.size();
147 
148  for (size_t i = 0; i < n; ++i) {
149  TransportInst_rch inst = tc->instances_[i];
150 
151  if (check_transport_qos(*inst)) {
152  TransportImpl_rch impl = inst->get_or_create_impl();
153 
154  if (impl) {
155  impls_.push_back(impl);
156 
157 #if defined(OPENDDS_SECURITY)
158  impl->local_crypto_handle(get_crypto_handle());
159 #endif
160 
161  cdr_encapsulation_ |= inst->requires_cdr_encapsulation();
162  }
163  }
164  }
165 
166  if (impls_.empty()) {
167  ACE_ERROR((LM_ERROR,
168  ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
169  ACE_TEXT("No TransportImpl could be created.\n")));
170  throw Transport::NotConfigured();
171  }
172 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual bool check_transport_qos(const TransportInst &inst)=0
virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
static TimeDuration from_msec(const ACE_UINT64 &ms)
ACE_TEXT("TCP_Factory")
static const unsigned long DEFAULT_PASSIVE_CONNECT_DURATION

◆ get_builtin_subscriber_proxy()

virtual RcHandle<BitSubscriber> OpenDDS::DCPS::TransportClient::get_builtin_subscriber_proxy ( ) const
inlinevirtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Definition at line 132 of file TransportClient.h.

132 { return RcHandle<BitSubscriber>(); }

◆ get_crypto_handle()

virtual DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::TransportClient::get_crypto_handle ( ) const
inlineprivatevirtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Definition at line 167 of file TransportClient.h.

References DDS::HANDLE_NIL.

Referenced by enable_transport_using_config().

168  {
169  return DDS::HANDLE_NIL;
170  }
const InstanceHandle_t HANDLE_NIL

◆ get_guid()

virtual GUID_t OpenDDS::DCPS::TransportClient::get_guid ( ) const
pure virtual

◆ get_ice_endpoint()

WeakRcHandle< ICE::Endpoint > OpenDDS::DCPS::TransportClient::get_ice_endpoint ( )

Definition at line 872 of file TransportClient.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::TransportImpl::get_ice_endpoint(), impls_, and lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::get_ice_endpoint(), OpenDDS::DCPS::DataReaderImpl::get_ice_endpoint(), and OpenDDS::RTPS::Sedp::DiscoveryWriter::write_dcps_participant_secure().

873 {
874  // The one-to-many relationship with impls implies that this should
875  // return a set of endpoints instead of a single endpoint or null.
876  // For now, we will assume a single impl.
877 
878  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, WeakRcHandle<ICE::Endpoint>());
879  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
880  pos != limit;
881  ++pos) {
882  TransportImpl_rch impl = pos->lock();
883  if (impl) {
884  WeakRcHandle<ICE::Endpoint> endpoint = impl->get_ice_endpoint();
885  if (endpoint) { return endpoint; }
886  }
887  }
888 
889  return WeakRcHandle<ICE::Endpoint>();
890 }
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ get_max_sn()

virtual SequenceNumber OpenDDS::DCPS::TransportClient::get_max_sn ( ) const
inlineprivatevirtual

◆ get_priority_value()

virtual Priority OpenDDS::DCPS::TransportClient::get_priority_value ( const AssociationData data) const
privatepure virtual

◆ get_receive_listener()

TransportReceiveListener_rch OpenDDS::DCPS::TransportClient::get_receive_listener ( )
private

Definition at line 1106 of file TransportClient.cpp.

References OpenDDS::DCPS::rchandle_from().

Referenced by add_link().

1107 {
1108  return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
1109 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ get_send_listener()

TransportSendListener_rch OpenDDS::DCPS::TransportClient::get_send_listener ( )
private

Definition at line 1100 of file TransportClient.cpp.

References OpenDDS::DCPS::rchandle_from().

Referenced by add_link(), data_acked(), send_control(), and send_control_to().

1101 {
1102  return rchandle_from(dynamic_cast<TransportSendListener*>(this));
1103 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ initiate_connect_i()

bool OpenDDS::DCPS::TransportClient::initiate_connect_i ( TransportImpl::AcceptConnectResult result,
TransportImpl_rch  impl,
const TransportImpl::RemoteTransport remote,
const TransportImpl::ConnectionAttribs attribs_,
Guard guard 
)
private

Definition at line 384 of file TransportClient.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::TransportImpl::connect_datalink(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, ACE_Guard< ACE_LOCK >::locked(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, repo_id_, reverse_lock_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect().

389 {
390  if (!guard.locked()) {
391  //don't own the lock_ so can't release it...shouldn't happen
392  LogGuid local_log(repo_id_);
393  LogGuid remote_log(remote.repo_id_);
394  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i ")
395  ACE_TEXT("between local %C and remote %C unsuccessful because ")
396  ACE_TEXT("guard was not locked\n"),
397  local_log.c_str(),
398  remote_log.c_str()), 0);
399  return false;
400  }
401 
402  {
403  //can't call connect while holding lock due to possible reactor deadlock
404  LogGuid local_log(repo_id_);
405  LogGuid remote_log(remote.repo_id_);
406  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
407  "attempt to connect_datalink between local %C and remote %C\n",
408  local_log.c_str(),
409  remote_log.c_str()), 0);
410  {
411  TransportImpl::ConnectionAttribs attribs = attribs_;
412  RcHandle<TransportClient> client = rchandle_from(this);
413  ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
414  result = impl->connect_datalink(remote, attribs, client);
415  }
416  if (!result.success_) {
417  if (DCPS_debug_level) {
418  LogGuid writer_log(repo_id_);
419  LogGuid reader_log(remote.repo_id_);
420  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i - ")
421  ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
422  writer_log.c_str(),
423  reader_log.c_str()));
424  }
425  return false;
426  }
427  }
428 
429  LogGuid local_log(repo_id_);
430  LogGuid remote_log(remote.repo_id_);
431  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
432  "connection between local %C and remote %C initiation successful\n",
433  local_log.c_str(),
434  remote_log.c_str()), 0);
435  return true;
436 }
#define ACE_DEBUG(X)
ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ is_leading()

bool OpenDDS::DCPS::TransportClient::is_leading ( const GUID_t reader_id) const

Definition at line 1191 of file TransportClient.cpp.

References get_guid(), OpenDDS::DCPS::DataLinkSet::is_leading(), links_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

1192 {
1193  return links_.is_leading(get_guid(), reader_id);
1194 }
bool is_leading(const GUID_t &writer_id, const GUID_t &reader_id) const
virtual GUID_t get_guid() const =0

◆ is_reliable()

bool OpenDDS::DCPS::TransportClient::is_reliable ( ) const
inline

Definition at line 70 of file TransportClient.h.

References header, and send().

◆ OPENDDS_MAP_CMP() [1/2]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( GUID_t  ,
DataLink_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [2/2]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( GUID_t  ,
PendingAssoc_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP_CMP()

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MULTIMAP_CMP ( GUID_t  ,
PendingAssoc_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR ( WeakRcHandle< TransportImpl )
private

◆ pending_association_with()

bool OpenDDS::DCPS::TransportClient::pending_association_with ( const GUID_t remote) const

Definition at line 1165 of file TransportClient.cpp.

References ACE_ERROR, LM_ERROR, lock_, ACE_Guard< ACE_LOCK >::locked(), and pending_.

1166 {
1168  if (!guard.locked()) {
1169  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::pending_association_with: "
1170  "lock failed\n"));
1171  return false;
1172  }
1173  return pending_.count(remote);
1174 }
#define ACE_ERROR(X)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ populate_connection_info()

void OpenDDS::DCPS::TransportClient::populate_connection_info ( )

Definition at line 175 of file TransportClient.cpp.

References ACE_ERROR, ACE_TEXT(), check_transport_qos(), config_, conn_info_, OpenDDS::DCPS::TransportImpl::connection_info(), OpenDDS::DCPS::CONNINFO_ALL, OpenDDS::DCPS::TransportInst::get_or_create_impl(), OpenDDS::DCPS::grow(), OpenDDS::DCPS::TransportConfig::instances_, and LM_ERROR.

Referenced by enable_transport_using_config(), and OpenDDS::DCPS::DataWriterImpl::transport_discovery_change().

176 {
177  conn_info_.length(0);
178 
179  const size_t n = config_->instances_.size();
180  for (size_t i = 0; i < n; ++i) {
182  if (check_transport_qos(*inst)) {
183  TransportImpl_rch impl = inst->get_or_create_impl();
184  if (impl) {
185  const CORBA::ULong idx = DCPS::grow(conn_info_) - 1;
186  impl->connection_info(conn_info_[idx], CONNINFO_ALL);
187  }
188  }
189  }
190 
191  if (conn_info_.length() == 0) {
192  ACE_ERROR((LM_ERROR,
193  ACE_TEXT("(%P|%t) TransportClient::populate_connection_info: ")
194  ACE_TEXT("No connection info\n")));
195  }
196 }
#define ACE_ERROR(X)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual bool check_transport_qos(const TransportInst &inst)=0
ACE_CDR::ULong ULong
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
ACE_TEXT("TCP_Factory")
static const ConnectionInfoFlags CONNINFO_ALL
TransportLocatorSeq conn_info_

◆ register_for_reader()

void OpenDDS::DCPS::TransportClient::register_for_reader ( const GUID_t participant,
const GUID_t writerid,
const GUID_t readerid,
const TransportLocatorSeq locators,
OpenDDS::DCPS::DiscoveryListener listener 
)

Definition at line 789 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::register_for_reader().

Referenced by OpenDDS::DCPS::ReplayerImpl::register_for_reader(), and OpenDDS::DCPS::DataWriterImpl::register_for_reader().

794 {
796  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
797  pos != limit;
798  ++pos) {
799  TransportImpl_rch impl = pos->lock();
800  if (impl) {
801  impl->register_for_reader(participant, writerid, readerid, locators, listener);
802  }
803  }
804 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
virtual void register_for_reader(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, OpenDDS::DCPS::DiscoveryListener *)
Definition: TransportImpl.h:87

◆ register_for_writer()

void OpenDDS::DCPS::TransportClient::register_for_writer ( const GUID_t participant,
const GUID_t readerid,
const GUID_t writerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
)

Definition at line 823 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::register_for_writer().

Referenced by OpenDDS::DCPS::RecorderImpl::register_for_writer(), and OpenDDS::DCPS::DataReaderImpl::register_for_writer().

828 {
830  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
831  pos != limit;
832  ++pos) {
833  TransportImpl_rch impl = pos->lock();
834  if (impl) {
835  impl->register_for_writer(participant, readerid, writerid, locators, listener);
836  }
837  }
838 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
Definition: TransportImpl.h:97
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ remove_all_msgs()

bool OpenDDS::DCPS::TransportClient::remove_all_msgs ( )

◆ remove_sample()

bool OpenDDS::DCPS::TransportClient::remove_sample ( const DataSampleElement sample)

Definition at line 1138 of file TransportClient.cpp.

References links_, and OpenDDS::DCPS::DataLinkSet::remove_sample().

Referenced by OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample().

1139 {
1140  return links_.remove_sample(sample);
1141 }
bool remove_sample(const DataSampleElement *sample)

◆ repo_id()

GUID_t OpenDDS::DCPS::TransportClient::repo_id ( ) const
inline

Definition at line 139 of file TransportClient.h.

References lock_.

Referenced by associate(), OpenDDS::DCPS::DataReaderImpl::data_received(), send_i(), transport_stop(), and OpenDDS::DCPS::ReplayerImpl::write().

140  {
142  return repo_id_;
143  }
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ send()

void OpenDDS::DCPS::TransportClient::send ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id = 0 
)

Definition at line 919 of file TransportClient.cpp.

References ACE_GUARD, OpenDDS::DCPS::SendStateDataSampleList::head(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::RTPS::Sedp::Writer::send_sample_i(), OpenDDS::DCPS::DataWriterImpl::send_suspended_data(), OpenDDS::DCPS::ReplayerImpl::write(), and OpenDDS::DCPS::DataWriterImpl::write().

920 {
921  if (send_list.head() == 0) {
922  return;
923  }
924  ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
925  send_i(send_list, transaction_id);
926 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex send_transaction_lock_
void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)

◆ send_control()

SendControlStatus OpenDDS::DCPS::TransportClient::send_control ( const DataSampleHeader header,
Message_Block_Ptr  msg 
)

Definition at line 1112 of file TransportClient.cpp.

References get_send_listener(), links_, OpenDDS::DCPS::move(), repo_id_, and OpenDDS::DCPS::DataLinkSet::send_control().

Referenced by OpenDDS::DCPS::DataWriterImpl::send_control(), and OpenDDS::RTPS::Sedp::Writer::write_control_msg().

1114 {
1116 }
void send_control(DataSampleElement *sample)
Send a control message that is wrapped in a DataSampleElement.
Definition: DataLinkSet.inl:81
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
TransportSendListener_rch get_send_listener()
T::rv_reference move(T &p)
Definition: unique_ptr.h:141

◆ send_control_to()

SendControlStatus OpenDDS::DCPS::TransportClient::send_control_to ( const DataSampleHeader header,
Message_Block_Ptr  msg,
const GUID_t destination 
)

Definition at line 1119 of file TransportClient.cpp.

References ACE_GUARD_RETURN, data_link_index_, get_send_listener(), OpenDDS::DCPS::DataLinkSet::insert_link(), lock_, OpenDDS::DCPS::move(), repo_id_, OpenDDS::DCPS::DataLinkSet::send_control(), and OpenDDS::DCPS::SEND_CONTROL_ERROR.

Referenced by send_w_control().

1122 {
1123  DataLinkSet singular;
1124  {
1126  DataLinkIndex::iterator found = data_link_index_.find(destination);
1127 
1128  if (found == data_link_index_.end()) {
1129  return SEND_CONTROL_ERROR;
1130  }
1131 
1132  singular.insert_link(found->second);
1133  }
1134  return singular.send_control(repo_id_, get_send_listener(), header, move(msg));
1135 }
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
TransportSendListener_rch get_send_listener()
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ send_final_acks()

void OpenDDS::DCPS::TransportClient::send_final_acks ( )

Definition at line 685 of file TransportClient.cpp.

References get_guid(), links_, and OpenDDS::DCPS::DataLinkSet::send_final_acks().

686 {
688 }
void send_final_acks(const GUID_t &readerid)
virtual GUID_t get_guid() const =0

◆ send_i()

void OpenDDS::DCPS::TransportClient::send_i ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id 
)
private

Definition at line 943 of file TransportClient.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::TransportSendListener::data_delivered(), OpenDDS::DCPS::DCPS_debug_level, expected_transaction_id_, OpenDDS::DCPS::DataSampleElement::filter_out_, OpenDDS::DCPS::DataSampleElement::filter_per_link_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_next_send_sample(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_ids(), OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, LM_DEBUG, max_transaction_id_seen_, max_transaction_tail_, OpenDDS::DCPS::DataSampleHeader::message_id_, repo_id(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataLinkSet::select_links(), OpenDDS::DCPS::DataLinkSet::send_start(), OpenDDS::DCPS::DataLinkSet::send_stop(), OpenDDS::DCPS::SendStateDataSampleList::tail(), OpenDDS::DCPS::DataSampleElement::transaction_id(), VDBG, and VDBG_LVL.

Referenced by send(), and send_w_control().

944 {
945  if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
946  if (transaction_id > max_transaction_id_seen_) {
947  max_transaction_id_seen_ = transaction_id;
948  max_transaction_tail_ = send_list.tail();
949  }
950  return;
951  } else /* transaction_id == expected_transaction_id */ {
952 
953  DataSampleElement* cur = send_list.head();
954  if (max_transaction_tail_ == 0) {
955  //Means no future transaction beat this transaction into send
956  if (transaction_id != 0)
958  // Only send this current transaction
959  max_transaction_tail_ = send_list.tail();
960  }
961  DataLinkSet send_links;
962 
963  while (cur != 0) {
964  // VERY IMPORTANT NOTE:
965  //
966  // We have to be very careful in how we deal with the current
967  // DataSampleElement. The issue is that once we have invoked
968  // data_delivered() on the send_listener_ object, or we have invoked
969  // send() on the pub_links, we can no longer access the current
970  // DataSampleElement!Thus, we need to get the next
971  // DataSampleElement (pointer) from the current element now,
972  // while it is safe.
973  DataSampleElement* next_elem;
974  if (cur != max_transaction_tail_) {
975  next_elem = cur->get_next_send_sample();
976  } else {
977  next_elem = max_transaction_tail_;
978  }
979  DataLinkSet_rch pub_links =
980  (cur->get_num_subs() > 0)
981  ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs()))
982  : DataLinkSet_rch(&links_, inc_count());
983 
984  if (pub_links.is_nil() || pub_links->empty()) {
985  // NOTE: This is the "local publisher id is not currently
986  // associated with any remote subscriber ids" case.
987 
988  if (DCPS_debug_level > 4) {
989  LogGuid logger(cur->get_pub_id());
990  ACE_DEBUG((LM_DEBUG,
991  ACE_TEXT("(%P|%t) TransportClient::send_i: ")
992  ACE_TEXT("no links for publication %C, ")
993  ACE_TEXT("not sending element %@ for transaction: %d.\n"),
994  logger.c_str(),
995  cur,
996  cur->transaction_id()));
997  }
998 
999  // We tell the send_listener_ that all of the remote subscriber ids
1000  // that wanted the data (all zero of them) have indeed received
1001  // the data.
1002  cur->get_send_listener()->data_delivered(cur);
1003 
1004  } else {
1005  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
1006  , cur), 5);
1007 
1008 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
1009 
1010  // Content-Filtering adjustment to the pub_links:
1011  // - If the sample should be filtered out of all subscriptions on a given
1012  // DataLink, then exclude that link from the subset that we'll send to.
1013  // - If the sample should be filtered out of some (or none) of the subs,
1014  // then record that information in the DataSampleElement so that the
1015  // header's content_filter_entries_ can be marshaled before it's sent.
1016  if (cur->filter_out_.ptr()) {
1017  DataLinkSet_rch subset;
1018  DataLinkSet::GuardType guard(pub_links->lock());
1019  typedef DataLinkSet::MapType MapType;
1020  MapType& map = pub_links->map();
1021 
1022  for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
1023  size_t n_subs;
1024  GUIDSeq_var ti =
1025  itr->second->target_intersection(cur->get_pub_id(),
1026  cur->filter_out_.in(), n_subs);
1027 
1028  if (ti.ptr() == 0 || ti->length() != n_subs) {
1029  if (!subset.in()) {
1030  subset = make_rch<DataLinkSet>();
1031  }
1032 
1033  subset->insert_link(itr->second);
1034  cur->filter_per_link_[itr->first] = ti._retn();
1035 
1036  } else {
1037  VDBG((LM_DEBUG,
1038  "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
1039  itr->second.in()));
1040  }
1041  }
1042 
1043  if (!subset.in()) {
1044  guard.release();
1045  VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
1046  // similar to the "if (pub_links.is_nil())" case above, no links
1047  cur->get_send_listener()->data_delivered(cur);
1048  if (cur != max_transaction_tail_) {
1049  // Move on to the next DataSampleElement to send.
1050  cur = next_elem;
1051  continue;
1052  } else {
1053  break;
1054  }
1055  }
1056 
1057  pub_links = subset;
1058  }
1059 
1060 #endif
1061 
1062  // This will do several things, including adding to the membership
1063  // of the send_links set. Any DataLinks added to the send_links
1064  // set will be also told about the send_start() event. Those
1065  // DataLinks (in the pub_links set) that are already in the
1066  // send_links set will not be told about the send_start() event
1067  // since they heard about it when they were inserted into the
1068  // send_links set.
1069  send_links.send_start(pub_links.in());
1070  if (cur->get_header().message_id_ != SAMPLE_DATA) {
1071  pub_links->send_control(cur);
1072  } else {
1073  pub_links->send(cur);
1074  }
1075  }
1076  if (cur != max_transaction_tail_) {
1077  // Move on to the next DataSampleElement to send.
1078  cur = next_elem;
1079  } else {
1080  break;
1081  }
1082  }
1083 
1084  // This will inform each DataLink in the set about the stop_send() event.
1085  // It will then clear the send_links_ set.
1086  //
1087  // The reason that the send_links_ set is cleared is because we continually
1088  // reuse the same send_links_ object over and over for each call to this
1089  // send method.
1090  GUID_t pub_id = repo_id();
1091  send_links.send_stop(pub_id);
1092  if (transaction_id != 0) {
1094  }
1096  }
1097 }
#define ACE_DEBUG(X)
DataSampleElement * get_next_send_sample() const
RcHandle< DataLinkSet > DataLinkSet_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLinkSet.h:27
ACE_Guard< LockType > GuardType
Definition: DataLinkSet.h:78
#define VDBG(DBG_ARGS)
DataLinkSet_rch select_links(const GUID_t *remoteIds, const CORBA::ULong num_targets)
Definition: DataLinkSet.cpp:64
DataSampleElement * max_transaction_tail_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ send_response()

bool OpenDDS::DCPS::TransportClient::send_response ( const GUID_t peer,
const DataSampleHeader header,
Message_Block_Ptr  payload 
)

Definition at line 893 of file TransportClient.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataLinkSet::insert_link(), LM_DEBUG, OpenDDS::DCPS::move(), and OpenDDS::DCPS::DataLinkSet::send_response().

896 {
897  DataLinkIndex::iterator found = data_link_index_.find(peer);
898 
899  if (found == data_link_index_.end()) {
900  if (DCPS_debug_level > 4) {
901  LogGuid logger(peer);
902  ACE_DEBUG((LM_DEBUG,
903  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
904  ACE_TEXT("no link for publication %C, ")
905  ACE_TEXT("not sending response.\n"),
906  logger.c_str()));
907  }
908 
909  return false;
910  }
911 
912  DataLinkSet singular;
913  singular.insert_link(found->second);
914  singular.send_response(peer, header, move(payload));
915  return true;
916 }
#define ACE_DEBUG(X)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ send_w_control()

SendControlStatus OpenDDS::DCPS::TransportClient::send_w_control ( SendStateDataSampleList  send_list,
const DataSampleHeader header,
Message_Block_Ptr  msg,
const GUID_t destination 
)

Definition at line 929 of file TransportClient.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::move(), OpenDDS::DCPS::SEND_CONTROL_ERROR, send_control_to(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), and OpenDDS::DCPS::DataWriterImpl::replay_durable_data_for().

933 {
934  ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
936  if (send_list.head()) {
937  send_i(send_list, 0);
938  }
939  return send_control_to(header, move(msg), destination);
940 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
ACE_Thread_Mutex send_transaction_lock_
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
SendControlStatus send_control_to(const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)

◆ stop_associating() [1/2]

void OpenDDS::DCPS::TransportClient::stop_associating ( )

Definition at line 633 of file TransportClient.cpp.

References ACE_GUARD, lock_, pending_, pending_assoc_timer_, prev_pending_, and OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting().

Referenced by OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), OpenDDS::DCPS::ReplayerImpl::remove_all_associations(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), OpenDDS::DCPS::DataReaderImpl::remove_all_associations(), OpenDDS::DCPS::RecorderImpl::remove_associations(), OpenDDS::DCPS::ReplayerImpl::remove_associations(), OpenDDS::DCPS::DataWriterImpl::remove_associations(), OpenDDS::DCPS::DataReaderImpl::remove_associations(), and ~TransportClient().

634 {
636  for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
637  {
638  // The transport impl may have resource for a pending connection.
639  ACE_Guard<ACE_Thread_Mutex> guard(it->second->mutex_);
640  for (size_t i = 0; i < it->second->impls_.size(); ++i) {
641  TransportImpl_rch impl = it->second->impls_[i].lock();
642  if (impl) {
643  impl->stop_accepting_or_connecting(*this, it->second->data_.remote_id_, true, true);
644  }
645  }
646  }
647  it->second->reset_client();
648  pending_assoc_timer_->cancel_timer(it->second);
649  prev_pending_.insert(std::make_pair(it->first, it->second));
650  }
651  pending_.clear();
652 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< PendingAssocTimer > pending_assoc_timer_
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0

◆ stop_associating() [2/2]

void OpenDDS::DCPS::TransportClient::stop_associating ( const GUID_t repos,
CORBA::ULong  length 
)

Definition at line 655 of file TransportClient.cpp.

References ACE_GUARD, lock_, pending_, pending_assoc_timer_, prev_pending_, and OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting().

656 {
658 
659  if (repos == 0 || length == 0) {
660  return;
661  } else {
662  for (CORBA::ULong i = 0; i < length; ++i) {
663  PendingMap::iterator iter = pending_.find(repos[i]);
664  if (iter != pending_.end()) {
665  {
666  // The transport impl may have resource for a pending connection.
667  ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
668  for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
669  TransportImpl_rch impl = iter->second->impls_[i].lock();
670  if (impl) {
671  impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
672  }
673  }
674  }
675  iter->second->reset_client();
676  pending_assoc_timer_->cancel_timer(iter->second);
677  prev_pending_.insert(std::make_pair(iter->first, iter->second));
678  pending_.erase(iter);
679  }
680  }
681  }
682 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
RcHandle< PendingAssocTimer > pending_assoc_timer_
ACE_CDR::ULong ULong
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0

◆ swap_bytes()

bool OpenDDS::DCPS::TransportClient::swap_bytes ( ) const
inline

◆ terminate_send_if_suspended()

void OpenDDS::DCPS::TransportClient::terminate_send_if_suspended ( )

◆ transport_assoc_done()

virtual void OpenDDS::DCPS::TransportClient::transport_assoc_done ( int  ,
const GUID_t  
)
inlineprivatevirtual

Reimplemented in OpenDDS::RTPS::Sedp::Writer, OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::DataWriterImpl.

Definition at line 161 of file TransportClient.h.

Referenced by use_datalink_i().

161 {}

◆ transport_stop()

void OpenDDS::DCPS::TransportClient::transport_stop ( )

Definition at line 773 of file TransportClient.cpp.

References ACE_GUARD, OpenDDS::DCPS::TransportImpl::client_stop(), impls_, lock_, repo_id(), and repo_id_.

Referenced by OpenDDS::DCPS::RecorderImpl::remove_all_associations(), OpenDDS::DCPS::ReplayerImpl::remove_all_associations(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), and OpenDDS::DCPS::DataReaderImpl::remove_all_associations().

774 {
776  const ImplsType impls = impls_;
777  const GUID_t repo_id = repo_id_;
778  guard.release();
779 
780  for (size_t i = 0; i < impls.size(); ++i) {
781  const TransportImpl_rch impl = impls[i].lock();
782  if (impl) {
783  impl->client_stop(repo_id);
784  }
785  }
786 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
virtual void client_stop(const GUID_t &)

◆ unregister_for_reader()

void OpenDDS::DCPS::TransportClient::unregister_for_reader ( const GUID_t participant,
const GUID_t writerid,
const GUID_t readerid 
)

Definition at line 807 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::unregister_for_reader().

Referenced by OpenDDS::DCPS::ReplayerImpl::unregister_for_reader(), and OpenDDS::DCPS::DataWriterImpl::unregister_for_reader().

810 {
812  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
813  pos != limit;
814  ++pos) {
815  TransportImpl_rch impl = pos->lock();
816  if (impl) {
817  impl->unregister_for_reader(participant, writerid, readerid);
818  }
819  }
820 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
virtual void unregister_for_reader(const GUID_t &, const GUID_t &, const GUID_t &)
Definition: TransportImpl.h:93

◆ unregister_for_writer()

void OpenDDS::DCPS::TransportClient::unregister_for_writer ( const GUID_t participant,
const GUID_t readerid,
const GUID_t writerid 
)

Definition at line 841 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::unregister_for_writer().

Referenced by OpenDDS::DCPS::RecorderImpl::unregister_for_writer(), and OpenDDS::DCPS::DataReaderImpl::unregister_for_writer().

844 {
846  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
847  pos != limit;
848  ++pos) {
849  TransportImpl_rch impl = pos->lock();
850  if (impl) {
851  impl->unregister_for_writer(participant, readerid, writerid);
852  }
853  }
854 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.

◆ update_locators()

void OpenDDS::DCPS::TransportClient::update_locators ( const GUID_t remote,
const TransportLocatorSeq locators 
)

Definition at line 857 of file TransportClient.cpp.

References ACE_GUARD, impls_, lock_, and OpenDDS::DCPS::TransportImpl::update_locators().

Referenced by OpenDDS::DCPS::DataWriterImpl::update_locators(), and OpenDDS::DCPS::DataReaderImpl::update_locators().

859 {
861  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
862  pos != limit;
863  ++pos) {
864  TransportImpl_rch impl = pos->lock();
865  if (impl) {
866  impl->update_locators(remote, locators);
867  }
868  }
869 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)

◆ use_datalink()

void OpenDDS::DCPS::TransportClient::use_datalink ( const GUID_t remote_id,
const DataLink_rch link 
)

Definition at line 526 of file TransportClient.cpp.

References ACE_GUARD, lock_, and use_datalink_i().

528 {
530 
531  use_datalink_i(remote_id, link, guard);
532 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
void use_datalink_i(const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)

◆ use_datalink_i()

void OpenDDS::DCPS::TransportClient::use_datalink_i ( const GUID_t remote_id,
const DataLink_rch link,
Guard guard 
)
private

Definition at line 535 of file TransportClient.cpp.

References ACE_GUARD, OpenDDS::DCPS::TransportClient::PendingAssoc::active_, add_link(), ASSOC_ACTIVE, ASSOC_OK, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::TransportClient::PendingAssoc::mutex_, pending_, pending_assoc_timer_, prev_pending_, ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::TransportClient::PendingAssoc::reset_client(), OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting(), transport_assoc_done(), and VDBG_LVL.

Referenced by associate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), and use_datalink().

538 {
539  // Try to make a local copy of remote_id to use in calls
540  // because the reference could be invalidated if the caller
541  // reference location is deleted (i.e. in stop_accepting_or_connecting
542  // if use_datalink_i was called from passive_connection)
543  // Does changing this from a reference to a local affect anything going forward?
544  GUID_t remote_id(remote_id_ref);
545 
546  LogGuid peerId_log(remote_id);
547  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
548  "TransportClient(%@) using datalink[%@] from %C\n",
549  this,
550  link.in(),
551  peerId_log.c_str()), 0);
552 
553  PendingMap::iterator iter = pending_.find(remote_id);
554 
555  if (iter == pending_.end()) {
556  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
557  "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
558  this,
559  link.in(),
560  peerId_log.c_str()), 0);
561  return;
562  }
563 
564  PendingAssoc_rch pend = iter->second;
565  ACE_GUARD(ACE_Thread_Mutex, pend_guard, pend->mutex_);
566  const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
567  bool ok = false;
568 
569  if (link.is_nil()) {
570 
571  if (pend->active_ && pend->initiate_connect(this, guard)) {
572  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
573  "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect to remote %C\n",
574  this,
575  link.in(),
576  peerId_log.c_str()), 0);
577  return;
578  }
579 
580  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
581  "TransportClient(%@) using datalink[%@] link is nil, since this is passive side, connection to remote %C timed out\n",
582  this,
583  link.in(),
584  peerId_log.c_str()), 0);
585  } else { // link is ready to use
586  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
587  "TransportClient(%@) about to add_link[%@] to remote: %C\n",
588  this,
589  link.in(),
590  peerId_log.c_str()), 0);
591 
592  add_link(link, remote_id);
593  ok = true;
594  }
595 
596  // either link is valid or assoc failed, clean up pending object
597  for (size_t i = 0; i < pend->impls_.size(); ++i) {
598  TransportImpl_rch impl = pend->impls_[i].lock();
599  if (impl) {
600  impl->stop_accepting_or_connecting(*this, pend->data_.remote_id_, false, !ok);
601  }
602  }
603 
604  pend_guard.release();
605  pend->reset_client();
606  pending_assoc_timer_->cancel_timer(pend);
607  prev_pending_.insert(std::make_pair(iter->first, iter->second));
608  pending_.erase(iter);
609 
610  // Release TransportClient's lock as we're done updating its data.
611  guard.release();
612 
613  transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
614 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
RcHandle< PendingAssocTimer > pending_assoc_timer_
virtual void transport_assoc_done(int, const GUID_t &)
#define VDBG_LVL(DBG_ARGS, LEVEL)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0
RcHandle< PendingAssoc > PendingAssoc_rch

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 194 of file TransportClient.h.

Member Data Documentation

◆ cdr_encapsulation_

bool OpenDDS::DCPS::TransportClient::cdr_encapsulation_
private

Definition at line 340 of file TransportClient.h.

Referenced by enable_transport_using_config().

◆ config_

TransportConfig_rch OpenDDS::DCPS::TransportClient::config_
private

Definition at line 316 of file TransportClient.h.

Referenced by enable_transport_using_config(), and populate_connection_info().

◆ conn_info_

TransportLocatorSeq OpenDDS::DCPS::TransportClient::conn_info_
private

Definition at line 344 of file TransportClient.h.

Referenced by populate_connection_info().

◆ data_link_index_

DataLinkIndex OpenDDS::DCPS::TransportClient::data_link_index_
private

◆ durable_

bool OpenDDS::DCPS::TransportClient::durable_
private

◆ expected_transaction_id_

ACE_UINT64 OpenDDS::DCPS::TransportClient::expected_transaction_id_
private

Definition at line 328 of file TransportClient.h.

Referenced by send_i().

◆ impls_

ImplsType OpenDDS::DCPS::TransportClient::impls_
private

◆ links_

DataLinkSet OpenDDS::DCPS::TransportClient::links_
private

◆ lock_

ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::lock_
mutableprivate

◆ max_transaction_id_seen_

ACE_UINT64 OpenDDS::DCPS::TransportClient::max_transaction_id_seen_
private

Definition at line 329 of file TransportClient.h.

Referenced by send_i().

◆ max_transaction_tail_

DataSampleElement* OpenDDS::DCPS::TransportClient::max_transaction_tail_
private

Definition at line 336 of file TransportClient.h.

Referenced by send_i().

◆ passive_connect_duration_

TimeDuration OpenDDS::DCPS::TransportClient::passive_connect_duration_
private

Definition at line 342 of file TransportClient.h.

Referenced by enable_transport_using_config().

◆ pending_

PendingMap OpenDDS::DCPS::TransportClient::pending_
private

◆ pending_assoc_timer_

RcHandle<PendingAssocTimer> OpenDDS::DCPS::TransportClient::pending_assoc_timer_
private

Definition at line 312 of file TransportClient.h.

Referenced by associate(), disassociate(), stop_associating(), and use_datalink_i().

◆ prev_pending_

PrevPendingMap OpenDDS::DCPS::TransportClient::prev_pending_
private

◆ reliable_

bool OpenDDS::DCPS::TransportClient::reliable_
private

Definition at line 340 of file TransportClient.h.

Referenced by add_link(), associate(), and enable_transport_using_config().

◆ repo_id_

GUID_t OpenDDS::DCPS::TransportClient::repo_id_
private

◆ reverse_lock_

Reverse_Lock_t OpenDDS::DCPS::TransportClient::reverse_lock_
private

Definition at line 349 of file TransportClient.h.

Referenced by associate(), and initiate_connect_i().

◆ send_transaction_lock_

ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::send_transaction_lock_
private

Definition at line 327 of file TransportClient.h.

Referenced by send(), and send_w_control().

◆ swap_bytes_

bool OpenDDS::DCPS::TransportClient::swap_bytes_
private

Definition at line 340 of file TransportClient.h.

Referenced by enable_transport_using_config().


The documentation for this class was generated from the following files: