OpenDDS::DCPS::MulticastTransport Class Reference

#include <MulticastTransport.h>

Inheritance diagram for OpenDDS::DCPS::MulticastTransport:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MulticastTransport:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 MulticastTransport (const TransportInst_rch &inst)
 ~MulticastTransport ()
void passive_connection (MulticastPeer local_peer, MulticastPeer remote_peer)

Protected Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual void stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id)
virtual bool configure_i (TransportInst *config)
virtual void shutdown_i ()
virtual bool connection_info_i (TransportLocator &info) const
virtual void release_datalink (DataLink *link)
virtual std::string transport_type () const

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockTypeGuardType
typedef ACE_Thread_Mutex ThreadLockType
typedef ACE_Guard< ThreadLockTypeGuardThreadType
typedef std::vector< DataLink::OnStartCallbackCallbacks
typedef std::pair< MulticastPeer,
MulticastPeer
Peers

Private Member Functions

MulticastDataLinkmake_datalink (const RepoId &local_id, Priority priority, bool active)
MulticastSessionstart_session (const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active)
typedef OPENDDS_MAP (MulticastPeer, MulticastDataLink_rch) Links
 link for pubs.
typedef OPENDDS_MAP (Peers, Callbacks) PendConnMap
 OPENDDS_SET (Peers) connections_

Private Attributes

RcHandle< MulticastInstconfig_i_
ThreadLockType links_lock_
Links client_links_
Links server_links_
 link for subs.
ThreadLockType connections_lock_
PendConnMap pending_connections_

Detailed Description

Definition at line 25 of file MulticastTransport.h.


Member Typedef Documentation

typedef std::vector<DataLink::OnStartCallback> OpenDDS::DCPS::MulticastTransport::Callbacks [private]

Definition at line 85 of file MulticastTransport.h.

typedef ACE_Guard<ThreadLockType> OpenDDS::DCPS::MulticastTransport::GuardThreadType [private]

Definition at line 60 of file MulticastTransport.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::MulticastTransport::GuardType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 57 of file MulticastTransport.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::MulticastTransport::LockType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 56 of file MulticastTransport.h.

typedef std::pair<MulticastPeer, MulticastPeer> OpenDDS::DCPS::MulticastTransport::Peers [private]

Definition at line 86 of file MulticastTransport.h.

typedef ACE_Thread_Mutex OpenDDS::DCPS::MulticastTransport::ThreadLockType [private]

Definition at line 59 of file MulticastTransport.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::MulticastTransport::MulticastTransport ( const TransportInst_rch inst  )  [explicit]

Definition at line 28 of file MulticastTransport.cpp.

References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

00029   : config_i_(0)
00030 {
00031   if (!inst.is_nil()) {
00032     if (!configure(inst.in())) {
00033       throw Transport::UnableToCreate();
00034     }
00035   }
00036 
00037 }

OpenDDS::DCPS::MulticastTransport::~MulticastTransport (  ) 

Definition at line 39 of file MulticastTransport.cpp.

00040 {
00041 }


Member Function Documentation

TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::accept_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient client 
) [protected, virtual]

Definition at line 203 of file MulticastTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), pending_connections_, OpenDDS::DCPS::push_back(), OpenDDS::DCPS::TransportClient::repo_id_, server_links_, start_session(), and VDBG.

00206 {
00207   // Check that the remote reliability matches.
00208   if (get_remote_reliability(remote) != this->config_i_->is_reliable()) {
00209     return AcceptConnectResult();
00210   }
00211 
00212   const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00213                                  | RepoIdConverter(attribs.local_id_).participantId();
00214 
00215   GuardThreadType guard_links(this->links_lock_);
00216 
00217   Links::const_iterator link_iter = this->server_links_.find(local_peer);
00218   MulticastDataLink_rch link;
00219 
00220   if (link_iter == this->server_links_.end()) {
00221 
00222     link = this->make_datalink(attribs.local_id_, attribs.priority_, false /*passive*/);
00223     this->server_links_[local_peer] = link;
00224   } else {
00225     link = link_iter->second;
00226   }
00227 
00228   guard_links.release();
00229 
00230   MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00231                             | RepoIdConverter(remote.repo_id_).participantId();
00232   GuardThreadType guard(this->connections_lock_);
00233 
00234   if (connections_.count(std::make_pair(remote_peer, local_peer))) {
00235     //can't call start session with connections_lock_ due to reactor
00236     //call in session->start which could deadlock with passive_connection
00237     guard.release();
00238 
00239     VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n"));
00240     MulticastSession_rch session =
00241       this->start_session(link, remote_peer, false /*!active*/);
00242 
00243     if (session.is_nil()) {
00244       link = 0;
00245     }
00246     return AcceptConnectResult(link._retn());
00247 
00248   } else {
00249 
00250     this->pending_connections_[std::make_pair(remote_peer, local_peer)].
00251     push_back(std::pair<TransportClient*, RepoId>(client, remote.repo_id_));
00252     //can't call start session with connections_lock_ due to reactor
00253     //call in session->start which could deadlock with passive_connection
00254     guard.release();
00255     MulticastSession_rch session =
00256       this->start_session(link, remote_peer, false /*!active*/);
00257 
00258     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00259 
00260   }
00261 }

bool OpenDDS::DCPS::MulticastTransport::configure_i ( TransportInst config  )  [protected, virtual]

Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 338 of file MulticastTransport.cpp.

References OpenDDS::DCPS::TransportImpl::config(), config_i_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), and TheServiceParticipant.

00339 {
00340   this->config_i_ = dynamic_cast<MulticastInst*>(config);
00341 
00342   if (this->config_i_ == 0) {
00343     ACE_ERROR_RETURN((LM_ERROR,
00344                       ACE_TEXT("(%P|%t) ERROR: ")
00345                       ACE_TEXT("MulticastTransport[%@]::configure_i: ")
00346                       ACE_TEXT("invalid configuration!\n"), this),
00347                      false);
00348   }
00349 
00350   this->config_i_->_add_ref();
00351 
00352   // Override with DCPSDefaultAddress.
00353   if (this->config_i_->local_address_.empty () &&
00354       !TheServiceParticipant->default_address ().empty ()) {
00355     this->config_i_->local_address_ = TheServiceParticipant->default_address ().c_str ();
00356   }
00357 
00358   if (!this->config_i_->group_address_.is_multicast()) {
00359     ACE_ERROR_RETURN((LM_ERROR,
00360                       ACE_TEXT("(%P|%t) ERROR: ")
00361                       ACE_TEXT("MulticastTransport[%@]::configure_i: ")
00362                       ACE_TEXT("invalid configuration: address %C is not ")
00363                       ACE_TEXT("multicast.\n"),
00364                       this, this->config_i_->group_address_.get_host_addr()),
00365                      false);
00366   }
00367 
00368   this->create_reactor_task(this->config_i_->async_send_);
00369 
00370   return true;
00371 }

TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::connect_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient client 
) [protected, virtual]

Definition at line 163 of file MulticastTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), client_links_, OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and start_session().

00166 {
00167   // Check that the remote reliability matches.
00168   if (get_remote_reliability(remote) != this->config_i_->is_reliable()) {
00169     return AcceptConnectResult();
00170   }
00171 
00172   GuardThreadType guard_links(this->links_lock_);
00173   const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00174                                  | RepoIdConverter(attribs.local_id_).participantId();
00175   Links::const_iterator link_iter = this->client_links_.find(local_peer);
00176   MulticastDataLink_rch link;
00177 
00178   if (link_iter == this->client_links_.end()) {
00179 
00180     link = this->make_datalink(attribs.local_id_, attribs.priority_, true /*active*/);
00181     this->client_links_[local_peer] = link;
00182   } else {
00183     link = link_iter->second;
00184   }
00185 
00186   MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00187                             | RepoIdConverter(remote.repo_id_).participantId();
00188 
00189   MulticastSession_rch session =
00190     this->start_session(link, remote_peer, true /*active*/);
00191 
00192   if (session.is_nil()) {
00193     Links::iterator to_remove = this->client_links_.find(local_peer);
00194     if (to_remove != this->client_links_.end()) {
00195       this->client_links_.erase(to_remove);
00196     }
00197     return AcceptConnectResult();
00198   }
00199   return AcceptConnectResult(link._retn());
00200 }

bool OpenDDS::DCPS::MulticastTransport::connection_info_i ( TransportLocator info  )  const [protected, virtual]

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 399 of file MulticastTransport.cpp.

References config_i_.

00400 {
00401   this->config_i_->populate_locator(info);
00402   return true;
00403 }

MulticastDataLink * OpenDDS::DCPS::MulticastTransport::make_datalink ( const RepoId local_id,
Priority  priority,
bool  active 
) [private]

Definition at line 44 of file MulticastTransport.cpp.

References config_i_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportImpl::reactor_task(), and VDBG_LVL.

00047 {
00048   RcHandle<MulticastSessionFactory> session_factory;
00049 
00050   if (this->config_i_->reliable_) {
00051     ACE_NEW_RETURN(session_factory, ReliableSessionFactory, 0);
00052 
00053   } else {
00054     ACE_NEW_RETURN(session_factory, BestEffortSessionFactory, 0);
00055   }
00056 
00057   MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32
00058                            | RepoIdConverter(local_id).participantId();
00059 
00060   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink "
00061             "peers: local %#08x%08x priority %d active %d\n",
00062             this->config_i_->name().c_str(), (unsigned int)(local_peer >> 32), (unsigned int)local_peer,
00063             priority, active), 2);
00064 
00065   MulticastDataLink_rch link;
00066   ACE_NEW_RETURN(link,
00067                  MulticastDataLink(this,
00068                                    session_factory.in(),
00069                                    local_peer,
00070                                    active),
00071                  0);
00072 
00073   // Configure link with transport configuration and reactor task:
00074   TransportReactorTask_rch rtask = reactor_task();
00075   //link->configure(this->config_i_.in(), rtask.in());
00076   link->configure(config_i_.in(), rtask.in());
00077 
00078   // Assign send strategy:
00079   MulticastSendStrategy* send_strategy;
00080   ACE_NEW_RETURN(send_strategy, MulticastSendStrategy(link.in()), 0);
00081   link->send_strategy(send_strategy);
00082 
00083   // Assign receive strategy:
00084   MulticastReceiveStrategy* recv_strategy;
00085   ACE_NEW_RETURN(recv_strategy, MulticastReceiveStrategy(link.in()), 0);
00086   link->receive_strategy(recv_strategy);
00087 
00088   // Join multicast group:
00089   if (!link->join(this->config_i_->group_address_)) {
00090     ACE_TCHAR str[64];
00091     this->config_i_->group_address_.addr_to_string(str,
00092                                                    sizeof(str)/sizeof(str[0]));
00093     ACE_ERROR_RETURN((LM_ERROR,
00094                       ACE_TEXT("(%P|%t) ERROR: ")
00095                       ACE_TEXT("MulticastTransport::make_datalink: ")
00096                       ACE_TEXT("failed to join multicast group: %s!\n"),
00097                       str),
00098                      0);
00099   }
00100 
00101   return link._retn();
00102 }

typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP ( Peers  ,
Callbacks   
) [private]

typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP ( MulticastPeer  ,
MulticastDataLink_rch   
) [private]

link for pubs.

OpenDDS::DCPS::MulticastTransport::OPENDDS_SET ( Peers   )  [private]

void OpenDDS::DCPS::MulticastTransport::passive_connection ( MulticastPeer  local_peer,
MulticastPeer  remote_peer 
)

Definition at line 290 of file MulticastTransport.cpp.

References OpenDDS::DCPS::find(), pending_connections_, server_links_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::MulticastSession::syn_received(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

00291 {
00292   GuardThreadType guard(this->connections_lock_);
00293 
00294   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection "
00295             "from remote peer %#08x%08x to local peer %#08x%08x\n",
00296             this->config_i_->name().c_str(),
00297             (unsigned int) (remote_peer >> 32),
00298             (unsigned int) remote_peer,
00299             (unsigned int) (local_peer >> 32),
00300             (unsigned int) local_peer), 2);
00301 
00302   const Peers peers(remote_peer, local_peer);
00303   const PendConnMap::iterator pend = this->pending_connections_.find(peers);
00304   //if connection was pending, calls to use_datalink finalized the connection
00305   //if it was not previously pending, accept_datalink() will finalize connection
00306   this->connections_.insert(peers);
00307 
00308   Links::const_iterator server_link = this->server_links_.find(local_peer);
00309   DataLink_rch link;
00310 
00311   if (server_link != this->server_links_.end()) {
00312     link = static_rchandle_cast<DataLink>(server_link->second);
00313     MulticastSession_rch session = server_link->second->find_or_create_session(remote_peer);
00314     session->set_acked();
00315   }
00316 
00317   if (pend != pending_connections_.end()) {
00318     Callbacks tmp(pend->second);
00319     for (size_t i = 0; i < tmp.size(); ++i) {
00320       const PendConnMap::iterator pend = pending_connections_.find(peers);
00321       if (pend != pending_connections_.end()) {
00322         const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00323                                                   pend->second.end(),
00324                                                   tmp.at(i));
00325         if (tmp_iter != pend->second.end()) {
00326           TransportClient* pend_client = tmp.at(i).first;
00327           RepoId remote_repo = tmp.at(i).second;
00328           guard.release();
00329           pend_client->use_datalink(remote_repo, link);
00330           guard.acquire();
00331         }
00332       }
00333     }
00334   }
00335 }

void OpenDDS::DCPS::MulticastTransport::release_datalink ( DataLink link  )  [protected, virtual]

The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 406 of file MulticastTransport.cpp.

00407 {
00408   // No-op for multicast: keep both the client_link_ and server_link_ around
00409   // until the transport is shut down.
00410 }

void OpenDDS::DCPS::MulticastTransport::shutdown_i (  )  [protected, virtual]

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 374 of file MulticastTransport.cpp.

References client_links_, config_i_, and server_links_.

00375 {
00376   GuardThreadType guard_links(this->links_lock_);
00377   Links::iterator link;
00378 
00379   for (link = this->client_links_.begin();
00380        link != this->client_links_.end();
00381        ++link) {
00382     if (link->second.in()) {
00383       link->second->transport_shutdown();
00384     }
00385   }
00386 
00387   for (link = this->server_links_.begin();
00388        link != this->server_links_.end();
00389        ++link) {
00390     if (link->second.in()) {
00391       link->second->transport_shutdown();
00392     }
00393   }
00394 
00395   this->config_i_ = 0;
00396 }

MulticastSession * OpenDDS::DCPS::MulticastTransport::start_session ( const MulticastDataLink_rch link,
MulticastPeer  remote_peer,
bool  active 
) [private]

Definition at line 105 of file MulticastTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

Referenced by accept_datalink(), and connect_datalink().

00107 {
00108   if (link.is_nil()) {
00109     ACE_ERROR_RETURN((LM_ERROR,
00110                       ACE_TEXT("(%P|%t) ERROR: ")
00111                       ACE_TEXT("MulticastTransport[%C]::start_session: ")
00112                       ACE_TEXT("link is nil\n"),
00113                       this->config_i_->name().c_str()),
00114                      0);
00115   }
00116 
00117   MulticastSession_rch session = link->find_or_create_session(remote_peer);
00118 
00119   if (session.is_nil()) {
00120     ACE_ERROR_RETURN((LM_ERROR,
00121                       ACE_TEXT("(%P|%t) ERROR: ")
00122                       ACE_TEXT("MulticastTransport[%C]::start_session: ")
00123                       ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00124                       this->config_i_->name().c_str(),
00125                       (unsigned int)(remote_peer >> 32),
00126                       (unsigned int) remote_peer),
00127                      0);
00128   }
00129 
00130   const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));
00131 
00132   if (!session->start(active, acked)) {
00133     ACE_ERROR_RETURN((LM_ERROR,
00134                       ACE_TEXT("(%P|%t) ERROR: ")
00135                       ACE_TEXT("MulticastTransport[%C]::start_session: ")
00136                       ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"),
00137                       this->config_i_->name().c_str(),
00138                       (unsigned int)(remote_peer >> 32),
00139                       (unsigned int) remote_peer),
00140                      0);
00141   }
00142 
00143   return session._retn();
00144 }

void OpenDDS::DCPS::MulticastTransport::stop_accepting_or_connecting ( TransportClient client,
const RepoId remote_id 
) [protected, virtual]

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 264 of file MulticastTransport.cpp.

References pending_connections_, and VDBG.

00266 {
00267   VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));
00268 
00269   GuardThreadType guard(this->connections_lock_);
00270 
00271   for (PendConnMap::iterator it = this->pending_connections_.begin();
00272        it != this->pending_connections_.end(); ++it) {
00273     bool erased_from_it = false;
00274     for (size_t i = 0; i < it->second.size(); ++i) {
00275       if (it->second[i].first == client && it->second[i].second == remote_id) {
00276         erased_from_it = true;
00277         it->second.erase(it->second.begin() + i);
00278         break;
00279       }
00280     }
00281 
00282     if (erased_from_it && it->second.empty()) {
00283       this->pending_connections_.erase(it);
00284       return;
00285     }
00286   }
00287 }

virtual std::string OpenDDS::DCPS::MulticastTransport::transport_type (  )  const [inline, protected, virtual]

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 52 of file MulticastTransport.h.

00052 { return "multicast"; }


Member Data Documentation

Links OpenDDS::DCPS::MulticastTransport::client_links_ [private]

Definition at line 74 of file MulticastTransport.h.

Referenced by connect_datalink(), and shutdown_i().

RcHandle<MulticastInst> OpenDDS::DCPS::MulticastTransport::config_i_ [private]

Definition at line 69 of file MulticastTransport.h.

Referenced by configure_i(), connection_info_i(), make_datalink(), and shutdown_i().

ThreadLockType OpenDDS::DCPS::MulticastTransport::connections_lock_ [private]

Definition at line 84 of file MulticastTransport.h.

ThreadLockType OpenDDS::DCPS::MulticastTransport::links_lock_ [private]

Definition at line 71 of file MulticastTransport.h.

PendConnMap OpenDDS::DCPS::MulticastTransport::pending_connections_ [private]

Definition at line 88 of file MulticastTransport.h.

Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().

Links OpenDDS::DCPS::MulticastTransport::server_links_ [private]

link for subs.

Definition at line 76 of file MulticastTransport.h.

Referenced by accept_datalink(), passive_connection(), and shutdown_i().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:35 2016 for OpenDDS by  doxygen 1.4.7