RtpsUdpTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsUdpTransport.h"
00009 #include "RtpsUdpInst.h"
00010 #include "RtpsUdpInst_rch.h"
00011 #include "RtpsUdpSendStrategy.h"
00012 #include "RtpsUdpReceiveStrategy.h"
00013 
00014 #include "dds/DCPS/AssociationData.h"
00015 
00016 #include "dds/DCPS/transport/framework/TransportClient.h"
00017 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00018 
00019 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00020 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00021 
00022 #include "ace/CDR_Base.h"
00023 #include "ace/Log_Msg.h"
00024 #include "ace/Sock_Connect.h"
00025 
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 namespace OpenDDS {
00029 namespace DCPS {
00030 
00031 RtpsUdpTransport::RtpsUdpTransport(RtpsUdpInst& inst)
00032   : TransportImpl(inst)
00033 #if defined(OPENDDS_SECURITY)
00034   , local_crypto_handle_(DDS::HANDLE_NIL)
00035 #endif
00036 {
00037   if (! (configure_i(inst) && open())) {
00038     throw Transport::UnableToCreate();
00039   }
00040 }
00041 
00042 RtpsUdpInst&
00043 RtpsUdpTransport::config() const
00044 {
00045   return static_cast<RtpsUdpInst&>(TransportImpl::config());
00046 }
00047 
00048 RtpsUdpDataLink_rch
00049 RtpsUdpTransport::make_datalink(const GuidPrefix_t& local_prefix)
00050 {
00051 
00052   RtpsUdpDataLink_rch link = make_rch<RtpsUdpDataLink>(ref(*this), local_prefix, config(), reactor_task());
00053 
00054 #if defined(OPENDDS_SECURITY)
00055   link->local_crypto_handle(local_crypto_handle_);
00056 #endif
00057 
00058   if (!link->open(unicast_socket_)) {
00059     ACE_ERROR((LM_ERROR,
00060                       ACE_TEXT("(%P|%t) ERROR: ")
00061                       ACE_TEXT("RtpsUdpTransport::make_datalink: ")
00062                       ACE_TEXT("failed to open DataLink for socket %d\n"),
00063                       unicast_socket_.get_handle()));
00064     return RtpsUdpDataLink_rch();
00065   }
00066 
00067   // RtpsUdpDataLink now owns the socket
00068   unicast_socket_.set_handle(ACE_INVALID_HANDLE);
00069 
00070   return link;
00071 }
00072 
00073 TransportImpl::AcceptConnectResult
00074 RtpsUdpTransport::connect_datalink(const RemoteTransport& remote,
00075                                    const ConnectionAttribs& attribs,
00076                                    const TransportClient_rch& client )
00077 {
00078   GuardThreadType guard_links(this->links_lock_);
00079 
00080   if (link_.is_nil()) {
00081     link_ =  make_datalink(attribs.local_id_.guidPrefix);
00082     if (link_.is_nil()) {
00083       return AcceptConnectResult();
00084     }
00085   }
00086 
00087   RtpsUdpDataLink_rch link = link_;
00088 
00089 
00090   use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00091                attribs.local_reliable_, remote.reliable_,
00092                attribs.local_durable_, remote.durable_);
00093 
00094   if (0 == std::memcmp(attribs.local_id_.guidPrefix, remote.repo_id_.guidPrefix,
00095                        sizeof(GuidPrefix_t))) {
00096     return AcceptConnectResult(link); // "loopback" connection return link right away
00097   }
00098 
00099   if (link->check_handshake_complete(attribs.local_id_, remote.repo_id_)){
00100     return AcceptConnectResult(link);
00101   }
00102 
00103   if (!link->add_on_start_callback(client, remote.repo_id_)) {
00104      // link was started by the reactor thread before we could add a callback
00105      VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink got link.\n"), 2);
00106      return AcceptConnectResult(link);
00107   }
00108 
00109   GuardType guard(connections_lock_);
00110   add_pending_connection(client, link);
00111   VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
00112   return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00113 }
00114 
00115 TransportImpl::AcceptConnectResult
00116 RtpsUdpTransport::accept_datalink(const RemoteTransport& remote,
00117                                   const ConnectionAttribs& attribs,
00118                                   const TransportClient_rch& )
00119 {
00120   GuardThreadType guard_links(this->links_lock_);
00121   if (link_.is_nil()) {
00122     link_=  make_datalink(attribs.local_id_.guidPrefix);
00123     if (link_.is_nil()) {
00124       return AcceptConnectResult();
00125     }
00126   }
00127   RtpsUdpDataLink_rch link = link_;
00128 
00129   use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00130                attribs.local_reliable_, remote.reliable_,
00131                attribs.local_durable_, remote.durable_);
00132   return AcceptConnectResult(link);
00133 }
00134 
00135 
00136 void
00137 RtpsUdpTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00138                                                const RepoId& remote_id)
00139 {
00140   GuardType guard(connections_lock_);
00141   typedef PendConnMap::iterator iter_t;
00142   const std::pair<iter_t, iter_t> range =
00143         pending_connections_.equal_range(client);
00144   for (iter_t iter = range.first; iter != range.second; ++iter) {
00145      iter->second->remove_on_start_callback(client, remote_id);
00146   }
00147   pending_connections_.erase(range.first, range.second);
00148 }
00149 
00150 void
00151 RtpsUdpTransport::use_datalink(const RepoId& local_id,
00152                                const RepoId& remote_id,
00153                                const TransportBLOB& remote_data,
00154                                bool local_reliable, bool remote_reliable,
00155                                bool local_durable, bool remote_durable)
00156 {
00157   bool requires_inline_qos;
00158   unsigned int blob_bytes_read;
00159   ACE_INET_Addr addr = get_connection_addr(remote_data, &requires_inline_qos,
00160                                            &blob_bytes_read);
00161   link_->add_locator(remote_id, addr, requires_inline_qos);
00162 
00163 #if defined(OPENDDS_SECURITY)
00164   if (remote_data.length() > blob_bytes_read) {
00165     link_->populate_security_handles(local_id, remote_id,
00166                                      remote_data.get_buffer() + blob_bytes_read,
00167                                      remote_data.length() - blob_bytes_read);
00168   }
00169 #endif
00170 
00171   link_->associated(local_id, remote_id, local_reliable, remote_reliable,
00172                     local_durable, remote_durable);
00173 }
00174 
00175 ACE_INET_Addr
00176 RtpsUdpTransport::get_connection_addr(const TransportBLOB& remote,
00177                                       bool* requires_inline_qos,
00178                                       unsigned int* blob_bytes_read) const
00179 {
00180   using namespace OpenDDS::RTPS;
00181   LocatorSeq locators;
00182   DDS::ReturnCode_t result =
00183     blob_to_locators(remote, locators, requires_inline_qos, blob_bytes_read);
00184   if (result != DDS::RETCODE_OK) {
00185     return ACE_INET_Addr();
00186   }
00187 
00188   for (CORBA::ULong i = 0; i < locators.length(); ++i) {
00189     ACE_INET_Addr addr;
00190     // If conversion was successful
00191     if (locator_to_address(addr, locators[i], map_ipv4_to_ipv6()) == 0) {
00192       // if this is a unicast address, or if we are allowing multicast
00193       if (!addr.is_multicast() || config().use_multicast_) {
00194         return addr;
00195       }
00196     }
00197   }
00198 
00199   // Return default address
00200   return ACE_INET_Addr();
00201 }
00202 
00203 bool
00204 RtpsUdpTransport::connection_info_i(TransportLocator& info) const
00205 {
00206   this->config().populate_locator(info);
00207   return true;
00208 }
00209 
00210 void
00211 RtpsUdpTransport::register_for_reader(const RepoId& participant,
00212                                       const RepoId& writerid,
00213                                       const RepoId& readerid,
00214                                       const TransportLocatorSeq& locators,
00215                                       OpenDDS::DCPS::DiscoveryListener* listener)
00216 {
00217   const TransportBLOB* blob = this->config().get_blob(locators);
00218   if (!blob) {
00219     return;
00220   }
00221 
00222   if (!link_) {
00223     link_ = make_datalink(participant.guidPrefix);
00224   }
00225 
00226   link_->register_for_reader(writerid, readerid, get_connection_addr(*blob),
00227                              listener);
00228 }
00229 
00230 void
00231 RtpsUdpTransport::unregister_for_reader(const RepoId& /*participant*/,
00232                                         const RepoId& writerid,
00233                                         const RepoId& readerid)
00234 {
00235   if (link_) {
00236     link_->unregister_for_reader(writerid, readerid);
00237   }
00238 }
00239 
00240 void
00241 RtpsUdpTransport::register_for_writer(const RepoId& participant,
00242                                       const RepoId& readerid,
00243                                       const RepoId& writerid,
00244                                       const TransportLocatorSeq& locators,
00245                                       DiscoveryListener* listener)
00246 {
00247   const TransportBLOB* blob = this->config().get_blob(locators);
00248   if (!blob) {
00249     return;
00250   }
00251 
00252   if (!link_) {
00253     link_ = make_datalink(participant.guidPrefix);
00254   }
00255 
00256   link_->register_for_writer(readerid, writerid, get_connection_addr(*blob),
00257                              listener);
00258 }
00259 
00260 void
00261 RtpsUdpTransport::unregister_for_writer(const RepoId& /*participant*/,
00262                                         const RepoId& readerid,
00263                                         const RepoId& writerid)
00264 {
00265   if (link_) {
00266     link_->unregister_for_writer(readerid, writerid);
00267   }
00268 }
00269 
00270 bool
00271 RtpsUdpTransport::configure_i(RtpsUdpInst& config)
00272 {
00273   // Override with DCPSDefaultAddress.
00274   if (config.local_address() == ACE_INET_Addr () &&
00275       !TheServiceParticipant->default_address ().empty ()) {
00276     config.local_address(0, TheServiceParticipant->default_address ().c_str ());
00277   }
00278 
00279   // Open the socket here so that any addresses/ports left
00280   // unspecified in the RtpsUdpInst are known by the time we get to
00281   // connection_info_i().  Opening the sockets here also allows us to
00282   // detect and report errors during DataReader/Writer setup instead
00283   // of during association.
00284 
00285   if (!open_appropriate_socket_type(unicast_socket_, config.local_address())) {
00286     ACE_ERROR_RETURN((LM_ERROR,
00287                       ACE_TEXT("(%P|%t) ERROR: ")
00288                       ACE_TEXT("RtpsUdpTransport::configure_i: open_appropriate_socket_type:")
00289                       ACE_TEXT("%m\n")),
00290                       false);
00291   }
00292 
00293   if (config.local_address().get_port_number() == 0) {
00294 
00295     ACE_INET_Addr address;
00296     if (unicast_socket_.get_local_addr(address) != 0) {
00297       ACE_ERROR_RETURN((LM_ERROR,
00298         ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::configure_i - %p\n"),
00299         ACE_TEXT("cannot get local addr")), false);
00300     }
00301     config.local_address_set_port(address.get_port_number());
00302   }
00303 
00304   create_reactor_task();
00305 
00306   if (config.opendds_discovery_default_listener_) {
00307     link_= make_datalink(config.opendds_discovery_guid_.guidPrefix);
00308     link_->default_listener(*config.opendds_discovery_default_listener_);
00309   }
00310 
00311   return true;
00312 }
00313 
00314 void
00315 RtpsUdpTransport::shutdown_i()
00316 {
00317   if (!link_.is_nil()) {
00318     link_->transport_shutdown();
00319   }
00320   link_.reset();
00321 }
00322 
00323 void
00324 RtpsUdpTransport::release_datalink(DataLink* /*link*/)
00325 {
00326   // No-op for rtps_udp: keep the link_ around until the transport is shut down.
00327 }
00328 
00329 
00330 
00331 bool
00332 RtpsUdpTransport::map_ipv4_to_ipv6() const
00333 {
00334   bool map = false;
00335   ACE_INET_Addr tmp;
00336   link_->unicast_socket().get_local_addr(tmp);
00337   if (tmp.get_type() != AF_INET) {
00338     map = true;
00339   }
00340   return map;
00341 }
00342 } // namespace DCPS
00343 } // namespace OpenDDS
00344 
00345 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1