OpenDDS::DCPS::MulticastTransport Class Reference

#include <MulticastTransport.h>

Public Member Functions

 MulticastTransport (MulticastInst &inst)
 ~MulticastTransport ()
void passive_connection (MulticastPeer local_peer, MulticastPeer remote_peer)
MulticastInst & config () const

Protected Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
virtual void stop_accepting_or_connecting (const TransportClient_wrch &client, const RepoId &remote_id)
bool configure_i (MulticastInst &config)
virtual void shutdown_i ()
virtual bool connection_info_i (TransportLocator &info) const
virtual void release_datalink (DataLink *link)
virtual std::string transport_type () const

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef ACE_Thread_Mutex ThreadLockType
typedef ACE_Guard< ThreadLockType > GuardThreadType
typedef std::vector< DataLink::OnStartCallback > Callbacks
typedef std::pair< MulticastPeer, MulticastPeer > Peers

Private Member Functions

MulticastDataLink_rch make_datalink (const RepoId &local_id, Priority priority, bool active)
MulticastSession_rch start_session (const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active)
typedef OPENDDS_MAP (MulticastPeer, MulticastDataLink_rch) Links
 link for pubs.
typedef OPENDDS_MAP (Peers, Callbacks) PendConnMap
 OPENDDS_SET (Peers) connections_

Private Attributes

ThreadLockType links_lock_
Links client_links_
Links server_links_
 link for subs.
ThreadLockType connections_lock_
PendConnMap pending_connections_

Detailed Description

Definition at line 29 of file MulticastTransport.h.


Member Typedef Documentation

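typedef std::vector< DataLink::OnStartCallback > OpenDDS::DCPS::MulticastTransport::Callbacks [private]

Definition at line 91 of file MulticastTransport.h.

typedef ACE_Guard< ThreadLockType > OpenDDS::DCPS::MulticastTransport::GuardThreadType [private]

Definition at line 66 of file MulticastTransport.h.

typedef ACE_Guard< LockType > OpenDDS::DCPS::MulticastTransport::GuardType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 63 of file MulticastTransport.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::MulticastTransport::LockType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 62 of file MulticastTransport.h.

typedef std::pair< MulticastPeer, MulticastPeer > OpenDDS::DCPS::MulticastTransport::Peers [private]

Definition at line 92 of file MulticastTransport.h.

typedef ACE_Thread_Mutex OpenDDS::DCPS::MulticastTransport::ThreadLockType [private]

Definition at line 65 of file MulticastTransport.h.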


Constructor & Destructor Documentation

OpenDDS::DCPS::MulticastTransport::MulticastTransport ( MulticastInst & inst )  [explicit]

Definition at line 30 of file MulticastTransport.cpp.

References configure_i(), and OpenDDS::DCPS::TransportImpl::open().

  : TransportImpl(inst)
{
  if (! (configure_i(inst) && open())) {
    throw Transport::UnableToCreate();
  }
}
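
The constructor above treats configuration and opening as a single all-or-nothing step: if either fails, it throws instead of leaving a half-initialized object. A minimal sketch of that pattern follows. All names in it (`Transport`, `UnableToCreate`, `configure`, `open`) are hypothetical stand-ins, not the OpenDDS types.

```cpp
#include <stdexcept>

// Hypothetical exception type, standing in for Transport::UnableToCreate.
struct UnableToCreate : std::runtime_error {
  UnableToCreate() : std::runtime_error("unable to create transport") {}
};

// Hypothetical transport whose constructor either fully succeeds or throws.
class Transport {
public:
  explicit Transport(bool valid_config)
  {
    // && short-circuits: open() is not attempted when configure() fails.
    if (!(configure(valid_config) && open())) {
      throw UnableToCreate();
    }
  }

  bool opened() const { return opened_; }

private:
  bool configure(bool valid) { return valid; }
  bool open() { opened_ = true; return true; }

  bool opened_ = false;
};
```

A caller constructs the object inside a try block and can assume that any instance it obtains has been configured and opened.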

OpenDDS::DCPS::MulticastTransport::~MulticastTransport (  ) 

Definition at line 38 of file MulticastTransport.cpp.

{
}


Member Function Documentation

TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::accept_datalink ( const RemoteTransport & remote,
const ConnectionAttribs & attribs,
const TransportClient_rch & client 
) [protected, virtual]

accept_datalink() is called from TransportClient to initiate an association as the passive peer. If a DataLink is already connected and ready to use, it may be returned immediately; otherwise the transport passively waits for a physical connection from the active side (either a connection event or a handshaking message). Once the physical connection completes, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 194 of file MulticastTransport.cpp.

References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_FAILED, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, config(), connections_lock_, OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_lock_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, make_datalink(), pending_connections_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::push_back(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, OpenDDS::DCPS::RcHandle< T >::reset(), server_links_, start_session(), and VDBG.

{
  // Check that the remote reliability matches.
  if (get_remote_reliability(remote) != this->config().is_reliable()) {
    return AcceptConnectResult();
  }

  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
                                 | RepoIdConverter(attribs.local_id_).participantId();

  GuardThreadType guard_links(this->links_lock_);

  Links::const_iterator link_iter = this->server_links_.find(local_peer);
  MulticastDataLink_rch link;

  if (link_iter == this->server_links_.end()) {

    link = this->make_datalink(attribs.local_id_, attribs.priority_, false /*passive*/);
    this->server_links_[local_peer] = link;
  } else {
    link = link_iter->second;
  }

  guard_links.release();

  MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
                            | RepoIdConverter(remote.repo_id_).participantId();
  GuardThreadType guard(this->connections_lock_);

  if (connections_.count(std::make_pair(remote_peer, local_peer))) {
    //can't call start session with connections_lock_ due to reactor
    //call in session->start which could deadlock with passive_connection
    guard.release();

    VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n"));
    MulticastSession_rch session(
      this->start_session(link, remote_peer, false /*!active*/));

    if (session.is_nil()) {
      link.reset();
    }
    return AcceptConnectResult(link);

  } else {

    this->pending_connections_[std::make_pair(remote_peer, local_peer)].
    push_back(std::make_pair(client, remote.repo_id_));
    //can't call start session with connections_lock_ due to reactor
    //call in session->start which could deadlock with passive_connection
    guard.release();
    MulticastSession_rch session(
      this->start_session(link, remote_peer, false /*!active*/));

    return AcceptConnectResult(
      session ? AcceptConnectResult::ACR_SUCCESS : AcceptConnectResult::ACR_FAILED
    );

  }
}
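
The listing above derives each MulticastPeer by shifting the federation ID into the upper 32 bits and OR-ing the participant ID into the lower 32 bits. The following standalone sketch illustrates that packing and the reverse split used by the debug logging. It assumes MulticastPeer is a 64-bit integer and that both IDs fit in 32 bits. `PeerId`, `make_peer`, `peer_high` and `peer_low` are hypothetical names, not OpenDDS API.

```cpp
#include <cstdint>

// Hypothetical stand-in for MulticastPeer (assumed to be a 64-bit integer).
using PeerId = std::int64_t;

// Pack two 32-bit IDs into one 64-bit value: federation ID in the high word,
// participant ID in the low word. The arithmetic is done on unsigned values
// so that the shift and OR are well defined, with one conversion at the end.
inline PeerId make_peer(std::uint32_t federation_id, std::uint32_t participant_id)
{
  const std::uint64_t packed =
      (static_cast<std::uint64_t>(federation_id) << 32) | participant_id;
  return static_cast<PeerId>(packed);
}

// Upper 32 bits, comparable to (unsigned int)(peer >> 32) in the log calls.
inline std::uint32_t peer_high(PeerId peer)
{
  return static_cast<std::uint32_t>(static_cast<std::uint64_t>(peer) >> 32);
}

// Lower 32 bits, comparable to (unsigned int)peer in the log calls.
inline std::uint32_t peer_low(PeerId peer)
{
  return static_cast<std::uint32_t>(static_cast<std::uint64_t>(peer));
}
```

Because the key depends only on the federation and participant IDs, every entity within the same participant maps to the same peer value, which is why one link per local peer can be shared.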

MulticastInst & OpenDDS::DCPS::MulticastTransport::config (  )  const

Expose the configuration information so others can see what we can do.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 44 of file MulticastTransport.cpp.

Referenced by accept_datalink(), OpenDDS::DCPS::MulticastDataLink::config(), configure_i(), connect_datalink(), connection_info_i(), make_datalink(), passive_connection(), and start_session().

{
  return static_cast<MulticastInst&>(TransportImpl::config());
}
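
config() narrows the base-class configuration reference to the concrete multicast type with a static_cast. A minimal sketch of why that downcast can be safe is given here, using hypothetical types (`BaseConfig`, `MulticastConfig`, `BaseTransport`, `MulticastLikeTransport`) rather than the OpenDDS classes: the derived constructor only accepts the derived configuration type, so the stored reference is known to refer to one.

```cpp
// Hypothetical configuration hierarchy.
struct BaseConfig {
  virtual ~BaseConfig() {}
};

struct MulticastConfig : BaseConfig {
  bool reliable = true;
};

// Hypothetical base transport that stores the configuration by reference.
class BaseTransport {
public:
  explicit BaseTransport(BaseConfig& config) : config_(config) {}
  BaseConfig& config() const { return config_; }

private:
  BaseConfig& config_;
};

// The derived transport hides config() with a version returning the
// concrete type.
class MulticastLikeTransport : public BaseTransport {
public:
  explicit MulticastLikeTransport(MulticastConfig& config)
    : BaseTransport(config) {}

  // Safe only because the constructor accepts nothing but MulticastConfig.
  MulticastConfig& config() const
  {
    return static_cast<MulticastConfig&>(BaseTransport::config());
  }
};
```

The cast costs nothing at run time. If the base class could be handed an arbitrary configuration object, a checked cast would be the safer choice.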

bool OpenDDS::DCPS::MulticastTransport::configure_i ( MulticastInst & config )  [protected]

Definition at line 333 of file MulticastTransport.cpp.

References ACE_TEXT(), OpenDDS::DCPS::MulticastInst::async_send_, config(), OpenDDS::DCPS::TransportImpl::create_reactor_task(), OpenDDS::DCPS::MulticastInst::group_address_, ACE_INET_Addr::is_multicast(), LM_ERROR, OpenDDS::DCPS::MulticastInst::local_address_, and TheServiceParticipant.

Referenced by MulticastTransport().

{
  // Override with DCPSDefaultAddress.
  if (config.local_address_.empty () &&
      !TheServiceParticipant->default_address ().empty ()) {
    config.local_address_ = TheServiceParticipant->default_address ().c_str ();
  }

  if (!config.group_address_.is_multicast()) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("MulticastTransport[%@]::configure_i: ")
                      ACE_TEXT("invalid configuration: address %C is not ")
                      ACE_TEXT("multicast.\n"),
                      this, this->config().group_address_.get_host_addr()),
                     false);
  }

  this->create_reactor_task(config.async_send_);

  return true;
}
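
configure_i() rejects a configuration whose group address is not a multicast address. For illustration, the sketch below shows what such a check amounts to for IPv4 only, where multicast addresses occupy 224.0.0.0 through 239.255.255.255. It is not the ACE_INET_Addr::is_multicast() implementation, and `is_ipv4_multicast` and `ipv4` are hypothetical helper names.

```cpp
#include <cstdint>

// IPv4 multicast addresses have 1110 as their top four bits
// (224.0.0.0/4). `addr` is expected in host byte order.
inline bool is_ipv4_multicast(std::uint32_t addr)
{
  return (addr & 0xF0000000u) == 0xE0000000u;
}

// Build a host-order IPv4 address from its four dotted-quad octets.
inline std::uint32_t ipv4(std::uint8_t a, std::uint8_t b,
                          std::uint8_t c, std::uint8_t d)
{
  return (static_cast<std::uint32_t>(a) << 24) |
         (static_cast<std::uint32_t>(b) << 16) |
         (static_cast<std::uint32_t>(c) << 8) |
         static_cast<std::uint32_t>(d);
}
```

A unicast address such as 192.168.1.1 fails this test, which corresponds to the "address is not multicast" error path in the listing.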

TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::connect_datalink ( const RemoteTransport & remote,
const ConnectionAttribs & attribs,
const TransportClient_rch & client 
) [protected, virtual]

connect_datalink() is called from TransportClient to initiate an association as the active peer. If a DataLink is already connected and ready to use, it may be returned immediately; otherwise the transport initiates a connection to the passive side and returns from this method. Once the physical connection completes, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 155 of file MulticastTransport.cpp.

References client_links_, config(), OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_lock_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, make_datalink(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, and start_session().

{
  // Check that the remote reliability matches.
  if (get_remote_reliability(remote) != this->config().is_reliable()) {
    return AcceptConnectResult();
  }

  GuardThreadType guard_links(this->links_lock_);
  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
                                 | RepoIdConverter(attribs.local_id_).participantId();
  Links::const_iterator link_iter = this->client_links_.find(local_peer);
  MulticastDataLink_rch link;

  if (link_iter == this->client_links_.end()) {
    link = this->make_datalink(attribs.local_id_, attribs.priority_, true /*active*/);
    this->client_links_[local_peer] = link;
  } else {
    link = link_iter->second;
  }

  MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
                            | RepoIdConverter(remote.repo_id_).participantId();

  MulticastSession_rch session(
    this->start_session(link, remote_peer, true /*active*/));

  if (session.is_nil()) {
    Links::iterator to_remove = this->client_links_.find(local_peer);
    if (to_remove != this->client_links_.end()) {
      this->client_links_.erase(to_remove);
    }
    return AcceptConnectResult();
  }
  return AcceptConnectResult(link);
}
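
Both connect_datalink() and accept_datalink() follow a find-or-create pattern: look up the link for the local peer under a lock, create and cache one if none exists, and drop the cached entry again if the session cannot be started. A minimal sketch of that pattern with standard-library types is given below. `Link` and `LinkCache` are hypothetical and only approximate the roles of MulticastDataLink and the client_links_ map.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical stand-in for a data link.
struct Link {
  std::int64_t local_peer;
  bool active;
};

// Hypothetical cache holding at most one link per local peer.
class LinkCache {
public:
  // Return the cached link for local_peer, creating and storing one if absent.
  std::shared_ptr<Link> find_or_create(std::int64_t local_peer, bool active)
  {
    std::lock_guard<std::mutex> guard(lock_);
    auto it = links_.find(local_peer);
    if (it != links_.end()) {
      return it->second;  // reuse the existing link
    }
    auto link = std::make_shared<Link>(Link{local_peer, active});
    links_[local_peer] = link;
    return link;
  }

  // Drop a cached link, for example after a session failed to start.
  void remove(std::int64_t local_peer)
  {
    std::lock_guard<std::mutex> guard(lock_);
    links_.erase(local_peer);
  }

  std::size_t size() const
  {
    std::lock_guard<std::mutex> guard(lock_);
    return links_.size();
  }

private:
  mutable std::mutex lock_;
  std::map<std::int64_t, std::shared_ptr<Link>> links_;
};
```

Two lookups with the same peer value return the same shared link, so repeated associations from one participant do not create additional links.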

bool OpenDDS::DCPS::MulticastTransport::connection_info_i ( TransportLocator & local_info )  const [protected, virtual]

Called by connection_info() so that the concrete TransportImpl subclass can populate the supplied TransportLocator object, since only the subclass knows how to do so.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 382 of file MulticastTransport.cpp.

References config(), and OpenDDS::DCPS::MulticastInst::populate_locator().

{
  this->config().populate_locator(info);
  return true;
}

MulticastDataLink_rch OpenDDS::DCPS::MulticastTransport::make_datalink ( const RepoId & local_id,
Priority  priority,
bool  active 
) [private]

Definition at line 50 of file MulticastTransport.cpp.

References ACE_TEXT(), ACE_INET_Addr::addr_to_string(), config(), OpenDDS::DCPS::MulticastInst::group_address_, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::ref(), str, and VDBG_LVL.

Referenced by accept_datalink(), and connect_datalink().

{

  RcHandle<MulticastSessionFactory> session_factory;

  if (this->config().is_reliable()) {
    session_factory = make_rch<ReliableSessionFactory>();

  } else {
    session_factory = make_rch<BestEffortSessionFactory>();
  }

  MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32
                           | RepoIdConverter(local_id).participantId();

  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink "
            "peers: local %#08x%08x priority %d active %d\n",
            this->config().name().c_str(), (unsigned int)(local_peer >> 32), (unsigned int)local_peer,
            priority, active), 2);

  TransportReactorTask_rch rtask(reactor_task());
  MulticastDataLink_rch link(make_rch<MulticastDataLink>(ref(*this),
                                   session_factory,
                                   local_peer,
                                   ref(config()),
                                   rtask.in(),
                                   active));

  // Join multicast group:
  if (!link->join(this->config().group_address_)) {
    ACE_TCHAR str[64];
    this->config().group_address_.addr_to_string(str,
                                        sizeof(str)/sizeof(str[0]));
    ACE_ERROR((LM_ERROR,
                    ACE_TEXT("(%P|%t) ERROR: ")
                    ACE_TEXT("MulticastTransport::make_datalink: ")
                    ACE_TEXT("failed to join multicast group: %s!\n"),
                    str));
    return MulticastDataLink_rch();
  }

  return link;
}
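
make_datalink() picks a session factory according to the configured reliability and hands it to the new link, so the link itself does not need to know which kind of session it creates. A minimal sketch of that selection follows. `SessionFactory`, `ReliableFactory`, `BestEffortFactory` and `choose_factory` are hypothetical names and do not reproduce the OpenDDS factory interfaces.

```cpp
#include <memory>
#include <string>

// Hypothetical abstract factory; the real one would create session objects.
struct SessionFactory {
  virtual ~SessionFactory() {}
  virtual std::string kind() const = 0;
};

struct ReliableFactory : SessionFactory {
  std::string kind() const override { return "reliable"; }
};

struct BestEffortFactory : SessionFactory {
  std::string kind() const override { return "best-effort"; }
};

// Choose the concrete factory once, based on configuration; callers then
// work only with the abstract interface.
inline std::unique_ptr<SessionFactory> choose_factory(bool reliable)
{
  if (reliable) {
    return std::unique_ptr<SessionFactory>(new ReliableFactory());
  }
  return std::unique_ptr<SessionFactory>(new BestEffortFactory());
}
```

Keeping the decision in one place means a new reliability mode would need only a new factory and one extra branch here.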

typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP ( Peers  ,
Callbacks   
) [private]

typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP ( MulticastPeer  ,
MulticastDataLink_rch   
) [private]

link for pubs.

OpenDDS::DCPS::MulticastTransport::OPENDDS_SET ( Peers   )  [private]

void OpenDDS::DCPS::MulticastTransport::passive_connection ( MulticastPeer  local_peer,
MulticastPeer  remote_peer 
)

Definition at line 283 of file MulticastTransport.cpp.

References ACE_Guard< ACE_LOCK >::acquire(), config(), connections_lock_, OpenDDS::DCPS::find(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), pending_connections_, ACE_Guard< ACE_LOCK >::release(), server_links_, OpenDDS::DCPS::static_rchandle_cast(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::MulticastSession::syn_received(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

{
  GuardThreadType guard(this->connections_lock_);

  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection "
            "from remote peer %#08x%08x to local peer %#08x%08x\n",
            this->config().name().c_str(),
            (unsigned int) (remote_peer >> 32),
            (unsigned int) remote_peer,
            (unsigned int) (local_peer >> 32),
            (unsigned int) local_peer), 2);

  const Peers peers(remote_peer, local_peer);
  const PendConnMap::iterator pend = this->pending_connections_.find(peers);
  //if connection was pending, calls to use_datalink finalized the connection
  //if it was not previously pending, accept_datalink() will finalize connection
  this->connections_.insert(peers);

  Links::const_iterator server_link = this->server_links_.find(local_peer);
  DataLink_rch link;

  if (server_link != this->server_links_.end()) {
    link = static_rchandle_cast<DataLink>(server_link->second);
    MulticastSession_rch session (server_link->second->find_or_create_session(remote_peer));
    session->set_acked();
  }

  if (pend != pending_connections_.end()) {
    Callbacks tmp(pend->second);
    for (size_t i = 0; i < tmp.size(); ++i) {
      const PendConnMap::iterator pend = pending_connections_.find(peers);
      if (pend != pending_connections_.end()) {
        const Callbacks::iterator tmp_iter = find(pend->second.begin(),
                                                  pend->second.end(),
                                                  tmp.at(i));
        if (tmp_iter != pend->second.end()) {
          TransportClient_wrch pend_client = tmp.at(i).first;
          RepoId remote_repo = tmp.at(i).second;
          guard.release();
          TransportClient_rch client = pend_client.lock();
          if (client)
            client->use_datalink(remote_repo, link);
          guard.acquire();
        }
      }
    }
  }
}
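
passive_connection() copies the pending callbacks, then releases connections_lock_ around each use_datalink() call and re-acquires it afterwards, so a callback that re-enters the transport cannot deadlock on that lock. The sketch below illustrates the same idea with std::unique_lock. `PendingConnections`, `Callback` and this `Peers` alias are hypothetical, and unlike the listing the sketch does not re-check whether an entry is still pending before invoking it.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

using Peers = std::pair<long long, long long>;  // hypothetical (remote, local)
using Callback = std::function<void()>;         // hypothetical callback type

class PendingConnections {
public:
  void add(const Peers& peers, Callback cb)
  {
    std::lock_guard<std::mutex> guard(lock_);
    pending_[peers].push_back(std::move(cb));
  }

  // Invoke every callback pending for `peers`, never while holding the lock.
  // Returns the number of callbacks invoked.
  std::size_t notify(const Peers& peers)
  {
    std::unique_lock<std::mutex> guard(lock_);
    auto it = pending_.find(peers);
    if (it == pending_.end()) {
      return 0;
    }
    // Work on a snapshot: callbacks may re-enter and modify pending_.
    const std::vector<Callback> snapshot(it->second);
    std::size_t invoked = 0;
    for (const Callback& cb : snapshot) {
      guard.unlock();  // the callback may call add() or notify() itself
      cb();
      ++invoked;
      guard.lock();
    }
    return invoked;
  }

private:
  std::mutex lock_;
  std::map<Peers, std::vector<Callback>> pending_;
};
```

Taking the snapshot before unlocking is what makes the loop safe: the map may change while the lock is released, but the copied vector does not.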

void OpenDDS::DCPS::MulticastTransport::release_datalink ( DataLink * link )  [protected, virtual]

Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 389 of file MulticastTransport.cpp.

{
  // No-op for multicast: keep both the client_link_ and server_link_ around
  // until the transport is shut down.
}

void OpenDDS::DCPS::MulticastTransport::shutdown_i (  )  [protected, virtual]

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 357 of file MulticastTransport.cpp.

References client_links_, links_lock_, and server_links_.

{
  GuardThreadType guard_links(this->links_lock_);
  Links::iterator link;

  for (link = this->client_links_.begin();
       link != this->client_links_.end();
       ++link) {
    if (link->second.in()) {
      link->second->transport_shutdown();
    }
  }
  client_links_.clear();

  for (link = this->server_links_.begin();
       link != this->server_links_.end();
       ++link) {
    if (link->second.in()) {
      link->second->transport_shutdown();
    }
  }
  server_links_.clear();
}

MulticastSession_rch OpenDDS::DCPS::MulticastTransport::start_session ( const MulticastDataLink_rch & link,
MulticastPeer  remote_peer,
bool  active 
) [private]

Definition at line 97 of file MulticastTransport.cpp.

References ACE_TEXT(), config(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and LM_ERROR.

Referenced by accept_datalink(), and connect_datalink().

{
  if (link.is_nil()) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("MulticastTransport[%C]::start_session: ")
                      ACE_TEXT("link is nil\n"),
                      this->config().name().c_str()),
                     MulticastSession_rch());
  }

  MulticastSession_rch session(link->find_or_create_session(remote_peer));

  if (session.is_nil()) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("MulticastTransport[%C]::start_session: ")
                      ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
                      this->config().name().c_str(),
                      (unsigned int)(remote_peer >> 32),
                      (unsigned int) remote_peer),
                     MulticastSession_rch());
  }

  const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));

  if (!session->start(active, acked)) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("MulticastTransport[%C]::start_session: ")
                      ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"),
                      this->config().name().c_str(),
                      (unsigned int)(remote_peer >> 32),
                      (unsigned int) remote_peer),
                     MulticastSession_rch());
  }

  return session;
}

void OpenDDS::DCPS::MulticastTransport::stop_accepting_or_connecting ( const TransportClient_wrch & client,
const RepoId & remote_id 
) [protected, virtual]

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting or connecting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with the pending connection. The TransportClient* passed to accept_datalink() or connect_datalink() is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 257 of file MulticastTransport.cpp.

References connections_lock_, LM_DEBUG, pending_connections_, and VDBG.

{
  VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));

  GuardThreadType guard(this->connections_lock_);

  for (PendConnMap::iterator it = this->pending_connections_.begin();
       it != this->pending_connections_.end(); ++it) {
    bool erased_from_it = false;
    for (size_t i = 0; i < it->second.size(); ++i) {
      if (it->second[i].first == client && it->second[i].second == remote_id) {
        erased_from_it = true;
        it->second.erase(it->second.begin() + i);
        break;
      }
    }

    if (erased_from_it && it->second.empty()) {
      this->pending_connections_.erase(it);
      return;
    }
  }
}
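
The listing above removes a pending (client, remote ID) entry and, when a peer pair's list becomes empty, erases the map entry and returns at once so the invalidated iterator is never advanced. A simplified sketch of that erase-and-prune pattern follows. The aliases and `remove_pending` are hypothetical, integer IDs stand in for the client handle and RepoId, and unlike the listing the sketch stops at the first match instead of continuing through the remaining keys.

```cpp
#include <map>
#include <utility>
#include <vector>

using Peers = std::pair<long long, long long>;  // hypothetical (remote, local)
using Pending = std::pair<int, int>;            // hypothetical (client, remote id)
using PendingMap = std::map<Peers, std::vector<Pending>>;

// Remove the first entry equal to `target`. If that leaves the list for its
// key empty, drop the key as well. Returns true if an entry was removed.
inline bool remove_pending(PendingMap& pending, const Pending& target)
{
  for (auto it = pending.begin(); it != pending.end(); ++it) {
    std::vector<Pending>& list = it->second;
    for (auto entry = list.begin(); entry != list.end(); ++entry) {
      if (*entry == target) {
        list.erase(entry);
        if (list.empty()) {
          pending.erase(it);  // safe: we return before touching `it` again
        }
        return true;
      }
    }
  }
  return false;
}
```

Returning immediately after erasing the map entry is the important detail. Continuing the outer loop with the erased iterator would be undefined behavior.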

virtual std::string OpenDDS::DCPS::MulticastTransport::transport_type (  )  const [inline, protected, virtual]

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 58 of file MulticastTransport.h.

{ return "multicast"; }


Member Data Documentation

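Links OpenDDS::DCPS::MulticastTransport::client_links_ [private]

Definition at line 80 of file MulticastTransport.h.

Referenced by connect_datalink(), and shutdown_i().

ThreadLockType OpenDDS::DCPS::MulticastTransport::links_lock_ [private]

Definition at line 77 of file MulticastTransport.h.

Referenced by accept_datalink(), connect_datalink(), and shutdown_i().

Links OpenDDS::DCPS::MulticastTransport::server_links_ [private]

link for subs.

Definition at line 82 of file MulticastTransport.h.

Referenced by accept_datalink(), passive_connection(), and shutdown_i().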


The documentation for this class was generated from the following files:

MulticastTransport.h
MulticastTransport.cpp

Generated on 10 Aug 2018 for OpenDDS by doxygen 1.6.1