MulticastTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "MulticastTransport.h"
00009 #include "MulticastDataLink.h"
00010 #include "MulticastReceiveStrategy.h"
00011 #include "MulticastSendStrategy.h"
00012 #include "MulticastSession.h"
00013 #include "BestEffortSessionFactory.h"
00014 #include "ReliableSessionFactory.h"
00015 
00016 #include "ace/Log_Msg.h"
00017 #include "ace/Truncate.h"
00018 
00019 #include "dds/DCPS/RepoIdConverter.h"
00020 #include "dds/DCPS/AssociationData.h"
00021 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00022 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00023 #include "dds/DCPS/transport/framework/TransportClient.h"
00024 
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 namespace OpenDDS {
00028 namespace DCPS {
00029 
00030 MulticastTransport::MulticastTransport(MulticastInst& inst)
00031   : TransportImpl(inst)
00032 {
00033   if (! (configure_i(inst) && open())) {
00034     throw Transport::UnableToCreate();
00035   }
00036 }
00037 
00038 MulticastTransport::~MulticastTransport()
00039 {
00040 }
00041 
00042 
00043 MulticastInst&
00044 MulticastTransport::config() const
00045 {
00046   return static_cast<MulticastInst&>(TransportImpl::config());
00047 }
00048 
00049 MulticastDataLink_rch
00050 MulticastTransport::make_datalink(const RepoId& local_id,
00051                                   Priority priority,
00052                                   bool active)
00053 {
00054 
00055   RcHandle<MulticastSessionFactory> session_factory;
00056 
00057   if (this->config().is_reliable()) {
00058     session_factory = make_rch<ReliableSessionFactory>();
00059 
00060   } else {
00061     session_factory = make_rch<BestEffortSessionFactory>();
00062   }
00063 
00064   MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32
00065                            | RepoIdConverter(local_id).participantId();
00066 
00067   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink "
00068             "peers: local %#08x%08x priority %d active %d\n",
00069             this->config().name().c_str(), (unsigned int)(local_peer >> 32), (unsigned int)local_peer,
00070             priority, active), 2);
00071 
00072   TransportReactorTask_rch rtask(reactor_task());
00073   MulticastDataLink_rch link(make_rch<MulticastDataLink>(ref(*this),
00074                                    session_factory,
00075                                    local_peer,
00076                                    ref(config()),
00077                                    rtask.in(),
00078                                    active));
00079 
00080   // Join multicast group:
00081   if (!link->join(this->config().group_address_)) {
00082     ACE_TCHAR str[64];
00083     this->config().group_address_.addr_to_string(str,
00084                                         sizeof(str)/sizeof(str[0]));
00085     ACE_ERROR((LM_ERROR,
00086                     ACE_TEXT("(%P|%t) ERROR: ")
00087                     ACE_TEXT("MulticastTransport::make_datalink: ")
00088                     ACE_TEXT("failed to join multicast group: %s!\n"),
00089                     str));
00090     return MulticastDataLink_rch();
00091   }
00092 
00093   return link;
00094 }
00095 
00096 MulticastSession_rch
00097 MulticastTransport::start_session(const MulticastDataLink_rch& link,
00098                                   MulticastPeer remote_peer, bool active)
00099 {
00100   if (link.is_nil()) {
00101     ACE_ERROR_RETURN((LM_ERROR,
00102                       ACE_TEXT("(%P|%t) ERROR: ")
00103                       ACE_TEXT("MulticastTransport[%C]::start_session: ")
00104                       ACE_TEXT("link is nil\n"),
00105                       this->config().name().c_str()),
00106                      MulticastSession_rch());
00107   }
00108 
00109   MulticastSession_rch session(link->find_or_create_session(remote_peer));
00110 
00111   if (session.is_nil()) {
00112     ACE_ERROR_RETURN((LM_ERROR,
00113                       ACE_TEXT("(%P|%t) ERROR: ")
00114                       ACE_TEXT("MulticastTransport[%C]::start_session: ")
00115                       ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00116                       this->config().name().c_str(),
00117                       (unsigned int)(remote_peer >> 32),
00118                       (unsigned int) remote_peer),
00119                      MulticastSession_rch());
00120   }
00121 
00122   const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));
00123 
00124   if (!session->start(active, acked)) {
00125     ACE_ERROR_RETURN((LM_ERROR,
00126                       ACE_TEXT("(%P|%t) ERROR: ")
00127                       ACE_TEXT("MulticastTransport[%C]::start_session: ")
00128                       ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"),
00129                       this->config().name().c_str(),
00130                       (unsigned int)(remote_peer >> 32),
00131                       (unsigned int) remote_peer),
00132                      MulticastSession_rch());
00133   }
00134 
00135   return session;
00136 }
00137 
00138 static bool
00139 get_remote_reliability(const TransportImpl::RemoteTransport& remote)
00140 {
00141   NetworkAddress network_address;
00142   ACE_CDR::Boolean reliable;
00143 
00144   const size_t len = remote.blob_.length();
00145   const char* buffer = reinterpret_cast<const char*>(remote.blob_.get_buffer());
00146 
00147   ACE_InputCDR cdr(buffer, len);
00148   cdr >> network_address;
00149   cdr >> ACE_InputCDR::to_boolean(reliable);
00150 
00151   return reliable;
00152 }
00153 
00154 TransportImpl::AcceptConnectResult
00155 MulticastTransport::connect_datalink(const RemoteTransport& remote,
00156                                      const ConnectionAttribs& attribs,
00157                                      const TransportClient_rch&)
00158 {
00159   // Check that the remote reliability matches.
00160   if (get_remote_reliability(remote) != this->config().is_reliable()) {
00161     return AcceptConnectResult();
00162   }
00163 
00164   GuardThreadType guard_links(this->links_lock_);
00165   const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00166                                  | RepoIdConverter(attribs.local_id_).participantId();
00167   Links::const_iterator link_iter = this->client_links_.find(local_peer);
00168   MulticastDataLink_rch link;
00169 
00170   if (link_iter == this->client_links_.end()) {
00171     link = this->make_datalink(attribs.local_id_, attribs.priority_, true /*active*/);
00172     this->client_links_[local_peer] = link;
00173   } else {
00174     link = link_iter->second;
00175   }
00176 
00177   MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00178                             | RepoIdConverter(remote.repo_id_).participantId();
00179 
00180   MulticastSession_rch session(
00181     this->start_session(link, remote_peer, true /*active*/));
00182 
00183   if (session.is_nil()) {
00184     Links::iterator to_remove = this->client_links_.find(local_peer);
00185     if (to_remove != this->client_links_.end()) {
00186       this->client_links_.erase(to_remove);
00187     }
00188     return AcceptConnectResult();
00189   }
00190   return AcceptConnectResult(link);
00191 }
00192 
00193 TransportImpl::AcceptConnectResult
00194 MulticastTransport::accept_datalink(const RemoteTransport& remote,
00195                                     const ConnectionAttribs& attribs,
00196                                     const TransportClient_rch& client)
00197 {
00198   // Check that the remote reliability matches.
00199   if (get_remote_reliability(remote) != this->config().is_reliable()) {
00200     return AcceptConnectResult();
00201   }
00202 
00203   const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00204                                  | RepoIdConverter(attribs.local_id_).participantId();
00205 
00206   GuardThreadType guard_links(this->links_lock_);
00207 
00208   Links::const_iterator link_iter = this->server_links_.find(local_peer);
00209   MulticastDataLink_rch link;
00210 
00211   if (link_iter == this->server_links_.end()) {
00212 
00213     link = this->make_datalink(attribs.local_id_, attribs.priority_, false /*passive*/);
00214     this->server_links_[local_peer] = link;
00215   } else {
00216     link = link_iter->second;
00217   }
00218 
00219   guard_links.release();
00220 
00221   MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00222                             | RepoIdConverter(remote.repo_id_).participantId();
00223   GuardThreadType guard(this->connections_lock_);
00224 
00225   if (connections_.count(std::make_pair(remote_peer, local_peer))) {
00226     //can't call start session with connections_lock_ due to reactor
00227     //call in session->start which could deadlock with passive_connection
00228     guard.release();
00229 
00230     VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n"));
00231     MulticastSession_rch session(
00232       this->start_session(link, remote_peer, false /*!active*/));
00233 
00234     if (session.is_nil()) {
00235       link.reset();
00236     }
00237     return AcceptConnectResult(link);
00238 
00239   } else {
00240 
00241     this->pending_connections_[std::make_pair(remote_peer, local_peer)].
00242     push_back(std::make_pair(client, remote.repo_id_));
00243     //can't call start session with connections_lock_ due to reactor
00244     //call in session->start which could deadlock with passive_connection
00245     guard.release();
00246     MulticastSession_rch session(
00247       this->start_session(link, remote_peer, false /*!active*/));
00248 
00249     return AcceptConnectResult(
00250       session ? AcceptConnectResult::ACR_SUCCESS : AcceptConnectResult::ACR_FAILED
00251     );
00252 
00253   }
00254 }
00255 
00256 void
00257 MulticastTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00258                                                  const RepoId& remote_id)
00259 {
00260   VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));
00261 
00262   GuardThreadType guard(this->connections_lock_);
00263 
00264   for (PendConnMap::iterator it = this->pending_connections_.begin();
00265        it != this->pending_connections_.end(); ++it) {
00266     bool erased_from_it = false;
00267     for (size_t i = 0; i < it->second.size(); ++i) {
00268       if (it->second[i].first == client && it->second[i].second == remote_id) {
00269         erased_from_it = true;
00270         it->second.erase(it->second.begin() + i);
00271         break;
00272       }
00273     }
00274 
00275     if (erased_from_it && it->second.empty()) {
00276       this->pending_connections_.erase(it);
00277       return;
00278     }
00279   }
00280 }
00281 
00282 void
00283 MulticastTransport::passive_connection(MulticastPeer local_peer, MulticastPeer remote_peer)
00284 {
00285   GuardThreadType guard(this->connections_lock_);
00286 
00287   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection "
00288             "from remote peer %#08x%08x to local peer %#08x%08x\n",
00289             this->config().name().c_str(),
00290             (unsigned int) (remote_peer >> 32),
00291             (unsigned int) remote_peer,
00292             (unsigned int) (local_peer >> 32),
00293             (unsigned int) local_peer), 2);
00294 
00295   const Peers peers(remote_peer, local_peer);
00296   const PendConnMap::iterator pend = this->pending_connections_.find(peers);
00297   //if connection was pending, calls to use_datalink finalized the connection
00298   //if it was not previously pending, accept_datalink() will finalize connection
00299   this->connections_.insert(peers);
00300 
00301   Links::const_iterator server_link = this->server_links_.find(local_peer);
00302   DataLink_rch link;
00303 
00304   if (server_link != this->server_links_.end()) {
00305     link = static_rchandle_cast<DataLink>(server_link->second);
00306     MulticastSession_rch session (server_link->second->find_or_create_session(remote_peer));
00307     session->set_acked();
00308   }
00309 
00310   if (pend != pending_connections_.end()) {
00311     Callbacks tmp(pend->second);
00312     for (size_t i = 0; i < tmp.size(); ++i) {
00313       const PendConnMap::iterator pend = pending_connections_.find(peers);
00314       if (pend != pending_connections_.end()) {
00315         const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00316                                                   pend->second.end(),
00317                                                   tmp.at(i));
00318         if (tmp_iter != pend->second.end()) {
00319           TransportClient_wrch pend_client = tmp.at(i).first;
00320           RepoId remote_repo = tmp.at(i).second;
00321           guard.release();
00322           TransportClient_rch client = pend_client.lock();
00323           if (client)
00324             client->use_datalink(remote_repo, link);
00325           guard.acquire();
00326         }
00327       }
00328     }
00329   }
00330 }
00331 
00332 bool
00333 MulticastTransport::configure_i(MulticastInst& config)
00334 {
00335   // Override with DCPSDefaultAddress.
00336   if (config.local_address_.empty () &&
00337       !TheServiceParticipant->default_address ().empty ()) {
00338     config.local_address_ = TheServiceParticipant->default_address ().c_str ();
00339   }
00340 
00341   if (!config.group_address_.is_multicast()) {
00342     ACE_ERROR_RETURN((LM_ERROR,
00343                       ACE_TEXT("(%P|%t) ERROR: ")
00344                       ACE_TEXT("MulticastTransport[%@]::configure_i: ")
00345                       ACE_TEXT("invalid configuration: address %C is not ")
00346                       ACE_TEXT("multicast.\n"),
00347                       this, this->config().group_address_.get_host_addr()),
00348                      false);
00349   }
00350 
00351   this->create_reactor_task(config.async_send_);
00352 
00353   return true;
00354 }
00355 
00356 void
00357 MulticastTransport::shutdown_i()
00358 {
00359   GuardThreadType guard_links(this->links_lock_);
00360   Links::iterator link;
00361 
00362   for (link = this->client_links_.begin();
00363        link != this->client_links_.end();
00364        ++link) {
00365     if (link->second.in()) {
00366       link->second->transport_shutdown();
00367     }
00368   }
00369   client_links_.clear();
00370 
00371   for (link = this->server_links_.begin();
00372        link != this->server_links_.end();
00373        ++link) {
00374     if (link->second.in()) {
00375       link->second->transport_shutdown();
00376     }
00377   }
00378   server_links_.clear();
00379 }
00380 
00381 bool
00382 MulticastTransport::connection_info_i(TransportLocator& info) const
00383 {
00384   this->config().populate_locator(info);
00385   return true;
00386 }
00387 
00388 void
00389 MulticastTransport::release_datalink(DataLink* /*link*/)
00390 {
00391   // No-op for multicast: keep both the client_link_ and server_link_ around
00392   // until the transport is shut down.
00393 }
00394 
00395 } // namespace DCPS
00396 } // namespace OpenDDS
00397 
00398 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1