RtpsUdpTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsUdpTransport.h"
00009 #include "RtpsUdpInst.h"
00010 #include "RtpsUdpInst_rch.h"
00011 #include "RtpsUdpSendStrategy.h"
00012 #include "RtpsUdpReceiveStrategy.h"
00013 
00014 #include "dds/DCPS/AssociationData.h"
00015 
00016 #include "dds/DCPS/transport/framework/TransportClient.h"
00017 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00018 
00019 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00020 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00021 
00022 #include "ace/CDR_Base.h"
00023 #include "ace/Log_Msg.h"
00024 #include "ace/Sock_Connect.h"
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 RtpsUdpTransport::RtpsUdpTransport(const TransportInst_rch& inst)
00030   : default_listener_(0)
00031 {
00032   if (!inst.is_nil()) {
00033     if (!configure(inst.in())) {
00034       throw Transport::UnableToCreate();
00035     }
00036   }
00037 }
00038 
00039 RtpsUdpDataLink*
00040 RtpsUdpTransport::make_datalink(const GuidPrefix_t& local_prefix)
00041 {
00042   TransportReactorTask_rch rt = reactor_task();
00043   ACE_NEW_RETURN(link_,
00044                  RtpsUdpDataLink(this, local_prefix, config_i_.in(), rt.in()),
00045                  0);
00046 
00047   RtpsUdpSendStrategy* send_strategy;
00048   ACE_NEW_RETURN(send_strategy, RtpsUdpSendStrategy(link_.in()), 0);
00049   link_->send_strategy(send_strategy);
00050 
00051   RtpsUdpReceiveStrategy* recv_strategy;
00052   ACE_NEW_RETURN(recv_strategy, RtpsUdpReceiveStrategy(link_.in()), 0);
00053   link_->receive_strategy(recv_strategy);
00054 
00055   if (!link_->open(unicast_socket_)) {
00056     ACE_ERROR_RETURN((LM_ERROR,
00057                       ACE_TEXT("(%P|%t) ERROR: ")
00058                       ACE_TEXT("RtpsUdpTransport::make_datalink: ")
00059                       ACE_TEXT("failed to open DataLink for socket %d\n"),
00060                       unicast_socket_.get_handle()),
00061                      0);
00062   }
00063 
00064   // RtpsUdpDataLink now owns the socket
00065   unicast_socket_.set_handle(ACE_INVALID_HANDLE);
00066 
00067   return RtpsUdpDataLink_rch(link_)._retn();
00068 }
00069 
00070 TransportImpl::AcceptConnectResult
00071 RtpsUdpTransport::connect_datalink(const RemoteTransport& remote,
00072                                    const ConnectionAttribs& attribs,
00073                                    TransportClient* client )
00074 {
00075   GuardThreadType guard_links(this->links_lock_);
00076   RtpsUdpDataLink_rch link = link_;
00077   if (link_.is_nil()) {
00078     link = make_datalink(attribs.local_id_.guidPrefix);
00079     if (link.is_nil()) {
00080       return AcceptConnectResult();
00081     }
00082   }
00083 
00084   use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00085                attribs.local_reliable_, remote.reliable_,
00086                attribs.local_durable_, remote.durable_);
00087 
00088   if (0 == std::memcmp(attribs.local_id_.guidPrefix, remote.repo_id_.guidPrefix,
00089                        sizeof(GuidPrefix_t))) {
00090     return AcceptConnectResult(link._retn()); // "loopback" connection return link right away
00091   }
00092 
00093   if (link->check_handshake_complete(attribs.local_id_, remote.repo_id_)){
00094     return AcceptConnectResult(link._retn());
00095   }
00096 
00097   if (!link->add_on_start_callback(client, remote.repo_id_)) {
00098      // link was started by the reactor thread before we could add a callback
00099      VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink got link.\n"), 2);
00100      return AcceptConnectResult(link._retn());
00101   }
00102 
00103   GuardType guard(connections_lock_);
00104   add_pending_connection(client, link.in());
00105   VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
00106   return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00107 }
00108 
00109 TransportImpl::AcceptConnectResult
00110 RtpsUdpTransport::accept_datalink(const RemoteTransport& remote,
00111                                   const ConnectionAttribs& attribs,
00112                                   TransportClient* )
00113 {
00114   GuardThreadType guard_links(this->links_lock_);
00115   RtpsUdpDataLink_rch link = link_;
00116   if (link_.is_nil()) {
00117     link = make_datalink(attribs.local_id_.guidPrefix);
00118     if (link.is_nil()) {
00119       return AcceptConnectResult();
00120     }
00121   }
00122   use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00123                attribs.local_reliable_, remote.reliable_,
00124                attribs.local_durable_, remote.durable_);
00125   return AcceptConnectResult(link._retn());
00126 }
00127 
00128 
00129 void
00130 RtpsUdpTransport::stop_accepting_or_connecting(TransportClient* client,
00131                                                const RepoId& remote_id)
00132 {
00133   GuardType guard(connections_lock_);
00134   typedef OPENDDS_MULTIMAP(TransportClient*, DataLink_rch)::iterator iter_t;
00135   const std::pair<iter_t, iter_t> range =
00136         pending_connections_.equal_range(client);
00137   for (iter_t iter = range.first; iter != range.second; ++iter) {
00138      iter->second->remove_on_start_callback(client, remote_id);
00139   }
00140   pending_connections_.erase(range.first, range.second);
00141 }
00142 
00143 void
00144 RtpsUdpTransport::use_datalink(const RepoId& local_id,
00145                                const RepoId& remote_id,
00146                                const TransportBLOB& remote_data,
00147                                bool local_reliable, bool remote_reliable,
00148                                bool local_durable, bool remote_durable)
00149 {
00150   bool requires_inline_qos;
00151   ACE_INET_Addr addr = get_connection_addr(remote_data, requires_inline_qos);
00152   link_->add_locator(remote_id, addr, requires_inline_qos);
00153   link_->associated(local_id, remote_id, local_reliable, remote_reliable,
00154                     local_durable, remote_durable);
00155 }
00156 
00157 ACE_INET_Addr
00158 RtpsUdpTransport::get_connection_addr(const TransportBLOB& remote,
00159                                       bool& requires_inline_qos) const
00160 {
00161   using namespace OpenDDS::RTPS;
00162   LocatorSeq locators;
00163   DDS::ReturnCode_t result =
00164     blob_to_locators(remote, locators, requires_inline_qos);
00165   if (result != DDS::RETCODE_OK) {
00166     return ACE_INET_Addr();
00167   }
00168 
00169   for (CORBA::ULong i = 0; i < locators.length(); ++i) {
00170     ACE_INET_Addr addr;
00171     // If conversion was successful
00172     if (locator_to_address(addr, locators[i], map_ipv4_to_ipv6()) == 0) {
00173       // if this is a unicast address, or if we are allowing multicast
00174       if (!addr.is_multicast() || config_i_->use_multicast_) {
00175         return addr;
00176       }
00177     }
00178   }
00179 
00180   // Return default address
00181   return ACE_INET_Addr();
00182 }
00183 
00184 bool
00185 RtpsUdpTransport::connection_info_i(TransportLocator& info) const
00186 {
00187   this->config_i_->populate_locator(info);
00188   return true;
00189 }
00190 
00191 void
00192 RtpsUdpTransport::register_for_reader(const RepoId& participant,
00193                                       const RepoId& writerid,
00194                                       const RepoId& readerid,
00195                                       const TransportLocatorSeq& locators,
00196                                       OpenDDS::DCPS::DiscoveryListener* listener)
00197 {
00198   const TransportBLOB* blob = this->config_i_->get_blob(locators);
00199   if (!blob)
00200     return;
00201   if (link_ == 0) {
00202     make_datalink(participant.guidPrefix);
00203   }
00204   bool requires_inline_qos;
00205   link_->register_for_reader(writerid, readerid, get_connection_addr(*blob, requires_inline_qos), listener);
00206 }
00207 
00208 void
00209 RtpsUdpTransport::unregister_for_reader(const RepoId& /*participant*/,
00210                                         const RepoId& writerid,
00211                                         const RepoId& readerid)
00212 {
00213   if (link_ != 0) {
00214     link_->unregister_for_reader(writerid, readerid);
00215   }
00216 }
00217 
00218 void
00219 RtpsUdpTransport::register_for_writer(const RepoId& participant,
00220                                       const RepoId& readerid,
00221                                       const RepoId& writerid,
00222                                       const TransportLocatorSeq& locators,
00223                                       DiscoveryListener* listener)
00224 {
00225   const TransportBLOB* blob = this->config_i_->get_blob(locators);
00226   if (!blob)
00227     return;
00228   if (link_ == 0) {
00229     make_datalink(participant.guidPrefix);
00230   }
00231   bool requires_inline_qos;
00232   link_->register_for_writer(readerid, writerid, get_connection_addr(*blob, requires_inline_qos), listener);
00233 }
00234 
00235 void
00236 RtpsUdpTransport::unregister_for_writer(const RepoId& /*participant*/,
00237                                         const RepoId& readerid,
00238                                         const RepoId& writerid)
00239 {
00240   if (link_ != 0) {
00241     link_->unregister_for_writer(readerid, writerid);
00242   }
00243 }
00244 
00245 bool
00246 RtpsUdpTransport::configure_i(TransportInst* config)
00247 {
00248   config_i_ = RtpsUdpInst_rch(dynamic_cast<RtpsUdpInst*>(config), false);
00249 
00250   if (config_i_.is_nil()) {
00251     ACE_ERROR_RETURN((LM_ERROR,
00252                       ACE_TEXT("(%P|%t) ERROR: ")
00253                       ACE_TEXT("RtpsUdpTransport::configure_i: ")
00254                       ACE_TEXT("invalid configuration!\n")),
00255                      false);
00256   }
00257 
00258   // Override with DCPSDefaultAddress.
00259   if (this->config_i_->local_address() == ACE_INET_Addr () &&
00260       !TheServiceParticipant->default_address ().empty ()) {
00261     this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00262   }
00263 
00264   // Open the socket here so that any addresses/ports left
00265   // unspecified in the RtpsUdpInst are known by the time we get to
00266   // connection_info_i().  Opening the sockets here also allows us to
00267   // detect and report errors during DataReader/Writer setup instead
00268   // of during association.
00269 
00270   if (!open_appropriate_socket_type(unicast_socket_, config_i_->local_address())) {
00271     ACE_ERROR_RETURN((LM_ERROR,
00272                       ACE_TEXT("(%P|%t) ERROR: ")
00273                       ACE_TEXT("RtpsUdpTransport::configure_i: open_appropriate_socket_type:")
00274                       ACE_TEXT("%m\n")),
00275                       false);
00276   }
00277 
00278   if (config_i_->local_address().get_port_number() == 0) {
00279 
00280     ACE_INET_Addr address;
00281     if (unicast_socket_.get_local_addr(address) != 0) {
00282       ACE_ERROR_RETURN((LM_ERROR,
00283         ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::configure_i - %p\n"),
00284         ACE_TEXT("cannot get local addr")), false);
00285     }
00286     config_i_->local_address_set_port(address.get_port_number());
00287   }
00288 
00289   create_reactor_task();
00290 
00291   if (config_i_->opendds_discovery_default_listener_) {
00292     RtpsUdpDataLink_rch link =
00293       make_datalink(config_i_->opendds_discovery_guid_.guidPrefix);
00294     link->default_listener(config_i_->opendds_discovery_default_listener_);
00295     default_listener_ =
00296       dynamic_cast<TransportClient*>(config_i_->opendds_discovery_default_listener_);
00297   }
00298 
00299   return true;
00300 }
00301 
00302 void
00303 RtpsUdpTransport::shutdown_i()
00304 {
00305   if (!link_.is_nil()) {
00306     link_->transport_shutdown();
00307   }
00308   link_ = 0;
00309   config_i_ = 0;
00310 }
00311 
00312 void
00313 RtpsUdpTransport::release_datalink(DataLink* /*link*/)
00314 {
00315   // No-op for rtps_udp: keep the link_ around until the transport is shut down.
00316 }
00317 
00318 void
00319 RtpsUdpTransport::pre_detach(TransportClient* c)
00320 {
00321   if (default_listener_ && !link_.is_nil() && c == default_listener_) {
00322     link_->default_listener(0);
00323     default_listener_ = 0;
00324   }
00325 }
00326 
00327 bool
00328 RtpsUdpTransport::map_ipv4_to_ipv6() const
00329 {
00330   bool map = false;
00331   ACE_INET_Addr tmp;
00332   link_->unicast_socket().get_local_addr(tmp);
00333   if (tmp.get_type() != AF_INET) {
00334     map = true;
00335   }
00336   return map;
00337 }
00338 } // namespace DCPS
00339 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7