OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::MulticastTransport Class Reference

#include <MulticastTransport.h>

Inheritance diagram for OpenDDS::DCPS::MulticastTransport:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MulticastTransport:
Collaboration graph
[legend]

Public Member Functions

 MulticastTransport (const MulticastInst_rch &inst)
 
 ~MulticastTransport ()
 
void passive_connection (MulticastPeer local_peer, MulticastPeer remote_peer)
 
MulticastInst_rch config () const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportImpl
virtual ~TransportImpl ()
 
virtual void unbind_link (DataLink *link)
 Remove any pending_release mappings. More...
 
bool release_link_resources (DataLink *link)
 
TransportInst_rch config () const
 
virtual void register_for_reader (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, OpenDDS::DCPS::DiscoveryListener *)
 
virtual void unregister_for_reader (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual void register_for_writer (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
 
virtual void unregister_for_writer (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual void update_locators (const GUID_t &, const TransportLocatorSeq &)
 
virtual void get_last_recv_locator (const GUID_t &, TransportLocator &)
 
virtual void rtps_relay_address_change ()
 
virtual void append_transport_statistics (TransportStatisticsSequence &)
 
ACE_Reactor_Timer_Interface * timer () const
 Interface to the transport's reactor for scheduling timers. More...
 
ACE_Reactor * reactor () const
 
ACE_thread_t reactor_owner () const
 
bool is_shut_down () const
 
void create_reactor_task (bool useAsyncSend=false, const OPENDDS_STRING &name="")
 
void dump ()
 Diagnostic aid. More...
 
OPENDDS_STRING dump_to_str ()
 
void report ()
 
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
virtual void rtps_relay_only_now (bool)
 
virtual void use_rtps_relay_now (bool)
 
virtual void use_ice_now (bool)
 
ReactorTask_rch reactor_task ()
 
EventDispatcher_rch event_dispatcher ()
 
int acquire ()
 
int tryacquire ()
 
int release ()
 
int remove ()
 
bool connection_info (TransportLocator &local_info, ConnectionInfoFlags flags) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Protected Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
 
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
 
virtual void stop_accepting_or_connecting (const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
 
bool configure_i (const MulticastInst_rch &config)
 
virtual void shutdown_i ()
 
virtual bool connection_info_i (TransportLocator &info, ConnectionInfoFlags flags) const
 
virtual void release_datalink (DataLink *link)
 
virtual std::string transport_type () const
 
void client_stop (const GUID_t &localId)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportImpl
 TransportImpl (TransportInst_rch config)
 
bool open ()
 
typedef OPENDDS_MULTIMAP (TransportClient_wrch, DataLink_rch) PendConnMap
 
void add_pending_connection (const TransportClient_rch &client, DataLink_rch link)
 
void shutdown ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Types

typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockType > GuardType
 
typedef ACE_Thread_Mutex ThreadLockType
 
typedef ACE_Guard< ThreadLockType > GuardThreadType
 
typedef std::vector< DataLink::OnStartCallback > Callbacks
 
typedef std::pair< MulticastPeer, MulticastPeer > Peers
 

Private Member Functions

MulticastDataLink_rch make_datalink (const GUID_t &local_id, Priority priority, bool active)
 
MulticastSession_rch start_session (const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active)
 
typedef OPENDDS_MAP (MulticastPeer, MulticastDataLink_rch) Links
 link for pubs. More...
 
typedef OPENDDS_MAP (Peers, Callbacks) PendConnMap
 
 OPENDDS_SET (Peers) connections_
 

Private Attributes

ThreadLockType links_lock_
 
Links client_links_
 
Links server_links_
 link for subs. More...
 
ThreadLockType connections_lock_
 
PendConnMap pending_connections_
 

Additional Inherited Members

- Public Attributes inherited from OpenDDS::DCPS::TransportImpl
LockType lock_
 Lock to protect the config_ and reactor_task_ data members. More...
 
WeakRcHandle< TransportInst > config_
 
ReactorTask_rch reactor_task_
 
EventDispatcher_rch event_dispatcher_
 smart ptr to the associated DL cleanup task More...
 
unique_ptr< Monitor > monitor_
 Monitor object for this entity. More...
 
- Protected Types inherited from OpenDDS::DCPS::TransportImpl
typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockType > GuardType
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportImpl
LockType pending_connections_lock_
 Lock to protect the pending_connections_ data member. More...
 
PendConnMap pending_connections_
 
AtomicBool is_shut_down_
 Id of the last link established. More...
 

Detailed Description

Definition at line 28 of file MulticastTransport.h.

Member Typedef Documentation

◆ Callbacks

Definition at line 92 of file MulticastTransport.h.

◆ GuardThreadType

Definition at line 69 of file MulticastTransport.h.

◆ GuardType

Definition at line 66 of file MulticastTransport.h.

◆ LockType

Definition at line 65 of file MulticastTransport.h.

◆ Peers

Definition at line 93 of file MulticastTransport.h.

◆ ThreadLockType

Definition at line 68 of file MulticastTransport.h.

Constructor & Destructor Documentation

◆ MulticastTransport()

OpenDDS::DCPS::MulticastTransport::MulticastTransport ( const MulticastInst_rch & inst )
explicit

Definition at line 31 of file MulticastTransport.cpp.

References configure_i(), and OpenDDS::DCPS::TransportImpl::open().

32  : TransportImpl(inst)
33 {
34  if (! (configure_i(inst) && open())) {
35  throw Transport::UnableToCreate();
36  }
37 }
bool configure_i(const MulticastInst_rch &config)
TransportImpl(TransportInst_rch config)

◆ ~MulticastTransport()

OpenDDS::DCPS::MulticastTransport::~MulticastTransport ( )

Definition at line 39 of file MulticastTransport.cpp.

40 {
41 }

Member Function Documentation

◆ accept_datalink()

TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::accept_datalink ( const RemoteTransport & remote,
const ConnectionAttribs & attribs,
const TransportClient_rch & client 
)
protected virtual

accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

active

active

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 210 of file MulticastTransport.cpp.

References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_FAILED, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, config(), connections_lock_, OpenDDS::DCPS::RepoIdConverter::federationId(), OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_lock_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, make_datalink(), OpenDDS::DCPS::RepoIdConverter::participantId(), pending_connections_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::push_back(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, OpenDDS::DCPS::RcHandle< T >::reset(), server_links_, start_session(), and VDBG.

213 {
214  MulticastInst_rch cfg = config();
215 
216  if (!cfg) {
217  return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
218  }
219 
220  // Check that the remote reliability matches.
221  if (get_remote_reliability(remote) != cfg->is_reliable()) {
222  return AcceptConnectResult();
223  }
224 
225  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
226  | RepoIdConverter(attribs.local_id_).participantId();
227 
228  GuardThreadType guard_links(this->links_lock_);
229 
230  Links::const_iterator link_iter = this->server_links_.find(local_peer);
231  MulticastDataLink_rch link;
232 
233  if (link_iter == this->server_links_.end()) {
234 
235  link = this->make_datalink(attribs.local_id_, attribs.priority_, false /*passive*/);
236  this->server_links_[local_peer] = link;
237  } else {
238  link = link_iter->second;
239  }
240 
241  guard_links.release();
242 
243  MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
244  | RepoIdConverter(remote.repo_id_).participantId();
245 
246  GuardThreadType guard(this->connections_lock_);
247 
248  if (connections_.count(std::make_pair(remote_peer, local_peer))) {
249  //can't call start session with connections_lock_ due to reactor
250  //call in session->start which could deadlock with passive_connection
251  guard.release();
252 
253  VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n"));
254  MulticastSession_rch session(
255  this->start_session(link, remote_peer, false /*!active*/));
256 
257  if (session.is_nil()) {
258  link.reset();
259  }
260  return AcceptConnectResult(link);
261 
262  } else {
263 
264  if (cfg->is_reliable()) {
265  pending_connections_[std::make_pair(remote_peer, local_peer)].
266  push_back(std::make_pair(client, remote.repo_id_));
267  }
268 
269  //can't call start session with connections_lock_ due to reactor
270  //call in session->start which could deadlock with passive_connection
271  guard.release();
272  MulticastSession_rch session(
273  this->start_session(link, remote_peer, false /*!active*/));
274 
275  if (!session) {
276  return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
277  } else if (cfg->is_reliable()) {
278  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
279  } else {
280  return AcceptConnectResult(link);
281  }
282  }
283 }
MulticastSession_rch start_session(const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active)
ACE_Guard< ThreadLockType > GuardThreadType
MulticastDataLink_rch make_datalink(const GUID_t &local_id, Priority priority, bool active)
static bool get_remote_reliability(const TransportImpl::RemoteTransport &remote)
RcHandle< MulticastSession > MulticastSession_rch
MulticastInst_rch config() const
#define VDBG(DBG_ARGS)
RcHandle< MulticastInst > MulticastInst_rch
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
RcHandle< MulticastDataLink > MulticastDataLink_rch
long long ACE_INT64
ACE_INT64 MulticastPeer

◆ client_stop()

void OpenDDS::DCPS::MulticastTransport::client_stop ( const GUID_t & localId )
protected virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 435 of file MulticastTransport.cpp.

References client_links_, OpenDDS::DCPS::RepoIdConverter::federationId(), links_lock_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OpenDDS::DCPS::RepoIdConverter::participantId(), and ACE_Guard< ACE_LOCK >::release().

436 {
437  GuardThreadType guard_links(this->links_lock_);
438  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(localId).federationId() << 32
439  | RepoIdConverter(localId).participantId();
440  Links::const_iterator link_iter = this->client_links_.find(local_peer);
441  MulticastDataLink_rch link;
442 
443  if (link_iter != this->client_links_.end()) {
444  link = link_iter->second;
445  }
446  guard_links.release();
447 
448  if (link) {
449  link->client_stop(localId);
450  }
451 }
ACE_Guard< ThreadLockType > GuardThreadType
RcHandle< MulticastDataLink > MulticastDataLink_rch
long long ACE_INT64
ACE_INT64 MulticastPeer

◆ config()

MulticastInst_rch OpenDDS::DCPS::MulticastTransport::config ( ) const

Definition at line 45 of file MulticastTransport.cpp.

References OpenDDS::DCPS::TransportImpl::config(), and OpenDDS::DCPS::dynamic_rchandle_cast().

Referenced by accept_datalink(), connect_datalink(), connection_info_i(), make_datalink(), passive_connection(), and start_session().

46 {
47  return dynamic_rchandle_cast<MulticastInst>(TransportImpl::config());
48 }
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
TransportInst_rch config() const

◆ configure_i()

bool OpenDDS::DCPS::MulticastTransport::configure_i ( const MulticastInst_rch & config )
protected

Definition at line 368 of file MulticastTransport.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::TransportImpl::create_reactor_task(), INET6_ADDRSTRLEN, OpenDDS::DCPS::LogAddr::ip(), LM_ERROR, and TheServiceParticipant.

Referenced by MulticastTransport().

369 {
370  if (!config) {
371  return false;
372  }
373 
374  // Override with DCPSDefaultAddress.
375  if (config->local_address_.empty() &&
376  TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
377  char buffer[INET6_ADDRSTRLEN];
378  config->local_address_ = TheServiceParticipant->default_address().to_addr().get_host_addr(buffer, sizeof buffer);
379  }
380 
381  if (!config->group_address_.is_multicast()) {
382  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MulticastTransport[%@]::configure_i: ")
383  ACE_TEXT("invalid configuration: address %C is not multicast.\n"),
384  this, LogAddr::ip(config->group_address_).c_str()), false);
385  }
386 
387  this->create_reactor_task(config->async_send_, "MulticastTransport" + config->name());
388 
389  return true;
390 }
static const String ip(const ACE_INET_Addr &addr)
Definition: LogAddr.cpp:15
#define INET6_ADDRSTRLEN
MulticastInst_rch config() const
ACE_TEXT("TCP_Factory")
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
#define ACE_ERROR_RETURN(X, Y)
#define TheServiceParticipant

◆ connect_datalink()

TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::connect_datalink ( const RemoteTransport & remote,
const ConnectionAttribs & attribs,
const TransportClient_rch & client 
)
protected virtual

connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 150 of file MulticastTransport.cpp.

References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_FAILED, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, client_links_, config(), OpenDDS::DCPS::RepoIdConverter::federationId(), OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_lock_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, make_datalink(), OpenDDS::DCPS::RepoIdConverter::participantId(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, and start_session().

153 {
154  MulticastInst_rch cfg = config();
155 
156  if (!cfg) {
157  return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
158  }
159 
160  // Check that the remote reliability matches.
161  if (get_remote_reliability(remote) != cfg->is_reliable()) {
162  return AcceptConnectResult();
163  }
164 
165  GuardThreadType guard_links(this->links_lock_);
166  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
167  | RepoIdConverter(attribs.local_id_).participantId();
168  Links::const_iterator link_iter = this->client_links_.find(local_peer);
169  MulticastDataLink_rch link;
170 
171  if (link_iter == this->client_links_.end()) {
172  link = this->make_datalink(attribs.local_id_, attribs.priority_, true /*active*/);
173  this->client_links_[local_peer] = link;
174  } else {
175  link = link_iter->second;
176  }
177 
178  MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
179  | RepoIdConverter(remote.repo_id_).participantId();
180 
181  if (cfg->is_reliable()) {
182  link->add_on_start_callback(client, remote.repo_id_);
183  }
184 
185  MulticastSession_rch session(
186  this->start_session(link, remote_peer, true /*active*/));
187 
188  if (session.is_nil()) {
189  Links::iterator to_remove = this->client_links_.find(local_peer);
190  if (to_remove != this->client_links_.end()) {
191  this->client_links_.erase(to_remove);
192  }
193  link->remove_on_start_callback(client, remote.repo_id_);
194  return AcceptConnectResult();
195  }
196 
197  if (cfg->is_reliable()) {
198  session->add_remote(attribs.local_id_, remote.repo_id_);
199  if (remote_peer != local_peer) {
200  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
201  }
202  } else {
203  session->add_remote(attribs.local_id_);
204  }
205 
206  return AcceptConnectResult(link);
207 }
MulticastSession_rch start_session(const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active)
ACE_Guard< ThreadLockType > GuardThreadType
MulticastDataLink_rch make_datalink(const GUID_t &local_id, Priority priority, bool active)
static bool get_remote_reliability(const TransportImpl::RemoteTransport &remote)
RcHandle< MulticastSession > MulticastSession_rch
MulticastInst_rch config() const
RcHandle< MulticastInst > MulticastInst_rch
RcHandle< MulticastDataLink > MulticastDataLink_rch
long long ACE_INT64
ACE_INT64 MulticastPeer

◆ connection_info_i()

bool OpenDDS::DCPS::MulticastTransport::connection_info_i ( TransportLocator & local_info,
ConnectionInfoFlags  flags 
) const
protected virtual

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 418 of file MulticastTransport.cpp.

References config().

419 {
420  MulticastInst_rch cfg = config();
421  if (cfg) {
422  cfg->populate_locator(info, flags);
423  return true;
424  }
425  return false;
426 }
MulticastInst_rch config() const
RcHandle< MulticastInst > MulticastInst_rch

◆ make_datalink()

MulticastDataLink_rch OpenDDS::DCPS::MulticastTransport::make_datalink ( const GUID_t & local_id,
Priority  priority,
bool  active 
)
private

Definition at line 51 of file MulticastTransport.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::LogAddr::c_str(), config(), OpenDDS::DCPS::RepoIdConverter::federationId(), OpenDDS::DCPS::LogAddr::HostPort, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::RepoIdConverter::participantId(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::ref(), and VDBG_LVL.

Referenced by accept_datalink(), and connect_datalink().

54 {
55  RcHandle<MulticastSessionFactory> session_factory;
56  MulticastInst_rch cfg = config();
57  if (cfg && cfg->is_reliable()) {
58  session_factory = make_rch<ReliableSessionFactory>();
59  } else {
60  session_factory = make_rch<BestEffortSessionFactory>();
61  }
62 
63  MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32
64  | RepoIdConverter(local_id).participantId();
65 
66  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink "
67  "peers: local %#08x%08x priority %d active %d\n",
68  cfg ? cfg->name().c_str() : "", (unsigned int)(local_peer >> 32), (unsigned int)local_peer,
69  priority, active), 2);
70 
71  MulticastDataLink_rch link(make_rch<MulticastDataLink>(rchandle_from(this),
72  session_factory,
73  local_peer,
74  ref(cfg),
75  reactor_task(),
76  active));
77 
78  // Join multicast group:
79  if (!link->join(cfg->group_address_)) {
80  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MulticastTransport::make_datalink: ")
81  ACE_TEXT("failed to join multicast group: %C!\n"),
82  LogAddr(cfg->group_address_, LogAddr::HostPort).c_str()));
83  return MulticastDataLink_rch();
84  }
85 
86  return link;
87 }
#define ACE_ERROR(X)
ReactorTask_rch reactor_task()
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
MulticastInst_rch config() const
RcHandle< MulticastInst > MulticastInst_rch
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define VDBG_LVL(DBG_ARGS, LEVEL)
RcHandle< MulticastDataLink > MulticastDataLink_rch
long long ACE_INT64
ACE_INT64 MulticastPeer

◆ OPENDDS_MAP() [1/2]

typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP ( MulticastPeer  ,
MulticastDataLink_rch   
)
private

link for pubs.

◆ OPENDDS_MAP() [2/2]

typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP ( Peers  ,
Callbacks   
)
private

◆ OPENDDS_SET()

OpenDDS::DCPS::MulticastTransport::OPENDDS_SET ( Peers  )
private

◆ passive_connection()

void OpenDDS::DCPS::MulticastTransport::passive_connection ( MulticastPeer  local_peer,
MulticastPeer  remote_peer 
)

Definition at line 314 of file MulticastTransport.cpp.

References ACE_Guard< ACE_LOCK >::acquire(), config(), connections_lock_, OpenDDS::DCPS::find(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), pending_connections_, ACE_Guard< ACE_LOCK >::release(), server_links_, OpenDDS::DCPS::static_rchandle_cast(), and VDBG_LVL.

315 {
316  GuardThreadType guard(this->connections_lock_);
317 
318  MulticastInst_rch cfg = config();
319 
320  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection "
321  "from remote peer %#08x%08x to local peer %#08x%08x\n",
322  cfg ? cfg->name().c_str() : "",
323  (unsigned int) (remote_peer >> 32),
324  (unsigned int) remote_peer,
325  (unsigned int) (local_peer >> 32),
326  (unsigned int) local_peer), 2);
327 
328  const Peers peers(remote_peer, local_peer);
329  const PendConnMap::iterator pend = this->pending_connections_.find(peers);
330  //if connection was pending, calls to use_datalink finalized the connection
331  //if it was not previously pending, accept_datalink() will finalize connection
332 
333  this->connections_.insert(peers);
334 
335  Links::const_iterator server_link = this->server_links_.find(local_peer);
336  DataLink_rch link;
337 
338  if (server_link != this->server_links_.end()) {
339  link = static_rchandle_cast<DataLink>(server_link->second);
340  MulticastSession_rch session (server_link->second->find_or_create_session(remote_peer));
341  session->set_acked();
342  }
343 
344  if (pend != pending_connections_.end()) {
345  Callbacks tmp(pend->second);
346  for (size_t i = 0; i < tmp.size(); ++i) {
347  const PendConnMap::iterator pend = pending_connections_.find(peers);
348  if (pend != pending_connections_.end()) {
349  const Callbacks::iterator tmp_iter = find(pend->second.begin(),
350  pend->second.end(),
351  tmp.at(i));
352  if (tmp_iter != pend->second.end()) {
353  TransportClient_wrch pend_client = tmp.at(i).first;
354  GUID_t remote_repo = tmp.at(i).second;
355  guard.release();
356  TransportClient_rch client = pend_client.lock();
357  if (client) {
358  client->use_datalink(remote_repo, link);
359  }
360  guard.acquire();
361  }
362  }
363  }
364  }
365 }
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
std::pair< MulticastPeer, MulticastPeer > Peers
ACE_Guard< ThreadLockType > GuardThreadType
WeakRcHandle< TransportClient > TransportClient_wrch
Definition: TransportImpl.h:45
RcHandle< TransportClient > TransportClient_rch
RcHandle< DataLink > DataLink_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLink_rch.h:34
RcHandle< MulticastSession > MulticastSession_rch
MulticastInst_rch config() const
RcHandle< MulticastInst > MulticastInst_rch
std::vector< DataLink::OnStartCallback > Callbacks
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ release_datalink()

void OpenDDS::DCPS::MulticastTransport::release_datalink ( DataLink * link )
protected virtual

Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 429 of file MulticastTransport.cpp.

430 {
431  // No-op for multicast: keep both the client_links_ and server_links_ around
432  // until the transport is shut down.
433 }

◆ shutdown_i()

void OpenDDS::DCPS::MulticastTransport::shutdown_i ( )
protected virtual

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 393 of file MulticastTransport.cpp.

References client_links_, links_lock_, and server_links_.

394 {
395  GuardThreadType guard_links(this->links_lock_);
396  Links::iterator link;
397 
398  for (link = this->client_links_.begin();
399  link != this->client_links_.end();
400  ++link) {
401  if (link->second.in()) {
402  link->second->transport_shutdown();
403  }
404  }
405  client_links_.clear();
406 
407  for (link = this->server_links_.begin();
408  link != this->server_links_.end();
409  ++link) {
410  if (link->second.in()) {
411  link->second->transport_shutdown();
412  }
413  }
414  server_links_.clear();
415 }
ACE_Guard< ThreadLockType > GuardThreadType

◆ start_session()

MulticastSession_rch OpenDDS::DCPS::MulticastTransport::start_session ( const MulticastDataLink_rch & link,
MulticastPeer  remote_peer,
bool  active 
)
private

Definition at line 90 of file MulticastTransport.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), config(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and LM_ERROR.

Referenced by accept_datalink(), and connect_datalink().

92 {
93  MulticastInst_rch cfg = config();
94 
95  if (link.is_nil()) {
96  ACE_ERROR_RETURN((LM_ERROR,
97  ACE_TEXT("(%P|%t) ERROR: ")
98  ACE_TEXT("MulticastTransport[%C]::start_session: ")
99  ACE_TEXT("link is nil\n"),
100  cfg ? cfg->name().c_str() : ""),
101  MulticastSession_rch());
102  }
103 
104  MulticastSession_rch session(link->find_or_create_session(remote_peer));
105 
106  if (session.is_nil()) {
107  ACE_ERROR_RETURN((LM_ERROR,
108  ACE_TEXT("(%P|%t) ERROR: ")
109  ACE_TEXT("MulticastTransport[%C]::start_session: ")
110  ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
111  cfg ? cfg->name().c_str() : "",
112  (unsigned int)(remote_peer >> 32),
113  (unsigned int) remote_peer),
114  MulticastSession_rch());
115  }
116 
117  const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));
118 
119  if (!session->start(active, acked)) {
120  ACE_ERROR_RETURN((LM_ERROR,
121  ACE_TEXT("(%P|%t) ERROR: ")
122  ACE_TEXT("MulticastTransport[%C]::start_session: ")
123  ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"),
124  cfg ? cfg->name().c_str() : "",
125  (unsigned int)(remote_peer >> 32),
126  (unsigned int) remote_peer),
127  MulticastSession_rch());
128  }
129 
130  return session;
131 }
RcHandle< MulticastSession > MulticastSession_rch
MulticastInst_rch config() const
RcHandle< MulticastInst > MulticastInst_rch
ACE_TEXT("TCP_Factory")
#define ACE_ERROR_RETURN(X, Y)

◆ stop_accepting_or_connecting()

void OpenDDS::DCPS::MulticastTransport::stop_accepting_or_connecting ( const TransportClient_wrch & client,
const GUID_t & remote_id,
bool  disassociate,
bool  association_failed 
)
protected virtual

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 286 of file MulticastTransport.cpp.

References connections_lock_, LM_DEBUG, pending_connections_, and VDBG.

290 {
291  VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));
292 
293  GuardThreadType guard(this->connections_lock_);
294 
295  for (PendConnMap::iterator it = this->pending_connections_.begin();
296  it != this->pending_connections_.end(); ++it) {
297  bool erased_from_it = false;
298  for (size_t i = 0; i < it->second.size(); ++i) {
299  if (it->second[i].first == client && it->second[i].second == remote_id) {
300  erased_from_it = true;
301  it->second.erase(it->second.begin() + i);
302  break;
303  }
304  }
305 
306  if (erased_from_it && it->second.empty()) {
307  this->pending_connections_.erase(it);
308  return;
309  }
310  }
311 }
ACE_Guard< ThreadLockType > GuardThreadType
#define VDBG(DBG_ARGS)

◆ transport_type()

virtual std::string OpenDDS::DCPS::MulticastTransport::transport_type ( ) const
inline protected virtual

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 59 of file MulticastTransport.h.

59 { return "multicast"; }

Member Data Documentation

◆ client_links_

Links OpenDDS::DCPS::MulticastTransport::client_links_
private

Definition at line 81 of file MulticastTransport.h.

Referenced by client_stop(), connect_datalink(), and shutdown_i().

◆ connections_lock_

ThreadLockType OpenDDS::DCPS::MulticastTransport::connections_lock_
private

◆ links_lock_

ThreadLockType OpenDDS::DCPS::MulticastTransport::links_lock_
private

Definition at line 78 of file MulticastTransport.h.

Referenced by accept_datalink(), client_stop(), connect_datalink(), and shutdown_i().

◆ pending_connections_

PendConnMap OpenDDS::DCPS::MulticastTransport::pending_connections_
private

◆ server_links_

Links OpenDDS::DCPS::MulticastTransport::server_links_
private

link for subs.

Definition at line 83 of file MulticastTransport.h.

Referenced by accept_datalink(), passive_connection(), and shutdown_i().


The documentation for this class was generated from the following files: