#include <MulticastTransport.h>
Definition at line 29 of file MulticastTransport.h.
typedef std::vector<DataLink::OnStartCallback> OpenDDS::DCPS::MulticastTransport::Callbacks [private] |
Definition at line 91 of file MulticastTransport.h.
typedef ACE_Guard<ThreadLockType> OpenDDS::DCPS::MulticastTransport::GuardThreadType [private] |
Definition at line 66 of file MulticastTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::MulticastTransport::GuardType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 63 of file MulticastTransport.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::MulticastTransport::LockType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 62 of file MulticastTransport.h.
typedef std::pair<MulticastPeer, MulticastPeer> OpenDDS::DCPS::MulticastTransport::Peers [private] |
Definition at line 92 of file MulticastTransport.h.
typedef ACE_Thread_Mutex OpenDDS::DCPS::MulticastTransport::ThreadLockType [private] |
Definition at line 65 of file MulticastTransport.h.
OpenDDS::DCPS::MulticastTransport::MulticastTransport | ( | MulticastInst & | inst | ) | [explicit] |
Definition at line 30 of file MulticastTransport.cpp.
References configure_i(), and OpenDDS::DCPS::TransportImpl::open().
00031 : TransportImpl(inst) 00032 { 00033 if (! (configure_i(inst) && open())) { 00034 throw Transport::UnableToCreate(); 00035 } 00036 }
OpenDDS::DCPS::MulticastTransport::~MulticastTransport | ( | ) |
Definition at line 38 of file MulticastTransport.cpp.
TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [protected, virtual] |
accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
active
active
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 194 of file MulticastTransport.cpp.
References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_FAILED, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, config(), connections_lock_, OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_lock_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, make_datalink(), pending_connections_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::push_back(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, OpenDDS::DCPS::RcHandle< T >::reset(), server_links_, start_session(), and VDBG.
00197 { 00198 // Check that the remote reliability matches. 00199 if (get_remote_reliability(remote) != this->config().is_reliable()) { 00200 return AcceptConnectResult(); 00201 } 00202 00203 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32 00204 | RepoIdConverter(attribs.local_id_).participantId(); 00205 00206 GuardThreadType guard_links(this->links_lock_); 00207 00208 Links::const_iterator link_iter = this->server_links_.find(local_peer); 00209 MulticastDataLink_rch link; 00210 00211 if (link_iter == this->server_links_.end()) { 00212 00213 link = this->make_datalink(attribs.local_id_, attribs.priority_, false /*passive*/); 00214 this->server_links_[local_peer] = link; 00215 } else { 00216 link = link_iter->second; 00217 } 00218 00219 guard_links.release(); 00220 00221 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32 00222 | RepoIdConverter(remote.repo_id_).participantId(); 00223 GuardThreadType guard(this->connections_lock_); 00224 00225 if (connections_.count(std::make_pair(remote_peer, local_peer))) { 00226 //can't call start session with connections_lock_ due to reactor 00227 //call in session->start which could deadlock with passive_connection 00228 guard.release(); 00229 00230 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n")); 00231 MulticastSession_rch session( 00232 this->start_session(link, remote_peer, false /*!active*/)); 00233 00234 if (session.is_nil()) { 00235 link.reset(); 00236 } 00237 return AcceptConnectResult(link); 00238 00239 } else { 00240 00241 this->pending_connections_[std::make_pair(remote_peer, local_peer)]. 00242 push_back(std::make_pair(client, remote.repo_id_)); 00243 //can't call start session with connections_lock_ due to reactor 00244 //call in session->start which could deadlock with passive_connection 00245 guard.release(); 00246 MulticastSession_rch session( 00247 this->start_session(link, remote_peer, false /*!active*/)); 00248 00249 return AcceptConnectResult( 00250 session ? AcceptConnectResult::ACR_SUCCESS : AcceptConnectResult::ACR_FAILED 00251 ); 00252 00253 } 00254 }
MulticastInst & OpenDDS::DCPS::MulticastTransport::config | ( | ) | const |
Expose the configuration information so others can see what we can do.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 44 of file MulticastTransport.cpp.
Referenced by accept_datalink(), OpenDDS::DCPS::MulticastDataLink::config(), configure_i(), connect_datalink(), connection_info_i(), make_datalink(), passive_connection(), and start_session().
00045 { 00046 return static_cast<MulticastInst&>(TransportImpl::config()); 00047 }
bool OpenDDS::DCPS::MulticastTransport::configure_i | ( | MulticastInst & | config | ) | [protected] |
Definition at line 333 of file MulticastTransport.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastInst::async_send_, config(), OpenDDS::DCPS::TransportImpl::create_reactor_task(), OpenDDS::DCPS::MulticastInst::group_address_, ACE_INET_Addr::is_multicast(), LM_ERROR, OpenDDS::DCPS::MulticastInst::local_address_, and TheServiceParticipant.
Referenced by MulticastTransport().
00334 { 00335 // Override with DCPSDefaultAddress. 00336 if (config.local_address_.empty () && 00337 !TheServiceParticipant->default_address ().empty ()) { 00338 config.local_address_ = TheServiceParticipant->default_address ().c_str (); 00339 } 00340 00341 if (!config.group_address_.is_multicast()) { 00342 ACE_ERROR_RETURN((LM_ERROR, 00343 ACE_TEXT("(%P|%t) ERROR: ") 00344 ACE_TEXT("MulticastTransport[%@]::configure_i: ") 00345 ACE_TEXT("invalid configuration: address %C is not ") 00346 ACE_TEXT("multicast.\n"), 00347 this, this->config().group_address_.get_host_addr()), 00348 false); 00349 } 00350 00351 this->create_reactor_task(config.async_send_); 00352 00353 return true; 00354 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::MulticastTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [protected, virtual] |
connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 155 of file MulticastTransport.cpp.
References client_links_, config(), OpenDDS::DCPS::get_remote_reliability(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_lock_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, make_datalink(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, and start_session().
00158 { 00159 // Check that the remote reliability matches. 00160 if (get_remote_reliability(remote) != this->config().is_reliable()) { 00161 return AcceptConnectResult(); 00162 } 00163 00164 GuardThreadType guard_links(this->links_lock_); 00165 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32 00166 | RepoIdConverter(attribs.local_id_).participantId(); 00167 Links::const_iterator link_iter = this->client_links_.find(local_peer); 00168 MulticastDataLink_rch link; 00169 00170 if (link_iter == this->client_links_.end()) { 00171 link = this->make_datalink(attribs.local_id_, attribs.priority_, true /*active*/); 00172 this->client_links_[local_peer] = link; 00173 } else { 00174 link = link_iter->second; 00175 } 00176 00177 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32 00178 | RepoIdConverter(remote.repo_id_).participantId(); 00179 00180 MulticastSession_rch session( 00181 this->start_session(link, remote_peer, true /*active*/)); 00182 00183 if (session.is_nil()) { 00184 Links::iterator to_remove = this->client_links_.find(local_peer); 00185 if (to_remove != this->client_links_.end()) { 00186 this->client_links_.erase(to_remove); 00187 } 00188 return AcceptConnectResult(); 00189 } 00190 return AcceptConnectResult(link); 00191 }
bool OpenDDS::DCPS::MulticastTransport::connection_info_i | ( | TransportLocator & | local_info | ) | const [protected, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 382 of file MulticastTransport.cpp.
References config(), and OpenDDS::DCPS::MulticastInst::populate_locator().
00383 { 00384 this->config().populate_locator(info); 00385 return true; 00386 }
MulticastDataLink_rch OpenDDS::DCPS::MulticastTransport::make_datalink | ( | const RepoId & | local_id, | |
Priority | priority, | |||
bool | active | |||
) | [private] |
Definition at line 50 of file MulticastTransport.cpp.
References ACE_TEXT(), ACE_INET_Addr::addr_to_string(), config(), OpenDDS::DCPS::MulticastInst::group_address_, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::ref(), str, and VDBG_LVL.
Referenced by accept_datalink(), and connect_datalink().
00053 { 00054 00055 RcHandle<MulticastSessionFactory> session_factory; 00056 00057 if (this->config().is_reliable()) { 00058 session_factory = make_rch<ReliableSessionFactory>(); 00059 00060 } else { 00061 session_factory = make_rch<BestEffortSessionFactory>(); 00062 } 00063 00064 MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32 00065 | RepoIdConverter(local_id).participantId(); 00066 00067 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink " 00068 "peers: local %#08x%08x priority %d active %d\n", 00069 this->config().name().c_str(), (unsigned int)(local_peer >> 32), (unsigned int)local_peer, 00070 priority, active), 2); 00071 00072 TransportReactorTask_rch rtask(reactor_task()); 00073 MulticastDataLink_rch link(make_rch<MulticastDataLink>(ref(*this), 00074 session_factory, 00075 local_peer, 00076 ref(config()), 00077 rtask.in(), 00078 active)); 00079 00080 // Join multicast group: 00081 if (!link->join(this->config().group_address_)) { 00082 ACE_TCHAR str[64]; 00083 this->config().group_address_.addr_to_string(str, 00084 sizeof(str)/sizeof(str[0])); 00085 ACE_ERROR((LM_ERROR, 00086 ACE_TEXT("(%P|%t) ERROR: ") 00087 ACE_TEXT("MulticastTransport::make_datalink: ") 00088 ACE_TEXT("failed to join multicast group: %s!\n"), 00089 str)); 00090 return MulticastDataLink_rch(); 00091 } 00092 00093 return link; 00094 }
typedef OpenDDS::DCPS::MulticastTransport::OPENDDS_MAP | ( | MulticastPeer | , | |
MulticastDataLink_rch | ||||
) | [private] |
link for pubs.
OpenDDS::DCPS::MulticastTransport::OPENDDS_SET | ( | Peers | ) | [private] |
void OpenDDS::DCPS::MulticastTransport::passive_connection | ( | MulticastPeer | local_peer, | |
MulticastPeer | remote_peer | |||
) |
Definition at line 283 of file MulticastTransport.cpp.
References ACE_Guard< ACE_LOCK >::acquire(), config(), connections_lock_, OpenDDS::DCPS::find(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), pending_connections_, ACE_Guard< ACE_LOCK >::release(), server_links_, OpenDDS::DCPS::static_rchandle_cast(), and VDBG_LVL.
Referenced by OpenDDS::DCPS::MulticastSession::syn_received(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().
00284 { 00285 GuardThreadType guard(this->connections_lock_); 00286 00287 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection " 00288 "from remote peer %#08x%08x to local peer %#08x%08x\n", 00289 this->config().name().c_str(), 00290 (unsigned int) (remote_peer >> 32), 00291 (unsigned int) remote_peer, 00292 (unsigned int) (local_peer >> 32), 00293 (unsigned int) local_peer), 2); 00294 00295 const Peers peers(remote_peer, local_peer); 00296 const PendConnMap::iterator pend = this->pending_connections_.find(peers); 00297 //if connection was pending, calls to use_datalink finalized the connection 00298 //if it was not previously pending, accept_datalink() will finalize connection 00299 this->connections_.insert(peers); 00300 00301 Links::const_iterator server_link = this->server_links_.find(local_peer); 00302 DataLink_rch link; 00303 00304 if (server_link != this->server_links_.end()) { 00305 link = static_rchandle_cast<DataLink>(server_link->second); 00306 MulticastSession_rch session (server_link->second->find_or_create_session(remote_peer)); 00307 session->set_acked(); 00308 } 00309 00310 if (pend != pending_connections_.end()) { 00311 Callbacks tmp(pend->second); 00312 for (size_t i = 0; i < tmp.size(); ++i) { 00313 const PendConnMap::iterator pend = pending_connections_.find(peers); 00314 if (pend != pending_connections_.end()) { 00315 const Callbacks::iterator tmp_iter = find(pend->second.begin(), 00316 pend->second.end(), 00317 tmp.at(i)); 00318 if (tmp_iter != pend->second.end()) { 00319 TransportClient_wrch pend_client = tmp.at(i).first; 00320 RepoId remote_repo = tmp.at(i).second; 00321 guard.release(); 00322 TransportClient_rch client = pend_client.lock(); 00323 if (client) 00324 client->use_datalink(remote_repo, link); 00325 guard.acquire(); 00326 } 00327 } 00328 } 00329 } 00330 }
void OpenDDS::DCPS::MulticastTransport::release_datalink | ( | DataLink * | link | ) | [protected, virtual] |
Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 389 of file MulticastTransport.cpp.
void OpenDDS::DCPS::MulticastTransport::shutdown_i | ( | ) | [protected, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 357 of file MulticastTransport.cpp.
References client_links_, links_lock_, and server_links_.
00358 { 00359 GuardThreadType guard_links(this->links_lock_); 00360 Links::iterator link; 00361 00362 for (link = this->client_links_.begin(); 00363 link != this->client_links_.end(); 00364 ++link) { 00365 if (link->second.in()) { 00366 link->second->transport_shutdown(); 00367 } 00368 } 00369 client_links_.clear(); 00370 00371 for (link = this->server_links_.begin(); 00372 link != this->server_links_.end(); 00373 ++link) { 00374 if (link->second.in()) { 00375 link->second->transport_shutdown(); 00376 } 00377 } 00378 server_links_.clear(); 00379 }
MulticastSession_rch OpenDDS::DCPS::MulticastTransport::start_session | ( | const MulticastDataLink_rch & | link, | |
MulticastPeer | remote_peer, | |||
bool | active | |||
) | [private] |
Definition at line 97 of file MulticastTransport.cpp.
References ACE_TEXT(), config(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and LM_ERROR.
Referenced by accept_datalink(), and connect_datalink().
00099 { 00100 if (link.is_nil()) { 00101 ACE_ERROR_RETURN((LM_ERROR, 00102 ACE_TEXT("(%P|%t) ERROR: ") 00103 ACE_TEXT("MulticastTransport[%C]::start_session: ") 00104 ACE_TEXT("link is nil\n"), 00105 this->config().name().c_str()), 00106 MulticastSession_rch()); 00107 } 00108 00109 MulticastSession_rch session(link->find_or_create_session(remote_peer)); 00110 00111 if (session.is_nil()) { 00112 ACE_ERROR_RETURN((LM_ERROR, 00113 ACE_TEXT("(%P|%t) ERROR: ") 00114 ACE_TEXT("MulticastTransport[%C]::start_session: ") 00115 ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"), 00116 this->config().name().c_str(), 00117 (unsigned int)(remote_peer >> 32), 00118 (unsigned int) remote_peer), 00119 MulticastSession_rch()); 00120 } 00121 00122 const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer())); 00123 00124 if (!session->start(active, acked)) { 00125 ACE_ERROR_RETURN((LM_ERROR, 00126 ACE_TEXT("(%P|%t) ERROR: ") 00127 ACE_TEXT("MulticastTransport[%C]::start_session: ") 00128 ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"), 00129 this->config().name().c_str(), 00130 (unsigned int)(remote_peer >> 32), 00131 (unsigned int) remote_peer), 00132 MulticastSession_rch()); 00133 } 00134 00135 return session; 00136 }
void OpenDDS::DCPS::MulticastTransport::stop_accepting_or_connecting | ( | const TransportClient_wrch & | client, | |
const RepoId & | remote_id | |||
) | [protected, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 257 of file MulticastTransport.cpp.
References connections_lock_, LM_DEBUG, pending_connections_, and VDBG.
00259 { 00260 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n")); 00261 00262 GuardThreadType guard(this->connections_lock_); 00263 00264 for (PendConnMap::iterator it = this->pending_connections_.begin(); 00265 it != this->pending_connections_.end(); ++it) { 00266 bool erased_from_it = false; 00267 for (size_t i = 0; i < it->second.size(); ++i) { 00268 if (it->second[i].first == client && it->second[i].second == remote_id) { 00269 erased_from_it = true; 00270 it->second.erase(it->second.begin() + i); 00271 break; 00272 } 00273 } 00274 00275 if (erased_from_it && it->second.empty()) { 00276 this->pending_connections_.erase(it); 00277 return; 00278 } 00279 } 00280 }
virtual std::string OpenDDS::DCPS::MulticastTransport::transport_type | ( | ) | const [inline, protected, virtual] |
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 58 of file MulticastTransport.h.
Links OpenDDS::DCPS::MulticastTransport::client_links_ [private] |
Definition at line 80 of file MulticastTransport.h.
Referenced by connect_datalink(), and shutdown_i().
Definition at line 90 of file MulticastTransport.h.
Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().
Definition at line 77 of file MulticastTransport.h.
Referenced by accept_datalink(), connect_datalink(), and shutdown_i().
PendConnMap OpenDDS::DCPS::MulticastTransport::pending_connections_ [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 94 of file MulticastTransport.h.
Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().
Links OpenDDS::DCPS::MulticastTransport::server_links_ [private] |
link for subs.
Definition at line 82 of file MulticastTransport.h.
Referenced by accept_datalink(), passive_connection(), and shutdown_i().