#include <MulticastTransport.h>
Inheritance diagram for OpenDDS::DCPS::MulticastTransport:
Public Member Functions | |
MulticastTransport (const TransportInst_rch &inst) | |
~MulticastTransport () | |
void | passive_connection (MulticastPeer local_peer, MulticastPeer remote_peer) |
Protected Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual void | stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id) |
virtual bool | configure_i (TransportInst *config) |
virtual void | shutdown_i () |
virtual bool | connection_info_i (TransportLocator &info) const |
virtual void | release_datalink (DataLink *link) |
virtual std::string | transport_type () const |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Thread_Mutex | ThreadLockType |
typedef ACE_Guard< ThreadLockType > | GuardThreadType |
typedef std::vector< DataLink::OnStartCallback > | Callbacks |
typedef std::pair< MulticastPeer, MulticastPeer > | Peers |
Private Member Functions | |
MulticastDataLink * | make_datalink (const RepoId &local_id, Priority priority, bool active) |
MulticastSession * | start_session (const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active) |
typedef | OPENDDS_MAP (MulticastPeer, MulticastDataLink_rch) Links |
link for pubs. | |
typedef | OPENDDS_MAP (Peers, Callbacks) PendConnMap |
OPENDDS_SET (Peers) connections_ | |
Private Attributes | |
RcHandle< MulticastInst > | config_i_ |
ThreadLockType | links_lock_ |
Links | client_links_ |
Links | server_links_ |
link for subs. | |
ThreadLockType | connections_lock_ |
PendConnMap | pending_connections_ |
Definition at line 25 of file MulticastTransport.h.
typedef std::vector<DataLink::OnStartCallback> OpenDDS::DCPS::MulticastTransport::Callbacks [private] |
Definition at line 85 of file MulticastTransport.h.
typedef ACE_Guard<ThreadLockType> OpenDDS::DCPS::MulticastTransport::GuardThreadType [private] |
Definition at line 60 of file MulticastTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::MulticastTransport::GuardType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 57 of file MulticastTransport.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::MulticastTransport::LockType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 56 of file MulticastTransport.h.
typedef std::pair<MulticastPeer, MulticastPeer> OpenDDS::DCPS::MulticastTransport::Peers [private] |
Definition at line 86 of file MulticastTransport.h.
typedef ACE_Thread_Mutex OpenDDS::DCPS::MulticastTransport::ThreadLockType [private] |
Definition at line 59 of file MulticastTransport.h.
OpenDDS::DCPS::MulticastTransport::MulticastTransport | ( | const TransportInst_rch & | inst | ) | [explicit] |
Definition at line 28 of file MulticastTransport.cpp.
References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
00029 : config_i_(0) 00030 { 00031 if (!inst.is_nil()) { 00032 if (!configure(inst.in())) { 00033 throw Transport::UnableToCreate(); 00034 } 00035 } 00036 00037 }
OpenDDS::DCPS::MulticastTransport::~MulticastTransport | ( | ) |
TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [protected, virtual] |
Definition at line 203 of file MulticastTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), pending_connections_, OpenDDS::DCPS::push_back(), OpenDDS::DCPS::TransportClient::repo_id_, server_links_, start_session(), and VDBG.
00206 { 00207 // Check that the remote reliability matches. 00208 if (get_remote_reliability(remote) != this->config_i_->is_reliable()) { 00209 return AcceptConnectResult(); 00210 } 00211 00212 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32 00213 | RepoIdConverter(attribs.local_id_).participantId(); 00214 00215 GuardThreadType guard_links(this->links_lock_); 00216 00217 Links::const_iterator link_iter = this->server_links_.find(local_peer); 00218 MulticastDataLink_rch link; 00219 00220 if (link_iter == this->server_links_.end()) { 00221 00222 link = this->make_datalink(attribs.local_id_, attribs.priority_, false /*passive*/); 00223 this->server_links_[local_peer] = link; 00224 } else { 00225 link = link_iter->second; 00226 } 00227 00228 guard_links.release(); 00229 00230 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32 00231 | RepoIdConverter(remote.repo_id_).participantId(); 00232 GuardThreadType guard(this->connections_lock_); 00233 00234 if (connections_.count(std::make_pair(remote_peer, local_peer))) { 00235 //can't call start session with connections_lock_ due to reactor 00236 //call in session->start which could deadlock with passive_connection 00237 guard.release(); 00238 00239 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n")); 00240 MulticastSession_rch session = 00241 this->start_session(link, remote_peer, false /*!active*/); 00242 00243 if (session.is_nil()) { 00244 link = 0; 00245 } 00246 return AcceptConnectResult(link._retn()); 00247 00248 } else { 00249 00250 this->pending_connections_[std::make_pair(remote_peer, local_peer)]. 00251 push_back(std::pair<TransportClient*, RepoId>(client, remote.repo_id_)); 00252 //can't call start session with connections_lock_ due to reactor 00253 //call in session->start which could deadlock with passive_connection 00254 guard.release(); 00255 MulticastSession_rch session = 00256 this->start_session(link, remote_peer, false /*!active*/); 00257 00258 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00259 00260 } 00261 }
bool OpenDDS::DCPS::MulticastTransport::configure_i | ( | TransportInst * | config | ) | [protected, virtual] |
Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 338 of file MulticastTransport.cpp.
References OpenDDS::DCPS::TransportImpl::config(), config_i_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), and TheServiceParticipant.
00339 { 00340 this->config_i_ = dynamic_cast<MulticastInst*>(config); 00341 00342 if (this->config_i_ == 0) { 00343 ACE_ERROR_RETURN((LM_ERROR, 00344 ACE_TEXT("(%P|%t) ERROR: ") 00345 ACE_TEXT("MulticastTransport[%@]::configure_i: ") 00346 ACE_TEXT("invalid configuration!\n"), this), 00347 false); 00348 } 00349 00350 this->config_i_->_add_ref(); 00351 00352 // Override with DCPSDefaultAddress. 00353 if (this->config_i_->local_address_.empty () && 00354 !TheServiceParticipant->default_address ().empty ()) { 00355 this->config_i_->local_address_ = TheServiceParticipant->default_address ().c_str (); 00356 } 00357 00358 if (!this->config_i_->group_address_.is_multicast()) { 00359 ACE_ERROR_RETURN((LM_ERROR, 00360 ACE_TEXT("(%P|%t) ERROR: ") 00361 ACE_TEXT("MulticastTransport[%@]::configure_i: ") 00362 ACE_TEXT("invalid configuration: address %C is not ") 00363 ACE_TEXT("multicast.\n"), 00364 this, this->config_i_->group_address_.get_host_addr()), 00365 false); 00366 } 00367 00368 this->create_reactor_task(this->config_i_->async_send_); 00369 00370 return true; 00371 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [protected, virtual] |
Definition at line 163 of file MulticastTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), client_links_, OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and start_session().
00166 { 00167 // Check that the remote reliability matches. 00168 if (get_remote_reliability(remote) != this->config_i_->is_reliable()) { 00169 return AcceptConnectResult(); 00170 } 00171 00172 GuardThreadType guard_links(this->links_lock_); 00173 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32 00174 | RepoIdConverter(attribs.local_id_).participantId(); 00175 Links::const_iterator link_iter = this->client_links_.find(local_peer); 00176 MulticastDataLink_rch link; 00177 00178 if (link_iter == this->client_links_.end()) { 00179 00180 link = this->make_datalink(attribs.local_id_, attribs.priority_, true /*active*/); 00181 this->client_links_[local_peer] = link; 00182 } else { 00183 link = link_iter->second; 00184 } 00185 00186 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32 00187 | RepoIdConverter(remote.repo_id_).participantId(); 00188 00189 MulticastSession_rch session = 00190 this->start_session(link, remote_peer, true /*active*/); 00191 00192 if (session.is_nil()) { 00193 Links::iterator to_remove = this->client_links_.find(local_peer); 00194 if (to_remove != this->client_links_.end()) { 00195 this->client_links_.erase(to_remove); 00196 } 00197 return AcceptConnectResult(); 00198 } 00199 return AcceptConnectResult(link._retn()); 00200 }
bool OpenDDS::DCPS::MulticastTransport::connection_info_i | ( | TransportLocator & | info | ) | const [protected, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 399 of file MulticastTransport.cpp.
References config_i_.
00400 { 00401 this->config_i_->populate_locator(info); 00402 return true; 00403 }
MulticastDataLink * OpenDDS::DCPS::MulticastTransport::make_datalink | ( | const RepoId & | local_id, | |
Priority | priority, | |||
bool | active | |||
) | [private] |
Definition at line 44 of file MulticastTransport.cpp.
References config_i_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportImpl::reactor_task(), and VDBG_LVL.
00047 { 00048 RcHandle<MulticastSessionFactory> session_factory; 00049 00050 if (this->config_i_->reliable_) { 00051 ACE_NEW_RETURN(session_factory, ReliableSessionFactory, 0); 00052 00053 } else { 00054 ACE_NEW_RETURN(session_factory, BestEffortSessionFactory, 0); 00055 } 00056 00057 MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32 00058 | RepoIdConverter(local_id).participantId(); 00059 00060 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink " 00061 "peers: local %#08x%08x priority %d active %d\n", 00062 this->config_i_->name().c_str(), (unsigned int)(local_peer >> 32), (unsigned int)local_peer, 00063 priority, active), 2); 00064 00065 MulticastDataLink_rch link; 00066 ACE_NEW_RETURN(link, 00067 MulticastDataLink(this, 00068 session_factory.in(), 00069 local_peer, 00070 active), 00071 0); 00072 00073 // Configure link with transport configuration and reactor task: 00074 TransportReactorTask_rch rtask = reactor_task(); 00075 //link->configure(this->config_i_.in(), rtask.in()); 00076 link->configure(config_i_.in(), rtask.in()); 00077 00078 // Assign send strategy: 00079 MulticastSendStrategy* send_strategy; 00080 ACE_NEW_RETURN(send_strategy, MulticastSendStrategy(link.in()), 0); 00081 link->send_strategy(send_strategy); 00082 00083 // Assign receive strategy: 00084 MulticastReceiveStrategy* recv_strategy; 00085 ACE_NEW_RETURN(recv_strategy, MulticastReceiveStrategy(link.in()), 0); 00086 link->receive_strategy(recv_strategy); 00087 00088 // Join multicast group: 00089 if (!link->join(this->config_i_->group_address_)) { 00090 ACE_TCHAR str[64]; 00091 this->config_i_->group_address_.addr_to_string(str, 00092 sizeof(str)/sizeof(str[0])); 00093 ACE_ERROR_RETURN((LM_ERROR, 00094 ACE_TEXT("(%P|%t) ERROR: ") 00095 ACE_TEXT("MulticastTransport::make_datalink: ") 00096 ACE_TEXT("failed to join multicast group: %s!\n"), 00097 str), 00098 0); 00099 } 00100 00101 return link._retn(); 00102 }
typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP | ( | MulticastPeer | , | |
MulticastDataLink_rch | ||||
) | [private] |
link for pubs.
OpenDDS::DCPS::MulticastTransport::OPENDDS_SET | ( | Peers | ) | [private] |
void OpenDDS::DCPS::MulticastTransport::passive_connection | ( | MulticastPeer | local_peer, | |
MulticastPeer | remote_peer | |||
) |
Definition at line 290 of file MulticastTransport.cpp.
References OpenDDS::DCPS::find(), pending_connections_, server_links_, and VDBG_LVL.
Referenced by OpenDDS::DCPS::MulticastSession::syn_received(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().
00291 { 00292 GuardThreadType guard(this->connections_lock_); 00293 00294 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection " 00295 "from remote peer %#08x%08x to local peer %#08x%08x\n", 00296 this->config_i_->name().c_str(), 00297 (unsigned int) (remote_peer >> 32), 00298 (unsigned int) remote_peer, 00299 (unsigned int) (local_peer >> 32), 00300 (unsigned int) local_peer), 2); 00301 00302 const Peers peers(remote_peer, local_peer); 00303 const PendConnMap::iterator pend = this->pending_connections_.find(peers); 00304 //if connection was pending, calls to use_datalink finalized the connection 00305 //if it was not previously pending, accept_datalink() will finalize connection 00306 this->connections_.insert(peers); 00307 00308 Links::const_iterator server_link = this->server_links_.find(local_peer); 00309 DataLink_rch link; 00310 00311 if (server_link != this->server_links_.end()) { 00312 link = static_rchandle_cast<DataLink>(server_link->second); 00313 MulticastSession_rch session = server_link->second->find_or_create_session(remote_peer); 00314 session->set_acked(); 00315 } 00316 00317 if (pend != pending_connections_.end()) { 00318 Callbacks tmp(pend->second); 00319 for (size_t i = 0; i < tmp.size(); ++i) { 00320 const PendConnMap::iterator pend = pending_connections_.find(peers); 00321 if (pend != pending_connections_.end()) { 00322 const Callbacks::iterator tmp_iter = find(pend->second.begin(), 00323 pend->second.end(), 00324 tmp.at(i)); 00325 if (tmp_iter != pend->second.end()) { 00326 TransportClient* pend_client = tmp.at(i).first; 00327 RepoId remote_repo = tmp.at(i).second; 00328 guard.release(); 00329 pend_client->use_datalink(remote_repo, link); 00330 guard.acquire(); 00331 } 00332 } 00333 } 00334 } 00335 }
void OpenDDS::DCPS::MulticastTransport::release_datalink | ( | DataLink * | link | ) | [protected, virtual] |
The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 406 of file MulticastTransport.cpp.
00407 { 00408 // No-op for multicast: keep both the client_link_ and server_link_ around 00409 // until the transport is shut down. 00410 }
void OpenDDS::DCPS::MulticastTransport::shutdown_i | ( | ) | [protected, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 374 of file MulticastTransport.cpp.
References client_links_, config_i_, and server_links_.
00375 { 00376 GuardThreadType guard_links(this->links_lock_); 00377 Links::iterator link; 00378 00379 for (link = this->client_links_.begin(); 00380 link != this->client_links_.end(); 00381 ++link) { 00382 if (link->second.in()) { 00383 link->second->transport_shutdown(); 00384 } 00385 } 00386 00387 for (link = this->server_links_.begin(); 00388 link != this->server_links_.end(); 00389 ++link) { 00390 if (link->second.in()) { 00391 link->second->transport_shutdown(); 00392 } 00393 } 00394 00395 this->config_i_ = 0; 00396 }
MulticastSession * OpenDDS::DCPS::MulticastTransport::start_session | ( | const MulticastDataLink_rch & | link, | |
MulticastPeer | remote_peer, | |||
bool | active | |||
) | [private] |
Definition at line 105 of file MulticastTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
Referenced by accept_datalink(), and connect_datalink().
00107 { 00108 if (link.is_nil()) { 00109 ACE_ERROR_RETURN((LM_ERROR, 00110 ACE_TEXT("(%P|%t) ERROR: ") 00111 ACE_TEXT("MulticastTransport[%C]::start_session: ") 00112 ACE_TEXT("link is nil\n"), 00113 this->config_i_->name().c_str()), 00114 0); 00115 } 00116 00117 MulticastSession_rch session = link->find_or_create_session(remote_peer); 00118 00119 if (session.is_nil()) { 00120 ACE_ERROR_RETURN((LM_ERROR, 00121 ACE_TEXT("(%P|%t) ERROR: ") 00122 ACE_TEXT("MulticastTransport[%C]::start_session: ") 00123 ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"), 00124 this->config_i_->name().c_str(), 00125 (unsigned int)(remote_peer >> 32), 00126 (unsigned int) remote_peer), 00127 0); 00128 } 00129 00130 const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer())); 00131 00132 if (!session->start(active, acked)) { 00133 ACE_ERROR_RETURN((LM_ERROR, 00134 ACE_TEXT("(%P|%t) ERROR: ") 00135 ACE_TEXT("MulticastTransport[%C]::start_session: ") 00136 ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"), 00137 this->config_i_->name().c_str(), 00138 (unsigned int)(remote_peer >> 32), 00139 (unsigned int) remote_peer), 00140 0); 00141 } 00142 00143 return session._retn(); 00144 }
void OpenDDS::DCPS::MulticastTransport::stop_accepting_or_connecting | ( | TransportClient * | client, | |
const RepoId & | remote_id | |||
) | [protected, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 264 of file MulticastTransport.cpp.
References pending_connections_, and VDBG.
00266 { 00267 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n")); 00268 00269 GuardThreadType guard(this->connections_lock_); 00270 00271 for (PendConnMap::iterator it = this->pending_connections_.begin(); 00272 it != this->pending_connections_.end(); ++it) { 00273 bool erased_from_it = false; 00274 for (size_t i = 0; i < it->second.size(); ++i) { 00275 if (it->second[i].first == client && it->second[i].second == remote_id) { 00276 erased_from_it = true; 00277 it->second.erase(it->second.begin() + i); 00278 break; 00279 } 00280 } 00281 00282 if (erased_from_it && it->second.empty()) { 00283 this->pending_connections_.erase(it); 00284 return; 00285 } 00286 } 00287 }
virtual std::string OpenDDS::DCPS::MulticastTransport::transport_type | ( | ) | const [inline, protected, virtual] |
Links OpenDDS::DCPS::MulticastTransport::client_links_ [private] |
Definition at line 74 of file MulticastTransport.h.
Referenced by connect_datalink(), and shutdown_i().
Definition at line 69 of file MulticastTransport.h.
Referenced by configure_i(), connection_info_i(), make_datalink(), and shutdown_i().
Definition at line 84 of file MulticastTransport.h.
Definition at line 71 of file MulticastTransport.h.
PendConnMap OpenDDS::DCPS::MulticastTransport::pending_connections_ [private] |
Definition at line 88 of file MulticastTransport.h.
Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().
Links OpenDDS::DCPS::MulticastTransport::server_links_ [private] |
link for subs.
Definition at line 76 of file MulticastTransport.h.
Referenced by accept_datalink(), passive_connection(), and shutdown_i().