OpenDDS::DCPS::UdpTransport Class Reference

#include <UdpTransport.h>

Inheritance diagram for OpenDDS::DCPS::UdpTransport:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::UdpTransport:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 UdpTransport (UdpInst &inst)
void passive_connection (const ACE_INET_Addr &remote_address, const Message_Block_Ptr &data)
UdpInstconfig () const

Protected Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
virtual void stop_accepting_or_connecting (const TransportClient_wrch &client, const RepoId &remote_id)
bool configure_i (UdpInst &config)
virtual void shutdown_i ()
virtual bool connection_info_i (TransportLocator &info) const
ACE_INET_Addr get_connection_addr (const TransportBLOB &data) const
virtual void release_datalink (DataLink *link)
virtual std::string transport_type () const

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockTypeGuardType
typedef ACE_Condition< LockTypeConditionType
typedef std::vector
< DataLink::OnStartCallback
Callbacks

Private Member Functions

UdpDataLink_rch make_datalink (const ACE_INET_Addr &remote_address, Priority priority, bool active)
PriorityKey blob_to_key (const TransportBLOB &remote, Priority priority, ACE_INET_Addr local_addr, bool active)
typedef OPENDDS_MAP (PriorityKey, UdpDataLink_rch) UdpDataLinkMap
 Map of fully associated DataLinks for this transport. Protected.
typedef OPENDDS_MAP (PriorityKey, Callbacks) PendConnMap

Private Attributes

LockType client_links_lock_
 This lock is used to protect the client_links_ data member.
UdpDataLinkMap client_links_
UdpDataLink_rch server_link_
 The single datalink for the passive side. No locking required.
ACE_Recursive_Thread_Mutex connections_lock_
std::set< PriorityKeyserver_link_keys_
PendConnMap pending_connections_
std::set< PriorityKeypending_server_link_keys_

Detailed Description

Definition at line 28 of file UdpTransport.h.


Member Typedef Documentation

Definition at line 91 of file UdpTransport.h.

Definition at line 67 of file UdpTransport.h.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 66 of file UdpTransport.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::UdpTransport::LockType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 65 of file UdpTransport.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::UdpTransport::UdpTransport ( UdpInst inst  )  [explicit]

Definition at line 28 of file UdpTransport.cpp.

References configure_i(), and OpenDDS::DCPS::TransportImpl::open().

00029   : TransportImpl(inst)
00030 {
00031   if (!(configure_i(inst) && open())) {
00032     throw Transport::UnableToCreate();
00033   }
00034 }

Here is the call graph for this function:


Member Function Documentation

TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::accept_datalink ( const RemoteTransport remote,
const ConnectionAttribs attribs,
const TransportClient_rch client 
) [protected, virtual]

accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 102 of file UdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), config(), connections_lock_, LM_DEBUG, pending_connections_, pending_server_link_keys_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, server_link_, server_link_keys_, and VDBG.

00105 {
00106   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00107   //GuardType guard(connections_lock_);
00108   const PriorityKey key = blob_to_key(remote.blob_,
00109                                       attribs.priority_, config().local_address(), false /* !active */);
00110   if (server_link_keys_.count(key)) {
00111     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n"));
00112     return AcceptConnectResult(UdpDataLink_rch(server_link_));
00113   }
00114 
00115   else if (pending_server_link_keys_.count(key)) {
00116     pending_server_link_keys_.erase(key);
00117     server_link_keys_.insert(key);
00118     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n"));
00119     return AcceptConnectResult(UdpDataLink_rch(server_link_));
00120   } else {
00121     const DataLink::OnStartCallback callback(client, remote.repo_id_);
00122     pending_connections_[key].push_back(callback);
00123     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n"));
00124     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00125   }
00126      return AcceptConnectResult();
00127 }

Here is the call graph for this function:

PriorityKey OpenDDS::DCPS::UdpTransport::blob_to_key ( const TransportBLOB remote,
Priority  priority,
ACE_INET_Addr  local_addr,
bool  active 
) [private]

Definition at line 230 of file UdpTransport.cpp.

References ACE_TEXT(), LM_ERROR, and OpenDDS::DCPS::NetworkAddress::to_addr().

Referenced by accept_datalink(), connect_datalink(), and passive_connection().

00234 {
00235   NetworkAddress network_order_address;
00236   ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length());
00237 
00238   if (!(cdr >> network_order_address)) {
00239     ACE_ERROR((LM_ERROR,
00240                ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key")
00241                ACE_TEXT(" failed to de-serialize the NetworkAddress\n")));
00242   }
00243 
00244   ACE_INET_Addr remote_address;
00245   network_order_address.to_addr(remote_address);
00246   const bool is_loopback = remote_address == local_addr;
00247 
00248   return PriorityKey(priority, remote_address, is_loopback, active);
00249 }

Here is the call graph for this function:

Here is the caller graph for this function:

UdpInst & OpenDDS::DCPS::UdpTransport::config (  )  const

Expose the configuration information so others can see what we can do.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 37 of file UdpTransport.cpp.

Referenced by accept_datalink(), connection_info_i(), and passive_connection().

00038 {
00039   return static_cast<UdpInst&>(TransportImpl::config());
00040 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::UdpTransport::configure_i ( UdpInst config  )  [protected]

Definition at line 154 of file UdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::create_reactor_task(), OpenDDS::DCPS::UdpInst::local_address(), make_datalink(), server_link_, and TheServiceParticipant.

Referenced by UdpTransport().

00155 {
00156   create_reactor_task();
00157 
00158   // Override with DCPSDefaultAddress.
00159   if (config.local_address() == ACE_INET_Addr () &&
00160       !TheServiceParticipant->default_address ().empty ()) {
00161 
00162     config.local_address(0, TheServiceParticipant->default_address ().c_str ());
00163   }
00164 
00165   // Our "server side" data link is created here, similar to the acceptor_
00166   // in the TcpTransport implementation.  This establishes a socket as an
00167   // endpoint that we can advertise to peers via connection_info_i().
00168   server_link_ = make_datalink(config.local_address(), 0 /* priority */, false);
00169   return true;
00170 }

Here is the call graph for this function:

Here is the caller graph for this function:

TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::connect_datalink ( const RemoteTransport remote,
const ConnectionAttribs attribs,
const TransportClient_rch client 
) [protected, virtual]

connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 65 of file UdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_FAILED, OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), client_links_, client_links_lock_, get_connection_addr(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::is_shut_down(), LM_DEBUG, make_datalink(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, and VDBG.

00068 {
00069 
00070   if (this->is_shut_down()) {
00071     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00072   }
00073   const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_);
00074   const bool active = true;
00075   const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, this->config().local_address(), active);
00076 
00077   GuardType guard(client_links_lock_);
00078   if (this->is_shut_down()) {
00079     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00080   }
00081 
00082   const UdpDataLinkMap::iterator it(client_links_.find(key));
00083   if (it != client_links_.end()) {
00084     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n"));
00085     return AcceptConnectResult(UdpDataLink_rch(it->second));
00086   }
00087 
00088   // Create new DataLink for logical connection:
00089   UdpDataLink_rch link (make_datalink(remote_address,
00090                                        attribs.priority_,
00091                                        active));
00092 
00093   if (!link.is_nil()) {
00094     client_links_.insert(UdpDataLinkMap::value_type(key, link));
00095     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n"));
00096   }
00097 
00098   return AcceptConnectResult(link);
00099 }

Here is the call graph for this function:

bool OpenDDS::DCPS::UdpTransport::connection_info_i ( TransportLocator local_info  )  const [protected, virtual]

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 190 of file UdpTransport.cpp.

References config(), and OpenDDS::DCPS::UdpInst::populate_locator().

00191 {
00192   this->config().populate_locator(info);
00193   return true;
00194 }

Here is the call graph for this function:

ACE_INET_Addr OpenDDS::DCPS::UdpTransport::get_connection_addr ( const TransportBLOB data  )  const [protected]

Definition at line 197 of file UdpTransport.cpp.

References len, and OpenDDS::DCPS::NetworkAddress::to_addr().

Referenced by connect_datalink().

00198 {
00199   ACE_INET_Addr local_address;
00200   NetworkAddress network_address;
00201 
00202   size_t len = data.length();
00203   const char* buffer = reinterpret_cast<const char*>(data.get_buffer());
00204 
00205   ACE_InputCDR cdr(buffer, len);
00206   if (cdr >> network_address) {
00207     network_address.to_addr(local_address);
00208   }
00209 
00210   return local_address;
00211 }

Here is the call graph for this function:

Here is the caller graph for this function:

UdpDataLink_rch OpenDDS::DCPS::UdpTransport::make_datalink ( const ACE_INET_Addr remote_address,
Priority  priority,
bool  active 
) [private]

Definition at line 44 of file UdpTransport.cpp.

References ACE_TEXT(), OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OpenDDS::DCPS::TransportImpl::reactor_task(), and OpenDDS::DCPS::ref().

Referenced by configure_i(), and connect_datalink().

00046 {
00047   TransportReactorTask_rch rtask (reactor_task());
00048   UdpDataLink_rch link(make_rch<UdpDataLink>(ref(*this), priority, rtask.in(), active));
00049   // Configure link with transport configuration and reactor task:
00050 
00051   // Open logical connection:
00052   if (link->open(remote_address)) {
00053     return link;
00054   }
00055 
00056   ACE_ERROR((LM_ERROR,
00057               ACE_TEXT("(%P|%t) ERROR: ")
00058               ACE_TEXT("UdpTransport::make_datalink: ")
00059               ACE_TEXT("failed to open DataLink!\n")));
00060 
00061   return UdpDataLink_rch();
00062 }

Here is the call graph for this function:

Here is the caller graph for this function:

typedef OpenDDS::DCPS::UdpTransport::OPENDDS_MAP ( PriorityKey  ,
Callbacks   
) [private]
typedef OpenDDS::DCPS::UdpTransport::OPENDDS_MAP ( PriorityKey  ,
UdpDataLink_rch   
) [private]

Map of fully associated DataLinks for this transport. Protected.

void OpenDDS::DCPS::UdpTransport::passive_connection ( const ACE_INET_Addr remote_address,
const Message_Block_Ptr data 
)

Definition at line 252 of file UdpTransport.cpp.

References ACE_Guard< ACE_LOCK >::acquire(), blob_to_key(), config(), connections_lock_, OpenDDS::DCPS::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), pending_connections_, pending_server_link_keys_, ACE_Guard< ACE_LOCK >::release(), server_link_, server_link_keys_, OpenDDS::DCPS::static_rchandle_cast(), and VDBG.

00254 {
00255   CORBA::ULong octet_size =
00256     static_cast<CORBA::ULong>(data->length() - sizeof(Priority));
00257   Priority priority;
00258   Serializer serializer(data.get());
00259   serializer >> priority;
00260   TransportBLOB blob(octet_size);
00261   blob.length(octet_size);
00262   serializer.read_octet_array(blob.get_buffer(), octet_size);
00263 
00264   // Send an ack so that the active side can return from
00265   // connect_datalink_i().  This is just a single byte of
00266   // arbitrary data, the remote side is not yet using the
00267   // framework (TransportHeader, DataSampleHeader,
00268   // ReceiveStrategy).
00269   const char ack_data = 23;
00270   if (server_link_->socket().send(&ack_data, 1, remote_address) <= 0) {
00271     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection failed to send ack\n"));
00272   }
00273 
00274   const PriorityKey key = blob_to_key(blob, priority, config().local_address(), false /* passive */);
00275 
00276   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00277 
00278   //GuardType guard(connections_lock_);
00279   const PendConnMap::iterator pend = pending_connections_.find(key);
00280 
00281   if (pend != pending_connections_.end()) {
00282 
00283      //don't hold connections_lock_ while calling use_datalink
00284      //guard.release();
00285 
00286     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n"));
00287 
00288     const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_);
00289 
00290     //Insert key now to make sure when releasing guard to call use_datalink
00291     //if an accept_datalink obtains lock first it will see that it can proceed
00292     //with using the link and do its own use_datalink call.
00293     server_link_keys_.insert(key);
00294 
00295     //create a copy of the size of callback vector so that if use_datalink_i -> stop_accepting_or_connecting
00296     //finds that callbacks vector is empty and deletes pending connection & its callback vector for loop can
00297     //still exit the loop without checking the size of invalid memory
00298     //size_t num_callbacks = pend->second.size();
00299 
00300     //Create a copy of the vector of callbacks to process, making sure that each is
00301     //still present in the actual pending_connections_ before calling use_datalink
00302     Callbacks tmp(pend->second);
00303     for (size_t i = 0; i < tmp.size(); ++i) {
00304       const PendConnMap::iterator pend = pending_connections_.find(key);
00305       if (pend != pending_connections_.end()) {
00306         const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00307                                                   pend->second.end(),
00308                                                   tmp.at(i));
00309         if (tmp_iter != pend->second.end()) {
00310           TransportClient_wrch pend_client = tmp.at(i).first;
00311           RepoId remote_repo = tmp.at(i).second;
00312           guard.release();
00313           TransportClient_rch client = pend_client.lock();
00314           if (client)
00315             client->use_datalink(remote_repo, link);
00316           guard.acquire();
00317         }
00318       }
00319     }
00320   } else {
00321     // still hold guard(connections_lock_) at this point so
00322     // pending_server_link_keys_ is protected for insert
00323 
00324     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n"));
00325     // accept_datalink() will complete the connection.
00326     pending_server_link_keys_.insert(key);
00327   }
00328 }

Here is the call graph for this function:

void OpenDDS::DCPS::UdpTransport::release_datalink ( DataLink link  )  [protected, virtual]

Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 214 of file UdpTransport.cpp.

References client_links_, client_links_lock_, and OpenDDS::DCPS::DataLink::stop().

00215 {
00216   GuardType guard(client_links_lock_);
00217   for (UdpDataLinkMap::iterator it(client_links_.begin());
00218        it != client_links_.end(); ++it) {
00219     // We are guaranteed to have exactly one matching DataLink
00220     // in the map; release any resources held and return.
00221     if (link == static_cast<DataLink*>(it->second.in())) {
00222       link->stop();
00223       client_links_.erase(it);
00224       return;
00225     }
00226   }
00227 }

Here is the call graph for this function:

void OpenDDS::DCPS::UdpTransport::shutdown_i (  )  [protected, virtual]

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 173 of file UdpTransport.cpp.

References client_links_, client_links_lock_, OpenDDS::DCPS::RcHandle< T >::reset(), and server_link_.

00174 {
00175   // Shutdown reserved datalinks and release configuration:
00176   GuardType guard(client_links_lock_);
00177   for (UdpDataLinkMap::iterator it(client_links_.begin());
00178        it != client_links_.end(); ++it) {
00179     it->second->transport_shutdown();
00180   }
00181   client_links_.clear();
00182 
00183   if (server_link_) {
00184     server_link_->transport_shutdown();
00185     server_link_.reset();
00186   }
00187 }

Here is the call graph for this function:

void OpenDDS::DCPS::UdpTransport::stop_accepting_or_connecting ( const TransportClient_wrch client,
const RepoId remote_id 
) [protected, virtual]

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 130 of file UdpTransport.cpp.

References connections_lock_, LM_DEBUG, pending_connections_, and VDBG.

00132 {
00133   VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
00134 
00135   //GuardType guard(connections_lock_);
00136   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00137 
00138   for (PendConnMap::iterator it = pending_connections_.begin();
00139        it != pending_connections_.end(); ++it) {
00140     for (size_t i = 0; i < it->second.size(); ++i) {
00141       if (it->second[i].first == client && it->second[i].second == remote_id) {
00142         it->second.erase(it->second.begin() + i);
00143         break;
00144       }
00145     }
00146     if (it->second.empty()) {
00147       pending_connections_.erase(it);
00148       return;
00149     }
00150   }
00151 }

virtual std::string OpenDDS::DCPS::UdpTransport::transport_type (  )  const [inline, protected, virtual]

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 56 of file UdpTransport.h.

00056 { return "udp"; }


Member Data Documentation

Definition at line 75 of file UdpTransport.h.

Referenced by connect_datalink(), release_datalink(), and shutdown_i().

This lock is used to protect the client_links_ data member.

Definition at line 70 of file UdpTransport.h.

Referenced by connect_datalink(), release_datalink(), and shutdown_i().

This protects the pending_connections_, pending_server_link_keys_, and server_link_keys_ data members.

Definition at line 83 of file UdpTransport.h.

Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().

Locked by connections_lock_. Tracks expected connections that we have learned about in accept_datalink() but have not yet performed the handshake.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 96 of file UdpTransport.h.

Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().

Locked by connections_lock_. These are passive-side PriorityKeys that have finished handshaking, but have not been processed by accept_datalink()

Definition at line 101 of file UdpTransport.h.

Referenced by accept_datalink(), and passive_connection().

The single datalink for the passive side. No locking required.

Definition at line 78 of file UdpTransport.h.

Referenced by accept_datalink(), configure_i(), passive_connection(), and shutdown_i().

Locked by connections_lock_. These are passive-side PriorityKeys that have been fully associated (processed by accept_datalink() and finished handshaking). They are ready for use and reuse via server_link_.

Definition at line 89 of file UdpTransport.h.

Referenced by accept_datalink(), and passive_connection().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1