OpenDDS::DCPS::RtpsUdpTransport Class Reference

#include <RtpsUdpTransport.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpTransport:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpTransport:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 RtpsUdpTransport (const TransportInst_rch &inst)

Private Types

typedef ACE_Thread_Mutex ThreadLockType
typedef ACE_Guard< ThreadLockType > GuardThreadType
typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType

Private Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual void stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id)
virtual bool configure_i (TransportInst *config)
virtual void shutdown_i ()
virtual void register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
virtual void unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid)
virtual void register_for_writer (const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *)
virtual void unregister_for_writer (const RepoId &, const RepoId &, const RepoId &)
virtual bool connection_info_i (TransportLocator &info) const
ACE_INET_Addr get_connection_addr (const TransportBLOB &data, bool &requires_inline_qos) const
virtual void release_datalink (DataLink *link)
void pre_detach (TransportClient *client)
virtual OPENDDS_STRING transport_type () const
RtpsUdpDataLink * make_datalink (const GuidPrefix_t &local_prefix)
void use_datalink (const RepoId &local_id, const RepoId &remote_id, const TransportBLOB &remote_data, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable)
bool map_ipv4_to_ipv6 () const

Private Attributes

RcHandle< RtpsUdpInst > config_i_
ThreadLockType links_lock_
LockType connections_lock_
RtpsUdpDataLink_rch link_
ACE_SOCK_Dgram unicast_socket_
TransportClient * default_listener_

Detailed Description

Definition at line 26 of file RtpsUdpTransport.h.


Member Typedef Documentation

typedef ACE_Guard<ThreadLockType> OpenDDS::DCPS::RtpsUdpTransport::GuardThreadType [private]

Definition at line 87 of file RtpsUdpTransport.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::RtpsUdpTransport::GuardType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 93 of file RtpsUdpTransport.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::RtpsUdpTransport::LockType [private]

This protects the connections_ and the pending_connections_ data members.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 92 of file RtpsUdpTransport.h.

typedef ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpTransport::ThreadLockType [private]

Definition at line 86 of file RtpsUdpTransport.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::RtpsUdpTransport::RtpsUdpTransport ( const TransportInst_rch &  inst  )  [explicit]

Definition at line 29 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

00030   : default_listener_(0)
00031 {
00032   if (!inst.is_nil()) {
00033     if (!configure(inst.in())) {
00034       throw Transport::UnableToCreate();
00035     }
00036   }
00037 }


Member Function Documentation

TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::accept_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient *  client 
) [private, virtual]

Definition at line 110 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, make_datalink(), and use_datalink().

00113 {
00114   GuardThreadType guard_links(this->links_lock_);
00115   RtpsUdpDataLink_rch link = link_;
00116   if (link_.is_nil()) {
00117     link = make_datalink(attribs.local_id_.guidPrefix);
00118     if (link.is_nil()) {
00119       return AcceptConnectResult();
00120     }
00121   }
00122   use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00123                attribs.local_reliable_, remote.reliable_,
00124                attribs.local_durable_, remote.durable_);
00125   return AcceptConnectResult(link._retn());
00126 }

bool OpenDDS::DCPS::RtpsUdpTransport::configure_i ( TransportInst *  config  )  [private, virtual]

Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 246 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::config(), config_i_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), default_listener_, make_datalink(), OpenDDS::DCPS::open_appropriate_socket_type(), TheServiceParticipant, and unicast_socket_.

00247 {
00248   config_i_ = RtpsUdpInst_rch(dynamic_cast<RtpsUdpInst*>(config), false);
00249 
00250   if (config_i_.is_nil()) {
00251     ACE_ERROR_RETURN((LM_ERROR,
00252                       ACE_TEXT("(%P|%t) ERROR: ")
00253                       ACE_TEXT("RtpsUdpTransport::configure_i: ")
00254                       ACE_TEXT("invalid configuration!\n")),
00255                      false);
00256   }
00257 
00258   // Override with DCPSDefaultAddress.
00259   if (this->config_i_->local_address() == ACE_INET_Addr () &&
00260       !TheServiceParticipant->default_address ().empty ()) {
00261     this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00262   }
00263 
00264   // Open the socket here so that any addresses/ports left
00265   // unspecified in the RtpsUdpInst are known by the time we get to
00266   // connection_info_i().  Opening the sockets here also allows us to
00267   // detect and report errors during DataReader/Writer setup instead
00268   // of during association.
00269 
00270   if (!open_appropriate_socket_type(unicast_socket_, config_i_->local_address())) {
00271     ACE_ERROR_RETURN((LM_ERROR,
00272                       ACE_TEXT("(%P|%t) ERROR: ")
00273                       ACE_TEXT("RtpsUdpTransport::configure_i: open_appropriate_socket_type:")
00274                       ACE_TEXT("%m\n")),
00275                       false);
00276   }
00277 
00278   if (config_i_->local_address().get_port_number() == 0) {
00279 
00280     ACE_INET_Addr address;
00281     if (unicast_socket_.get_local_addr(address) != 0) {
00282       ACE_ERROR_RETURN((LM_ERROR,
00283         ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::configure_i - %p\n"),
00284         ACE_TEXT("cannot get local addr")), false);
00285     }
00286     config_i_->local_address_set_port(address.get_port_number());
00287   }
00288 
00289   create_reactor_task();
00290 
00291   if (config_i_->opendds_discovery_default_listener_) {
00292     RtpsUdpDataLink_rch link =
00293       make_datalink(config_i_->opendds_discovery_guid_.guidPrefix);
00294     link->default_listener(config_i_->opendds_discovery_default_listener_);
00295     default_listener_ =
00296       dynamic_cast<TransportClient*>(config_i_->opendds_discovery_default_listener_);
00297   }
00298 
00299   return true;
00300 }

TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::connect_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient *  client 
) [private, virtual]

Definition at line 71 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), connections_lock_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, make_datalink(), OpenDDS::DCPS::TransportClient::repo_id_, use_datalink(), and VDBG_LVL.

00074 {
00075   GuardThreadType guard_links(this->links_lock_);
00076   RtpsUdpDataLink_rch link = link_;
00077   if (link_.is_nil()) {
00078     link = make_datalink(attribs.local_id_.guidPrefix);
00079     if (link.is_nil()) {
00080       return AcceptConnectResult();
00081     }
00082   }
00083 
00084   use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00085                attribs.local_reliable_, remote.reliable_,
00086                attribs.local_durable_, remote.durable_);
00087 
00088   if (0 == std::memcmp(attribs.local_id_.guidPrefix, remote.repo_id_.guidPrefix,
00089                        sizeof(GuidPrefix_t))) {
00090     return AcceptConnectResult(link._retn()); // "loopback" connection return link right away
00091   }
00092 
00093   if (link->check_handshake_complete(attribs.local_id_, remote.repo_id_)){
00094     return AcceptConnectResult(link._retn());
00095   }
00096 
00097   if (!link->add_on_start_callback(client, remote.repo_id_)) {
00098      // link was started by the reactor thread before we could add a callback
00099      VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink got link.\n"), 2);
00100      return AcceptConnectResult(link._retn());
00101   }
00102 
00103   GuardType guard(connections_lock_);
00104   add_pending_connection(client, link.in());
00105   VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
00106   return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00107 }

bool OpenDDS::DCPS::RtpsUdpTransport::connection_info_i ( TransportLocator &  info  )  const [private, virtual]

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 185 of file RtpsUdpTransport.cpp.

References config_i_.

00186 {
00187   this->config_i_->populate_locator(info);
00188   return true;
00189 }

ACE_INET_Addr OpenDDS::DCPS::RtpsUdpTransport::get_connection_addr ( const TransportBLOB &  data,
bool &  requires_inline_qos 
) const [private]

Definition at line 158 of file RtpsUdpTransport.cpp.

References OpenDDS::RTPS::blob_to_locators(), config_i_, OpenDDS::RTPS::locator_to_address(), map_ipv4_to_ipv6(), and DDS::RETCODE_OK.

Referenced by register_for_reader(), register_for_writer(), and use_datalink().

00160 {
00161   using namespace OpenDDS::RTPS;
00162   LocatorSeq locators;
00163   DDS::ReturnCode_t result =
00164     blob_to_locators(remote, locators, requires_inline_qos);
00165   if (result != DDS::RETCODE_OK) {
00166     return ACE_INET_Addr();
00167   }
00168 
00169   for (CORBA::ULong i = 0; i < locators.length(); ++i) {
00170     ACE_INET_Addr addr;
00171     // If conversion was successful
00172     if (locator_to_address(addr, locators[i], map_ipv4_to_ipv6()) == 0) {
00173       // if this is a unicast address, or if we are allowing multicast
00174       if (!addr.is_multicast() || config_i_->use_multicast_) {
00175         return addr;
00176       }
00177     }
00178   }
00179 
00180   // Return default address
00181   return ACE_INET_Addr();
00182 }

RtpsUdpDataLink * OpenDDS::DCPS::RtpsUdpTransport::make_datalink ( const GuidPrefix_t &  local_prefix  )  [private]

Definition at line 40 of file RtpsUdpTransport.cpp.

References config_i_, OpenDDS::DCPS::RcHandle< T >::in(), link_, OpenDDS::DCPS::TransportImpl::reactor_task(), and unicast_socket_.

Referenced by accept_datalink(), configure_i(), connect_datalink(), register_for_reader(), and register_for_writer().

00041 {
00042   TransportReactorTask_rch rt = reactor_task();
00043   ACE_NEW_RETURN(link_,
00044                  RtpsUdpDataLink(this, local_prefix, config_i_.in(), rt.in()),
00045                  0);
00046 
00047   RtpsUdpSendStrategy* send_strategy;
00048   ACE_NEW_RETURN(send_strategy, RtpsUdpSendStrategy(link_.in()), 0);
00049   link_->send_strategy(send_strategy);
00050 
00051   RtpsUdpReceiveStrategy* recv_strategy;
00052   ACE_NEW_RETURN(recv_strategy, RtpsUdpReceiveStrategy(link_.in()), 0);
00053   link_->receive_strategy(recv_strategy);
00054 
00055   if (!link_->open(unicast_socket_)) {
00056     ACE_ERROR_RETURN((LM_ERROR,
00057                       ACE_TEXT("(%P|%t) ERROR: ")
00058                       ACE_TEXT("RtpsUdpTransport::make_datalink: ")
00059                       ACE_TEXT("failed to open DataLink for socket %d\n"),
00060                       unicast_socket_.get_handle()),
00061                      0);
00062   }
00063 
00064   // RtpsUdpDataLink now owns the socket
00065   unicast_socket_.set_handle(ACE_INVALID_HANDLE);
00066 
00067   return RtpsUdpDataLink_rch(link_)._retn();
00068 }

bool OpenDDS::DCPS::RtpsUdpTransport::map_ipv4_to_ipv6 (  )  const [private]

Definition at line 328 of file RtpsUdpTransport.cpp.

References link_.

Referenced by get_connection_addr().

00329 {
00330   bool map = false;
00331   ACE_INET_Addr tmp;
00332   link_->unicast_socket().get_local_addr(tmp);
00333   if (tmp.get_type() != AF_INET) {
00334     map = true;
00335   }
00336   return map;
00337 }

void OpenDDS::DCPS::RtpsUdpTransport::pre_detach ( TransportClient *  client  )  [private, virtual]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 319 of file RtpsUdpTransport.cpp.

References default_listener_, OpenDDS::DCPS::RcHandle< T >::is_nil(), and link_.

00320 {
00321   if (default_listener_ && !link_.is_nil() && c == default_listener_) {
00322     link_->default_listener(0);
00323     default_listener_ = 0;
00324   }
00325 }

void OpenDDS::DCPS::RtpsUdpTransport::register_for_reader ( const RepoId &  participant,
const RepoId &  writerid,
const RepoId &  readerid,
const TransportLocatorSeq &  locators,
OpenDDS::DCPS::DiscoveryListener *  listener 
) [private, virtual]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 192 of file RtpsUdpTransport.cpp.

References config_i_, get_connection_addr(), OpenDDS::DCPS::GUID_t::guidPrefix, link_, and make_datalink().

00197 {
00198   const TransportBLOB* blob = this->config_i_->get_blob(locators);
00199   if (!blob)
00200     return;
00201   if (link_ == 0) {
00202     make_datalink(participant.guidPrefix);
00203   }
00204   bool requires_inline_qos;
00205   link_->register_for_reader(writerid, readerid, get_connection_addr(*blob, requires_inline_qos), listener);
00206 }

void OpenDDS::DCPS::RtpsUdpTransport::register_for_writer ( const RepoId &  ,
const RepoId &  ,
const RepoId &  ,
const TransportLocatorSeq &  ,
DiscoveryListener *  
) [private, virtual]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 219 of file RtpsUdpTransport.cpp.

References config_i_, get_connection_addr(), OpenDDS::DCPS::GUID_t::guidPrefix, link_, and make_datalink().

00224 {
00225   const TransportBLOB* blob = this->config_i_->get_blob(locators);
00226   if (!blob)
00227     return;
00228   if (link_ == 0) {
00229     make_datalink(participant.guidPrefix);
00230   }
00231   bool requires_inline_qos;
00232   link_->register_for_writer(readerid, writerid, get_connection_addr(*blob, requires_inline_qos), listener);
00233 }

void OpenDDS::DCPS::RtpsUdpTransport::release_datalink ( DataLink *  link  )  [private, virtual]

The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 313 of file RtpsUdpTransport.cpp.

00314 {
00315   // No-op for rtps_udp: keep the link_ around until the transport is shut down.
00316 }

void OpenDDS::DCPS::RtpsUdpTransport::shutdown_i (  )  [private, virtual]

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 303 of file RtpsUdpTransport.cpp.

References config_i_, OpenDDS::DCPS::RcHandle< T >::is_nil(), and link_.

00304 {
00305   if (!link_.is_nil()) {
00306     link_->transport_shutdown();
00307   }
00308   link_ = 0;
00309   config_i_ = 0;
00310 }

void OpenDDS::DCPS::RtpsUdpTransport::stop_accepting_or_connecting ( TransportClient *  client,
const RepoId &  remote_id 
) [private, virtual]

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 130 of file RtpsUdpTransport.cpp.

References connections_lock_, and OpenDDS::DCPS::TransportImpl::OPENDDS_MULTIMAP().

00132 {
00133   GuardType guard(connections_lock_);
00134   typedef OPENDDS_MULTIMAP(TransportClient*, DataLink_rch)::iterator iter_t;
00135   const std::pair<iter_t, iter_t> range =
00136         pending_connections_.equal_range(client);
00137   for (iter_t iter = range.first; iter != range.second; ++iter) {
00138      iter->second->remove_on_start_callback(client, remote_id);
00139   }
00140   pending_connections_.erase(range.first, range.second);
00141 }

virtual OPENDDS_STRING OpenDDS::DCPS::RtpsUdpTransport::transport_type (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 73 of file RtpsUdpTransport.h.

00073 { return "rtps_udp"; }

void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_reader ( const RepoId &  participant,
const RepoId &  writerid,
const RepoId &  readerid 
) [private, virtual]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 209 of file RtpsUdpTransport.cpp.

References link_.

00212 {
00213   if (link_ != 0) {
00214     link_->unregister_for_reader(writerid, readerid);
00215   }
00216 }

void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_writer ( const RepoId &  ,
const RepoId &  ,
const RepoId &  
) [private, virtual]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 236 of file RtpsUdpTransport.cpp.

References link_.

00239 {
00240   if (link_ != 0) {
00241     link_->unregister_for_writer(readerid, writerid);
00242   }
00243 }

void OpenDDS::DCPS::RtpsUdpTransport::use_datalink ( const RepoId &  local_id,
const RepoId &  remote_id,
const TransportBLOB &  remote_data,
bool  local_reliable,
bool  remote_reliable,
bool  local_durable,
bool  remote_durable 
) [private]

Definition at line 144 of file RtpsUdpTransport.cpp.

References get_connection_addr(), and link_.

Referenced by accept_datalink(), and connect_datalink().

00149 {
00150   bool requires_inline_qos;
00151   ACE_INET_Addr addr = get_connection_addr(remote_data, requires_inline_qos);
00152   link_->add_locator(remote_id, addr, requires_inline_qos);
00153   link_->associated(local_id, remote_id, local_reliable, remote_reliable,
00154                     local_durable, remote_durable);
00155 }


Member Data Documentation

RcHandle<RtpsUdpInst> OpenDDS::DCPS::RtpsUdpTransport::config_i_ [private]

Definition at line 83 of file RtpsUdpTransport.h.

Referenced by configure_i(), connection_info_i(), get_connection_addr(), make_datalink(), register_for_reader(), register_for_writer(), and shutdown_i().

LockType OpenDDS::DCPS::RtpsUdpTransport::connections_lock_ [private]

Definition at line 94 of file RtpsUdpTransport.h.

Referenced by connect_datalink(), and stop_accepting_or_connecting().

TransportClient* OpenDDS::DCPS::RtpsUdpTransport::default_listener_ [private]

Definition at line 106 of file RtpsUdpTransport.h.

Referenced by configure_i(), and pre_detach().

RtpsUdpDataLink_rch OpenDDS::DCPS::RtpsUdpTransport::link_ [private]

RTPS uses only one link per transport. This link can be safely reused by any clients that belong to the same domain participant (same GUID prefix). Use by a second participant is not possible because the network location returned by connection_info_i() can't be shared among participants.

Definition at line 101 of file RtpsUdpTransport.h.

Referenced by accept_datalink(), connect_datalink(), make_datalink(), map_ipv4_to_ipv6(), pre_detach(), register_for_reader(), register_for_writer(), shutdown_i(), unregister_for_reader(), unregister_for_writer(), and use_datalink().

ThreadLockType OpenDDS::DCPS::RtpsUdpTransport::links_lock_ [private]

Definition at line 88 of file RtpsUdpTransport.h.

ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpTransport::unicast_socket_ [private]

Definition at line 104 of file RtpsUdpTransport.h.

Referenced by configure_i(), and make_datalink().


The documentation for this class was generated from the following files:
RtpsUdpTransport.h
RtpsUdpTransport.cpp
Generated on Fri Feb 12 20:06:37 2016 for OpenDDS by  doxygen 1.4.7