OpenDDS::DCPS::UdpTransport Class Reference

#include <UdpTransport.h>

Public Member Functions

 UdpTransport (const TransportInst_rch &inst)
void passive_connection (const ACE_INET_Addr &remote_address, ACE_Message_Block *data)

Protected Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual void stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id)
virtual bool configure_i (TransportInst *config)
virtual void shutdown_i ()
virtual bool connection_info_i (TransportLocator &info) const
ACE_INET_Addr get_connection_addr (const TransportBLOB &data) const
virtual void release_datalink (DataLink *link)
virtual std::string transport_type () const

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef ACE_Condition< LockType > ConditionType
typedef std::vector< DataLink::OnStartCallback > Callbacks

Private Member Functions

UdpDataLink * make_datalink (const ACE_INET_Addr &remote_address, Priority priority, bool active)
PriorityKey blob_to_key (const TransportBLOB &remote, Priority priority, ACE_INET_Addr local_addr, bool active)
typedef OPENDDS_MAP (PriorityKey, UdpDataLink_rch) UdpDataLinkMap
 Map of fully associated DataLinks for this transport. The client_links_ instance is protected by client_links_lock_.
typedef OPENDDS_MAP (PriorityKey, Callbacks) PendConnMap

Private Attributes

RcHandle< UdpInst > config_i_
LockType client_links_lock_
 This lock is used to protect the client_links_ data member.
UdpDataLinkMap client_links_
UdpDataLink_rch server_link_
 The single datalink for the passive side. No locking required.
ACE_Recursive_Thread_Mutex connections_lock_
std::set< PriorityKey > server_link_keys_
PendConnMap pending_connections_
std::set< PriorityKey > pending_server_link_keys_

Detailed Description

Definition at line 25 of file UdpTransport.h.


Member Typedef Documentation

typedef std::vector<DataLink::OnStartCallback> OpenDDS::DCPS::UdpTransport::Callbacks [private]

Definition at line 90 of file UdpTransport.h.

typedef ACE_Condition<LockType> OpenDDS::DCPS::UdpTransport::ConditionType [private]

Definition at line 66 of file UdpTransport.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::UdpTransport::GuardType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 65 of file UdpTransport.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::UdpTransport::LockType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 64 of file UdpTransport.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::UdpTransport::UdpTransport ( const TransportInst_rch &  inst  )  [explicit]

Definition at line 26 of file UdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

00027 {
00028   if (!inst.is_nil()) {
00029     if (!configure(inst.in())) {
00030       throw Transport::UnableToCreate();
00031     }
00032   }
00033 }


Member Function Documentation

TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::accept_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient *  client 
) [protected, virtual]

Definition at line 114 of file UdpTransport.cpp.

References blob_to_key(), config_i_, connections_lock_, pending_connections_, pending_server_link_keys_, OpenDDS::DCPS::TransportClient::repo_id_, server_link_, server_link_keys_, and VDBG.

00117 {
00118   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00119   //GuardType guard(connections_lock_);
00120   const PriorityKey key = blob_to_key(remote.blob_,
00121                                       attribs.priority_, config_i_->local_address(), false /* !active */);
00122   if (server_link_keys_.count(key)) {
00123     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n"));
00124     return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn());
00125   }
00126 
00127   else if (pending_server_link_keys_.count(key)) {
00128     pending_server_link_keys_.erase(key);
00129     server_link_keys_.insert(key);
00130     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n"));
00131     return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn());
00132   } else {
00133     const DataLink::OnStartCallback callback(client, remote.repo_id_);
00134     pending_connections_[key].push_back(callback);
00135     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n"));
00136     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00137   }
00138      return AcceptConnectResult();
00139 }
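
The three branches above form a small rendezvous with passive_connection(): whichever of the two calls arrives second completes the association. The following is a minimal standalone sketch of that bookkeeping, not OpenDDS code. It assumes an int in place of PriorityKey and of DataLink::OnStartCallback, and the Rendezvous type, its members, and AcceptState are illustrative names only.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <vector>

// Illustrative model only: Key stands in for PriorityKey and Callback for
// DataLink::OnStartCallback. None of these names come from OpenDDS.
using Key = int;
using Callback = int;

enum class AcceptState { Found, Completed, Pending };

struct Rendezvous {
  std::set<Key> associated;                      // plays the role of server_link_keys_
  std::set<Key> handshaken;                      // plays the role of pending_server_link_keys_
  std::map<Key, std::vector<Callback>> waiting;  // plays the role of pending_connections_

  // Mirrors the branch order of accept_datalink().
  AcceptState accept(Key key, Callback cb) {
    if (associated.count(key)) {
      return AcceptState::Found;        // already fully associated
    }
    if (handshaken.erase(key)) {
      associated.insert(key);           // handshake arrived first: complete now
      return AcceptState::Completed;
    }
    waiting[key].push_back(cb);         // handshake not seen yet: park the callback
    return AcceptState::Pending;
  }

  // Mirrors the bookkeeping in passive_connection(): returns the callbacks
  // that should be notified, or records the key when nobody is waiting yet.
  std::vector<Callback> handshake(Key key) {
    auto it = waiting.find(key);
    if (it == waiting.end()) {
      handshaken.insert(key);
      return {};
    }
    associated.insert(key);
    return it->second;
  }
};
```

In this sketch handshake() leaves the waiting entry in place, just as passive_connection() relies on stop_accepting_or_connecting() to remove pending callbacks later.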

PriorityKey OpenDDS::DCPS::UdpTransport::blob_to_key ( const TransportBLOB &  remote,
Priority  priority,
ACE_INET_Addr  local_addr,
bool  active 
) [private]

Definition at line 251 of file UdpTransport.cpp.

References OpenDDS::DCPS::NetworkAddress::to_addr().

Referenced by accept_datalink(), connect_datalink(), and passive_connection().

00255 {
00256   NetworkAddress network_order_address;
00257   ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length());
00258 
00259   if ((cdr >> network_order_address) == 0) {
00260     ACE_ERROR((LM_ERROR,
00261                ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key")
00262                ACE_TEXT(" failed to de-serialize the NetworkAddress\n")));
00263   }
00264 
00265   ACE_INET_Addr remote_address;
00266   network_order_address.to_addr(remote_address);
00267   const bool is_loopback = remote_address == local_addr;
00268 
00269   return PriorityKey(priority, remote_address, is_loopback, active);
00270 }
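
The key built above combines the priority, the decoded remote address, a loopback flag derived by comparing the remote address with the local one, and the active/passive role. A minimal sketch of such a composite key follows, assuming a string in place of ACE_INET_Addr. KeySketch and make_key are hypothetical names, and the real PriorityKey layout and ordering are not shown on this page.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <tuple>

// Hypothetical stand-in for PriorityKey, usable as a std::map or std::set key.
struct KeySketch {
  std::int32_t priority;
  std::string address;   // "host:port" text in place of ACE_INET_Addr
  bool is_loopback;
  bool active;

  // Lexicographic ordering over all four fields (an assumption of this sketch).
  bool operator<(const KeySketch& rhs) const {
    return std::tie(priority, address, is_loopback, active) <
           std::tie(rhs.priority, rhs.address, rhs.is_loopback, rhs.active);
  }
};

// Same shape as blob_to_key(): the loopback flag is true only when the
// remote address equals the local address.
KeySketch make_key(const std::string& remote, std::int32_t priority,
                   const std::string& local, bool active) {
  const bool is_loopback = (remote == local);
  return KeySketch{priority, remote, is_loopback, active};
}
```

Because the active flag is part of the key, the same remote address yields distinct keys for the connecting and accepting roles.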

bool OpenDDS::DCPS::UdpTransport::configure_i ( TransportInst *  config  )  [protected, virtual]

The concrete subclass gets a chance to inspect the configuration object. The subclass will typically downcast the TransportInst object to the subclass type that it expects or requires.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 166 of file UdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::config(), config_i_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), make_datalink(), server_link_, and TheServiceParticipant.

00167 {
00168   config_i_ = UdpInst_rch(dynamic_cast<UdpInst*>(config), false);
00169 
00170   if (config_i_ == 0) {
00171     ACE_ERROR_RETURN((LM_ERROR,
00172                       ACE_TEXT("(%P|%t) ERROR: ")
00173                       ACE_TEXT("UdpTransport::configure_i: ")
00174                       ACE_TEXT("invalid configuration!\n")),
00175                      false);
00176   }
00177 
00178   create_reactor_task();
00179 
00180   // Override with DCPSDefaultAddress.
00181   if (this->config_i_->local_address() == ACE_INET_Addr () &&
00182       !TheServiceParticipant->default_address ().empty ()) {
00183     this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00184   }
00185 
00186   // Our "server side" data link is created here, similar to the acceptor_
00187   // in the TcpTransport implementation.  This establishes a socket as an
00188   // endpoint that we can advertise to peers via connection_info_i().
00189   server_link_ = make_datalink(this->config_i_->local_address(), 0 /* priority */, false);
00190   return true;
00191 }
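
The first two steps of the listing above, the checked downcast and the default-address override, can be sketched in isolation as follows. This is an illustration under stated assumptions: BaseConfig, UdpConfig, and TcpConfig are hypothetical stand-ins for TransportInst and its subclasses, and a string replaces ACE_INET_Addr.

```cpp
#include <cassert>
#include <string>

// Hypothetical configuration hierarchy standing in for TransportInst / UdpInst.
struct BaseConfig {
  virtual ~BaseConfig() = default;
};
struct UdpConfig : BaseConfig {
  std::string local_address;  // empty means "not configured"
};
struct TcpConfig : BaseConfig {};

// Returns the UDP view of the configuration, or nullptr when the caller
// passed a configuration of the wrong concrete type.
UdpConfig* as_udp_config(BaseConfig* config) {
  return dynamic_cast<UdpConfig*>(config);
}

// Applies the process-wide default only when no local address was set
// explicitly and a default is actually available.
void apply_default_address(UdpConfig& cfg, const std::string& default_address) {
  if (cfg.local_address.empty() && !default_address.empty()) {
    cfg.local_address = default_address;
  }
}
```

An explicitly configured address always wins over the default, which matches the condition tested in the listing.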

TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::connect_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient *  client 
) [protected, virtual]

Definition at line 77 of file UdpTransport.cpp.

References blob_to_key(), client_links_, client_links_lock_, config_i_, get_connection_addr(), make_datalink(), and VDBG.

00080 {
00081   UdpInst_rch tmp_config(this->config_i_.in(), false);
00082   if (this->is_shut_down() || this->config_i_.is_nil()) {
00083     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00084   }
00085   const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_);
00086   const bool active = true;
00087   const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, tmp_config->local_address(), active);
00088   tmp_config = 0; //no longer need to hold on to local config
00089   GuardType guard(client_links_lock_);
00090   if (this->is_shut_down() || this->config_i_.is_nil()) {
00091     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00092   }
00093 
00094   const UdpDataLinkMap::iterator it(client_links_.find(key));
00095   if (it != client_links_.end()) {
00096     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n"));
00097     return AcceptConnectResult(UdpDataLink_rch(it->second)._retn());
00098   }
00099 
00100   // Create new DataLink for logical connection:
00101   UdpDataLink_rch link = make_datalink(remote_address,
00102                                        attribs.priority_,
00103                                        active);
00104 
00105   if (!link.is_nil()) {
00106     client_links_.insert(UdpDataLinkMap::value_type(key, link));
00107     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n"));
00108   }
00109 
00110   return AcceptConnectResult(link._retn());
00111 }
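
The core of the listing above is a find-or-create lookup performed under a lock, where a failed creation is returned to the caller but never stored. A minimal sketch of that pattern with standard-library types is shown below. LinkCache and Link are hypothetical names, and std::shared_ptr stands in for the reference-counted UdpDataLink_rch handle.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>

struct Link { int id; };  // hypothetical stand-in for UdpDataLink

class LinkCache {
public:
  // Returns the cached link for key, or creates one with factory.
  // A null result from the factory is passed back but never cached,
  // so a later call can retry the creation.
  std::shared_ptr<Link> find_or_create(
      int key, const std::function<std::shared_ptr<Link>()>& factory) {
    std::lock_guard<std::mutex> guard(lock_);
    auto it = links_.find(key);
    if (it != links_.end()) {
      return it->second;
    }
    std::shared_ptr<Link> link = factory();
    if (link) {
      links_.emplace(key, link);
    }
    return link;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(lock_);
    return links_.size();
  }

private:
  mutable std::mutex lock_;
  std::map<int, std::shared_ptr<Link>> links_;
};
```

Holding the lock across the factory call keeps two threads from creating duplicate links for the same key, at the cost of serializing link creation.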

bool OpenDDS::DCPS::UdpTransport::connection_info_i ( TransportLocator &  info  )  const [protected, virtual]

Called by connection_info() so that the concrete TransportImpl subclass, which is the one that knows how, can populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 211 of file UdpTransport.cpp.

References config_i_.

00212 {
00213   this->config_i_->populate_locator(info);
00214   return true;
00215 }

ACE_INET_Addr OpenDDS::DCPS::UdpTransport::get_connection_addr ( const TransportBLOB &  data  )  const [protected]

Definition at line 218 of file UdpTransport.cpp.

References OpenDDS::DCPS::NetworkAddress::to_addr().

Referenced by connect_datalink().

00219 {
00220   ACE_INET_Addr local_address;
00221   NetworkAddress network_address;
00222 
00223   size_t len = data.length();
00224   const char* buffer = reinterpret_cast<const char*>(data.get_buffer());
00225 
00226   ACE_InputCDR cdr(buffer, len);
00227   cdr >> network_address;
00228 
00229   network_address.to_addr(local_address);
00230 
00231   return local_address;
00232 }

UdpDataLink * OpenDDS::DCPS::UdpTransport::make_datalink ( const ACE_INET_Addr &  remote_address,
Priority  priority,
bool  active 
) [private]

Definition at line 36 of file UdpTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), config_i_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and OpenDDS::DCPS::TransportImpl::reactor_task().

Referenced by configure_i(), and connect_datalink().

00038 {
00039   UdpDataLink_rch link;
00040   ACE_NEW_RETURN(link, UdpDataLink(this, priority, active), 0);
00041 
00042   if (link.is_nil()) {
00043     ACE_ERROR_RETURN((LM_ERROR,
00044                       ACE_TEXT("(%P|%t) ERROR: ")
00045                       ACE_TEXT("UdpTransport::make_datalink: ")
00046                       ACE_TEXT("failed to create DataLink!\n")),
00047                      0);
00048   }
00049 
00050   // Configure link with transport configuration and reactor task:
00051   TransportReactorTask_rch rtask = reactor_task();
00052   link->configure(config_i_.in(), rtask.in());
00053 
00054   // Assign send strategy:
00055   UdpSendStrategy* send_strategy;
00056   ACE_NEW_RETURN(send_strategy, UdpSendStrategy(link.in()), 0);
00057   link->send_strategy(send_strategy);
00058 
00059   // Assign receive strategy:
00060   UdpReceiveStrategy* recv_strategy;
00061   ACE_NEW_RETURN(recv_strategy, UdpReceiveStrategy(link.in()), 0);
00062   link->receive_strategy(recv_strategy);
00063 
00064   // Open logical connection:
00065   if (!link->open(remote_address)) {
00066     ACE_ERROR_RETURN((LM_ERROR,
00067                       ACE_TEXT("(%P|%t) ERROR: ")
00068                       ACE_TEXT("UdpTransport::make_datalink: ")
00069                       ACE_TEXT("failed to open DataLink!\n")),
00070                      0);
00071   }
00072 
00073   return link._retn();
00074 }

typedef OPENDDS_MAP (PriorityKey, Callbacks) OpenDDS::DCPS::UdpTransport::PendConnMap [private]

typedef OPENDDS_MAP (PriorityKey, UdpDataLink_rch) OpenDDS::DCPS::UdpTransport::UdpDataLinkMap [private]

Map of fully associated DataLinks for this transport. The client_links_ instance of this map is protected by client_links_lock_.

void OpenDDS::DCPS::UdpTransport::passive_connection ( const ACE_INET_Addr &  remote_address,
ACE_Message_Block *  data 
)

Definition at line 273 of file UdpTransport.cpp.

References blob_to_key(), config_i_, connections_lock_, OpenDDS::DCPS::find(), pending_connections_, pending_server_link_keys_, OpenDDS::DCPS::Serializer::read_octet_array(), server_link_, server_link_keys_, and VDBG.

00275 {
00276   CORBA::ULong octet_size =
00277     static_cast<CORBA::ULong>(data->length() - sizeof(Priority));
00278   Priority priority;
00279   Serializer serializer(data);
00280   serializer >> priority;
00281   TransportBLOB blob(octet_size);
00282   blob.length(octet_size);
00283   serializer.read_octet_array(blob.get_buffer(), octet_size);
00284 
00285   // Send an ack so that the active side can return from
00286   // connect_datalink_i().  This is just a single byte of
00287   // arbitrary data, the remote side is not yet using the
00288   // framework (TransportHeader, DataSampleHeader,
00289   // ReceiveStrategy).
00290   const char ack_data = 23;
00291   server_link_->socket().send(&ack_data, 1, remote_address);
00292 
00293   const PriorityKey key = blob_to_key(blob, priority, config_i_->local_address(), false /* passive */);
00294 
00295   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00296 
00297   //GuardType guard(connections_lock_);
00298   const PendConnMap::iterator pend = pending_connections_.find(key);
00299 
00300   if (pend != pending_connections_.end()) {
00301 
00302      //don't hold connections_lock_ while calling use_datalink
00303      //guard.release();
00304 
00305     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n"));
00306 
00307     const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_);
00308 
00309     //Insert key now to make sure when releasing guard to call use_datalink
00310     //if an accept_datalink obtains lock first it will see that it can proceed
00311     //with using the link and do its own use_datalink call.
00312     server_link_keys_.insert(key);
00313 
00314     //create a copy of the size of callback vector so that if use_datalink_i -> stop_accepting_or_connecting
00315     //finds that callbacks vector is empty and deletes pending connection & its callback vector for loop can
00316     //still exit the loop without checking the size of invalid memory
00317     //size_t num_callbacks = pend->second.size();
00318 
00319     //Create a copy of the vector of callbacks to process, making sure that each is
00320     //still present in the actual pending_connections_ before calling use_datalink
00321     Callbacks tmp(pend->second);
00322     for (size_t i = 0; i < tmp.size(); ++i) {
00323       const PendConnMap::iterator pend = pending_connections_.find(key);
00324       if (pend != pending_connections_.end()) {
00325         const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00326                                                   pend->second.end(),
00327                                                   tmp.at(i));
00328         if (tmp_iter != pend->second.end()) {
00329           TransportClient* pend_client = tmp.at(i).first;
00330           RepoId remote_repo = tmp.at(i).second;
00331           guard.release();
00332           pend_client->use_datalink(remote_repo, link);
00333           guard.acquire();
00334         }
00335       }
00336     }
00337   } else {
00338     // still hold guard(connections_lock_) at this point so
00339     // pending_server_link_keys_ is protected for insert
00340 
00341     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n"));
00342     // accept_datalink() will complete the connection.
00343     pending_server_link_keys_.insert(key);
00344   }
00345 }
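
The callback loop above iterates over a copy of the pending callbacks, re-checks the live map before each call, and releases the lock while the callback runs so that the callback may re-enter the transport. A minimal sketch of that snapshot-and-recheck pattern follows, using std::recursive_mutex in place of ACE_Recursive_Thread_Mutex. PendingTable and its members are hypothetical names, and an int stands in for each client callback.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <vector>

class PendingTable {
public:
  void add(int key, int client) {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    pending_[key].push_back(client);
  }

  // Removes client from key's list and drops the key once the list is empty.
  void remove(int key, int client) {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    auto it = pending_.find(key);
    if (it == pending_.end()) return;
    std::vector<int>& v = it->second;
    v.erase(std::remove(v.begin(), v.end(), client), v.end());
    if (v.empty()) pending_.erase(it);
  }

  // Calls notify(client) for each client still pending on key. The loop walks
  // a snapshot, looks the client up again in the live table before every
  // call, and unlocks around the callback so it may call remove().
  void notify_all(int key, const std::function<void(int)>& notify) {
    std::unique_lock<std::recursive_mutex> guard(lock_);
    auto it = pending_.find(key);
    if (it == pending_.end()) return;
    const std::vector<int> snapshot(it->second);
    for (int client : snapshot) {
      auto live = pending_.find(key);
      if (live == pending_.end()) continue;
      const std::vector<int>& v = live->second;
      if (std::find(v.begin(), v.end(), client) == v.end()) continue;
      guard.unlock();
      notify(client);
      guard.lock();
    }
  }

private:
  std::recursive_mutex lock_;
  std::map<int, std::vector<int>> pending_;
};
```

Iterating the snapshot rather than the live vector is what keeps the loop valid when a callback erases entries, or the whole map node, while the lock is released.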

void OpenDDS::DCPS::UdpTransport::release_datalink ( DataLink *  link  )  [protected, virtual]

The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 235 of file UdpTransport.cpp.

References client_links_, client_links_lock_, and OpenDDS::DCPS::DataLink::stop().

00236 {
00237   GuardType guard(client_links_lock_);
00238   for (UdpDataLinkMap::iterator it(client_links_.begin());
00239        it != client_links_.end(); ++it) {
00240     // We are guaranteed to have exactly one matching DataLink
00241     // in the map; release any resources held and return.
00242     if (link == static_cast<DataLink*>(it->second.in())) {
00243       link->stop();
00244       client_links_.erase(it);
00245       return;
00246     }
00247   }
00248 }

void OpenDDS::DCPS::UdpTransport::shutdown_i (  )  [protected, virtual]

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 194 of file UdpTransport.cpp.

References client_links_, client_links_lock_, config_i_, and server_link_.

00195 {
00196   // Shutdown reserved datalinks and release configuration:
00197   GuardType guard(client_links_lock_);
00198   for (UdpDataLinkMap::iterator it(client_links_.begin());
00199        it != client_links_.end(); ++it) {
00200     it->second->transport_shutdown();
00201   }
00202   client_links_.clear();
00203 
00204   server_link_->transport_shutdown();
00205   server_link_ = 0;
00206 
00207   config_i_ = 0;
00208 }

void OpenDDS::DCPS::UdpTransport::stop_accepting_or_connecting ( TransportClient *  client,
const RepoId &  remote_id 
) [protected, virtual]

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 142 of file UdpTransport.cpp.

References connections_lock_, pending_connections_, and VDBG.

00144 {
00145   VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
00146 
00147   //GuardType guard(connections_lock_);
00148   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00149 
00150   for (PendConnMap::iterator it = pending_connections_.begin();
00151        it != pending_connections_.end(); ++it) {
00152     for (size_t i = 0; i < it->second.size(); ++i) {
00153       if (it->second[i].first == client && it->second[i].second == remote_id) {
00154         it->second.erase(it->second.begin() + i);
00155         break;
00156       }
00157     }
00158     if (it->second.empty()) {
00159       pending_connections_.erase(it);
00160       return;
00161     }
00162   }
00163 }
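
The listing above returns as soon as it erases one emptied map entry, which sidesteps the invalidated iterator. If the scan ever had to continue past an erased entry, the usual idiom is to advance with the iterator returned by erase(). The sketch below shows that variation, not the behavior of the listing. It assumes a std::map in place of PendConnMap and int pairs in place of (TransportClient*, RepoId), and remove_pending is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

using Entry = std::pair<int, int>;                  // (client, remote id) stand-ins
using Pending = std::map<int, std::vector<Entry>>;  // keyed like PendConnMap

// Removes the first matching entry from every key's list and erases keys
// whose list becomes empty, continuing safely after each erase.
void remove_pending(Pending& pending, int client, int remote_id) {
  for (auto it = pending.begin(); it != pending.end();) {
    std::vector<Entry>& v = it->second;
    for (std::size_t i = 0; i < v.size(); ++i) {
      if (v[i].first == client && v[i].second == remote_id) {
        v.erase(v.begin() + static_cast<std::ptrdiff_t>(i));
        break;
      }
    }
    if (v.empty()) {
      it = pending.erase(it);  // since C++11, erase returns the next valid iterator
    } else {
      ++it;
    }
  }
}
```

Whether a full scan is needed depends on whether the same client and remote id can be pending under more than one key, which this page does not state.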

virtual std::string OpenDDS::DCPS::UdpTransport::transport_type (  )  const [inline, protected, virtual]

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 53 of file UdpTransport.h.

00053 { return "udp"; }


Member Data Documentation

UdpDataLinkMap OpenDDS::DCPS::UdpTransport::client_links_ [private]

Definition at line 74 of file UdpTransport.h.

Referenced by connect_datalink(), release_datalink(), and shutdown_i().

LockType OpenDDS::DCPS::UdpTransport::client_links_lock_ [private]

This lock is used to protect the client_links_ data member.

Definition at line 69 of file UdpTransport.h.

Referenced by connect_datalink(), release_datalink(), and shutdown_i().

RcHandle<UdpInst> OpenDDS::DCPS::UdpTransport::config_i_ [private]

Definition at line 62 of file UdpTransport.h.

Referenced by accept_datalink(), configure_i(), connect_datalink(), connection_info_i(), make_datalink(), passive_connection(), and shutdown_i().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::UdpTransport::connections_lock_ [private]

This protects the pending_connections_, pending_server_link_keys_, and server_link_keys_ data members.

Definition at line 82 of file UdpTransport.h.

Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().

PendConnMap OpenDDS::DCPS::UdpTransport::pending_connections_ [private]

Locked by connections_lock_. Tracks expected connections that we have learned about in accept_datalink() but have not yet performed the handshake.

Definition at line 95 of file UdpTransport.h.

Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().

std::set<PriorityKey> OpenDDS::DCPS::UdpTransport::pending_server_link_keys_ [private]

Locked by connections_lock_. These are passive-side PriorityKeys that have finished handshaking, but have not yet been processed by accept_datalink().

Definition at line 100 of file UdpTransport.h.

Referenced by accept_datalink(), and passive_connection().

UdpDataLink_rch OpenDDS::DCPS::UdpTransport::server_link_ [private]

The single datalink for the passive side. No locking required.

Definition at line 77 of file UdpTransport.h.

Referenced by accept_datalink(), configure_i(), passive_connection(), and shutdown_i().

std::set<PriorityKey> OpenDDS::DCPS::UdpTransport::server_link_keys_ [private]

Locked by connections_lock_. These are passive-side PriorityKeys that have been fully associated (processed by accept_datalink() and finished handshaking). They are ready for use and reuse via server_link_.

Definition at line 88 of file UdpTransport.h.

Referenced by accept_datalink(), and passive_connection().


The documentation for this class was generated from the following files:
UdpTransport.h
UdpTransport.cpp
Generated on Fri Feb 12 20:06:40 2016 for OpenDDS by  doxygen 1.4.7