OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::TcpTransport Class Reference

#include <TcpTransport.h>

Inheritance diagram for OpenDDS::DCPS::TcpTransport:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TcpTransport:
Collaboration graph
[legend]

Classes

class  Connector
 

Public Member Functions

 TcpTransport (const TcpInst_rch &inst)
 
virtual ~TcpTransport ()
 
int fresh_link (TcpConnection_rch connection)
 
virtual void unbind_link (DataLink *link)
 Remove any pending_release mappings. More...
 
TcpInst_rch config () const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportImpl
virtual ~TransportImpl ()
 
bool release_link_resources (DataLink *link)
 
TransportInst_rch config () const
 
virtual void register_for_reader (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, OpenDDS::DCPS::DiscoveryListener *)
 
virtual void unregister_for_reader (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual void register_for_writer (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
 
virtual void unregister_for_writer (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual void update_locators (const GUID_t &, const TransportLocatorSeq &)
 
virtual void get_last_recv_locator (const GUID_t &, TransportLocator &)
 
virtual void rtps_relay_address_change ()
 
virtual void append_transport_statistics (TransportStatisticsSequence &)
 
ACE_Reactor_Timer_Interfacetimer () const
 Interface to the transport's reactor for scheduling timers. More...
 
ACE_Reactorreactor () const
 
ACE_thread_t reactor_owner () const
 
bool is_shut_down () const
 
void create_reactor_task (bool useAsyncSend=false, const OPENDDS_STRING &name="")
 
void dump ()
 Diagnostic aid. More...
 
OPENDDS_STRING dump_to_str ()
 
void report ()
 
virtual WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
virtual void rtps_relay_only_now (bool)
 
virtual void use_rtps_relay_now (bool)
 
virtual void use_ice_now (bool)
 
ReactorTask_rch reactor_task ()
 
EventDispatcher_rch event_dispatcher ()
 
int acquire ()
 
int tryacquire ()
 
int release ()
 
int remove ()
 
bool connection_info (TransportLocator &local_info, ConnectionInfoFlags flags) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Private Types

typedef ACE_Hash_Map_Manager_Ex< PriorityKey, TcpDataLink_rch, ACE_Hash< PriorityKey >, ACE_Equal_To< PriorityKey >, ACE_Null_MutexAddrLinkMap
 Map Type: (key) PriorityKey to (value) TcpDataLink_rch. More...
 
typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockTypeGuardType
 
typedef ConditionVariable< LockTypeConditionVariableType
 

Private Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
 
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
 
virtual void stop_accepting_or_connecting (const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
 
virtual bool configure_i (const TcpInst_rch &config)
 
virtual void client_stop (const GUID_t &local_id)
 
virtual void shutdown_i ()
 
virtual bool connection_info_i (TransportLocator &local_info, ConnectionInfoFlags flags) const
 
virtual void release_datalink (DataLink *link)
 Called by the DataLink to release itself. More...
 
virtual std::string transport_type () const
 
void async_connect_failed (const PriorityKey &key)
 
void passive_connection (const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection)
 
bool find_datalink_i (const PriorityKey &key, TcpDataLink_rch &link)
 
int connect_tcp_datalink (TcpDataLink &link, const TcpConnection_rch &connection)
 Common code used by accept_datalink(), passive_connection(), and active completion. More...
 
PriorityKey blob_to_key (const TransportBLOB &remote, Priority priority, bool active)
 
typedef OPENDDS_MAP (PriorityKey, TcpDataLink_rch) LinkMap
 
typedef OPENDDS_MAP (PriorityKey, TcpConnection_rch) ConnectionMap
 

Private Attributes

unique_ptr< TcpAcceptoracceptor_
 Used to accept passive connections on our local_address_. More...
 
Connector connector_
 Open TcpConnections using non-blocking connect. More...
 
AddrLinkMap links_
 This is the map of connected DataLinks. More...
 
AddrLinkMap pending_release_links_
 
LockType links_lock_
 This lock is used to protect the links_ data member. More...
 
ConnectionMap connections_
 
LockType connections_lock_
 
Atomic< size_t > last_link_
 

Friends

class TcpConnection
 
class TcpDataLink
 

Additional Inherited Members

- Public Attributes inherited from OpenDDS::DCPS::TransportImpl
LockType lock_
 Lock to protect the config_ and reactor_task_ data members. More...
 
WeakRcHandle< TransportInstconfig_
 
ReactorTask_rch reactor_task_
 
EventDispatcher_rch event_dispatcher_
 smart ptr to the associated DL cleanup task More...
 
unique_ptr< Monitormonitor_
 Monitor object for this entity. More...
 
- Protected Types inherited from OpenDDS::DCPS::TransportImpl
typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockTypeGuardType
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportImpl
 TransportImpl (TransportInst_rch config)
 
bool open ()
 
typedef OPENDDS_MULTIMAP (TransportClient_wrch, DataLink_rch) PendConnMap
 
void add_pending_connection (const TransportClient_rch &client, DataLink_rch link)
 
void shutdown ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportImpl
LockType pending_connections_lock_
 Lock to protect the pending_connections_ data member. More...
 
PendConnMap pending_connections_
 
AtomicBool is_shut_down_
 Id of the last link established. More...
 

Detailed Description

This class provides the "Tcp" transport specific implementation. It creates the acceptor for listening the incoming requests using TCP and maintains a collection of TCP specific connections/datalinks.

Notes about object ownership: 1) Own the datalink objects, passive connection objects, acceptor object and TcpConnectionReplaceTask object(used during reconnecting). 2) Reference to TransportReactorTask object owned by base class.

Definition at line 48 of file TcpTransport.h.

Member Typedef Documentation

◆ AddrLinkMap

Map Type: (key) PriorityKey to (value) TcpDataLink_rch.

Definition at line 121 of file TcpTransport.h.

◆ ConditionVariableType

Definition at line 128 of file TcpTransport.h.

◆ GuardType

Definition at line 127 of file TcpTransport.h.

◆ LockType

Definition at line 126 of file TcpTransport.h.

Constructor & Destructor Documentation

◆ TcpTransport()

OpenDDS::DCPS::TcpTransport::TcpTransport ( const TcpInst_rch inst)
explicit

Definition at line 36 of file TcpTransport.cpp.

References configure_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::open(), and OpenDDS::DCPS::TransportImpl::shutdown().

37  : TransportImpl(inst)
38  , acceptor_(new TcpAcceptor(RcHandle<TcpTransport>(this, inc_count())))
39  , last_link_(0)
40 {
41  DBG_ENTRY_LVL("TcpTransport","TcpTransport",6);
42 
43  if (!(configure_i(inst) && open())) {
44  this->shutdown();
45  throw Transport::UnableToCreate();
46  }
47 
48 }
Atomic< size_t > last_link_
Definition: TcpTransport.h:168
virtual bool configure_i(const TcpInst_rch &config)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
unique_ptr< TcpAcceptor > acceptor_
Used to accept passive connections on our local_address_.
Definition: TcpTransport.h:144
TransportImpl(TransportInst_rch config)

◆ ~TcpTransport()

OpenDDS::DCPS::TcpTransport::~TcpTransport ( )
virtual

Definition at line 50 of file TcpTransport.cpp.

References DBG_ENTRY_LVL.

51 {
52  DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6);
53 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ accept_datalink()

TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::accept_datalink ( const RemoteTransport remote,
const ConnectionAttribs attribs,
const TransportClient_rch client 
)
privatevirtual

accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 219 of file TcpTransport.cpp.

References ACE_ERROR, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::DataLink::add_on_start_callback(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::PriorityKey::address(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), OpenDDS::DCPS::LogGuid::c_str(), connect_tcp_datalink(), connections_, connections_lock_, DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::do_association_actions(), find_datalink_i(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::is_shut_down(), links_, links_lock_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::rchandle_from(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, OpenDDS::DCPS::RcHandle< T >::reset(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.

222 {
223  DBG_ENTRY_LVL("TcpTransport", "accept_datalink", 6);
224 
225  if (is_shut_down()) {
226  return AcceptConnectResult();
227  }
228 
229 
230  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C "
231  "accepting connection from remote %C\n",
232  LogGuid(attribs.local_id_).c_str(),
233  LogGuid(remote.repo_id_).c_str()), 5);
234 
235  const PriorityKey key =
236  blob_to_key(remote.blob_, attribs.priority_, false /* !active */);
237 
238  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey "
239  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n", attribs.priority_,
240  LogAddr(key.address()).c_str(), key.is_loopback(), key.is_active()), 2);
241 
242  TcpDataLink_rch link;
243  {
244  GuardType guard(links_lock_);
245 
246  if (find_datalink_i(key, link)) {
247  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink found datalink link[%@]\n", link.in()), 0);
248  link->add_on_start_callback(client, remote.repo_id_);
249  add_pending_connection(client, link);
250  guard.release();
251  link->do_association_actions();
252  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
253  }
254 
255  link = make_rch<TcpDataLink>(rchandle_from(this), key.address(), key.priority(),
256  key.is_loopback(), key.is_active());
257  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink create new link[%@]\n", link.in()), 0);
258  if (links_.bind(key, link) != 0 /*OK*/) {
259  ACE_ERROR((LM_ERROR,
260  "(%P|%t) ERROR: TcpTransport::accept_datalink "
261  "Unable to bind new TcpDataLink[%@] to "
262  "TcpTransport in links_ map.\n", link.in()));
263  return AcceptConnectResult();
264  }
265 
266  link->add_on_start_callback(client, remote.repo_id_);
267  add_pending_connection(client, link);
268  }
269 
270  TcpConnection_rch connection;
271  {
273  const ConnectionMap::iterator iter = connections_.find(key);
274 
275  if (iter != connections_.end()) {
276  connection = iter->second;
277  connections_.erase(iter);
278  }
279  }
280 
281  if (connection.is_nil()) {
282  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
283  }
284 
285  if (connect_tcp_datalink(*link, connection) == -1) {
286  GuardType guard(links_lock_);
287  links_.unbind(key);
288  link.reset();
289  }
290 
291  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
292  "connected link %@.\n", link.in()), 2);
293  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
294 }
#define ACE_ERROR(X)
sequence< octet > key
PriorityKey blob_to_key(const TransportBLOB &remote, Priority priority, bool active)
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
RcHandle< TcpConnection > TcpConnection_rch
bool find_datalink_i(const PriorityKey &key, TcpDataLink_rch &link)
int unbind(const EXT_ID &ext_id)
int bind(const EXT_ID &item, const INT_ID &int_id)
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
RcHandle< TcpDataLink > TcpDataLink_rch
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
#define VDBG_LVL(DBG_ARGS, LEVEL)
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)

◆ async_connect_failed()

void OpenDDS::DCPS::TcpTransport::async_connect_failed ( const PriorityKey key)
private

Definition at line 171 of file TcpTransport.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), links_, links_lock_, LM_WARNING, ACE_Guard< ACE_LOCK >::release(), and ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind().

Referenced by OpenDDS::DCPS::TcpConnection::active_open(), and OpenDDS::DCPS::TcpConnection::handle_timeout().

172 {
173  if (DCPS_debug_level >= 2) {
174  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: Failed to make active connection.\n"));
175  }
176  GuardType guard(links_lock_);
177  TcpDataLink_rch link;
178  links_.find(key, link);
179  links_.unbind(key);
180  guard.release();
181 
182  if (link.in()) {
183  link->invoke_on_start_callbacks(false);
184  }
185 }
#define ACE_DEBUG(X)
sequence< octet > key
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
int unbind(const EXT_ID &ext_id)
int find(const EXT_ID &ext_id, INT_ID &int_id) const
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
RcHandle< TcpDataLink > TcpDataLink_rch
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155

◆ blob_to_key()

PriorityKey OpenDDS::DCPS::TcpTransport::blob_to_key ( const TransportBLOB remote,
Priority  priority,
bool  active 
)
private

Definition at line 63 of file TcpTransport.cpp.

References config(), and OpenDDS::DCPS::AssociationData::get_remote_address().

Referenced by accept_datalink(), and connect_datalink().

66 {
67  const ACE_INET_Addr remote_address = AssociationData::get_remote_address(remote);
68  TcpInst_rch cfg = config();
69  const bool is_loopback = cfg && (remote_address == cfg->local_address());
70  return PriorityKey(priority, remote_address, is_loopback, active);
71 }
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
static ACE_INET_Addr get_remote_address(const TransportBLOB &remote)
TcpInst_rch config() const

◆ client_stop()

void OpenDDS::DCPS::TcpTransport::client_stop ( const GUID_t local_id)
privatevirtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 390 of file TcpTransport.cpp.

References ACE_Hash_Map_Entry< EXT_ID, INT_ID >::int_id_, links_, links_lock_, ACE_Hash_Map_Iterator_Base_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::next(), and pending_release_links_.

391 {
392  GuardType guard(links_lock_);
393 
394  AddrLinkMap::ENTRY* entry;
395 
396  for (AddrLinkMap::ITERATOR itr(links_); itr.next(entry); itr.advance()) {
397  entry->int_id_->client_stop(local_id);
398  }
399 
400  for (AddrLinkMap::ITERATOR itr(pending_release_links_); itr.next(entry); itr.advance()) {
401  entry->int_id_->client_stop(local_id);
402  }
403 }
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
AddrLinkMap pending_release_links_
Definition: TcpTransport.h:156
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
ACE_Hash_Map_Iterator_Ex< PriorityKey, TcpDataLink_rch, ACE_Hash< PriorityKey >, ACE_Equal_To< PriorityKey >, ACE_Null_Mutex > ITERATOR

◆ config()

TcpInst_rch OpenDDS::DCPS::TcpTransport::config ( ) const

Definition at line 57 of file TcpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::config(), and OpenDDS::DCPS::dynamic_rchandle_cast().

Referenced by blob_to_key(), connect_datalink(), connect_tcp_datalink(), connection_info_i(), fresh_link(), and passive_connection().

58 {
60 }
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
TransportInst_rch config() const

◆ configure_i()

bool OpenDDS::DCPS::TcpTransport::configure_i ( const TcpInst_rch config)
privatevirtual

Definition at line 319 of file TcpTransport.cpp.

References acceptor_, ACE_ERROR, ACE_ERROR_RETURN, ACE_TEXT(), connector_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), DBG_ENTRY_LVL, OpenDDS::DCPS::get_fully_qualified_hostname(), ACE_INET_Addr::get_port_number(), OpenDDS::DCPS::ReactorTask::get_reactor(), hostname(), LM_DEBUG, LM_ERROR, ACE_Connector< class, class >::open(), OpenDDS::DCPS::TransportImpl::reactor_task(), TheServiceParticipant, and VDBG_LVL.

Referenced by TcpTransport().

320 {
321  DBG_ENTRY_LVL("TcpTransport", "configure_i", 6);
322 
323  if (!config) {
324  return false;
325  }
326 
327  this->create_reactor_task(false, "TcpTransport" + config->name());
328 
329  connector_.open(reactor_task()->get_reactor());
330 
331  // Override with DCPSDefaultAddress.
332  if (config->local_address() == ACE_INET_Addr() &&
333  TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
334  VDBG_LVL((LM_DEBUG,
335  ACE_TEXT("(%P|%t) TcpTransport::configure_i overriding with DCPSDefaultAddress\n")), 2);
336 
337  config->local_address(TheServiceParticipant->default_address().to_addr());
338  }
339 
340  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::configure_i opening acceptor for %C on %C\n"),
341  config->local_address_string().c_str(), LogAddr(config->local_address()).c_str()), 2);
342 
343  // Open our acceptor object so that we can accept passive connections
344  // on our config->local_address_.
345  if (this->acceptor_->open(config->local_address(),
346  this->reactor_task()->get_reactor()) != 0) {
347  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C: %p\n"),
348  LogAddr(config->local_address()).c_str(), ACE_TEXT("open")), false);
349  }
350 
351  // update the port number (incase port zero was given).
352  ACE_INET_Addr address;
353 
354  if (this->acceptor_->acceptor().get_local_addr(address) != 0) {
355  ACE_ERROR((LM_ERROR,
356  ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ")
357  ACE_TEXT("- %p"),
358  ACE_TEXT("cannot get local addr\n")));
359  }
360 
361  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C\n"),
362  LogAddr(address).c_str()), 2);
363 
364  const unsigned short port = address.get_port_number();
365 
366  // As default, the acceptor will be listening on INADDR_ANY but advertise with the fully
367  // qualified hostname and actual listening port number.
368  if (config->local_address().is_any()) {
369  const std::string hostname = get_fully_qualified_hostname();
370  config->local_address(port, hostname.c_str());
371  if (config->local_address() == ACE_INET_Addr()) {
372  ACE_ERROR_RETURN((LM_ERROR,
373  ACE_TEXT("(%P|%t) ERROR: Failed to resolve a local address using fully qualified hostname '%C'\n"),
374  hostname.c_str()),
375  false);
376  }
377  }
378 
379  // Now we got the actual listening port. Update the port number in the configuration
380  // if it's 0 originally.
381  else if (config->local_address().get_port_number() == 0) {
382  config->local_address_set_port(port);
383  }
384 
385  // Ahhh... The sweet smell of success!
386  return true;
387 }
#define ACE_ERROR(X)
ReactorTask_rch reactor_task()
Connector connector_
Open TcpConnections using non-blocking connect.
Definition: TcpTransport.h:152
virtual int open(ACE_Reactor *r=ACE_Reactor::instance(), int flags=0)
TcpInst_rch config() const
int hostname(char name[], size_t maxnamelen)
ACE_TEXT("TCP_Factory")
u_short get_port_number(void) const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define VDBG_LVL(DBG_ARGS, LEVEL)
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
#define ACE_ERROR_RETURN(X, Y)
#define TheServiceParticipant
String get_fully_qualified_hostname(ACE_INET_Addr *addr)
unique_ptr< TcpAcceptor > acceptor_
Used to accept passive connections on our local_address_.
Definition: TcpTransport.h:144
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ connect_datalink()

TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::connect_datalink ( const RemoteTransport remote,
const ConnectionAttribs attribs,
const TransportClient_rch client 
)
privatevirtual

connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 74 of file TcpTransport.cpp.

References ACE_ERROR, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::DataLink::add_on_start_callback(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::PriorityKey::address(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), config(), ACE_Connector< class, class >::connect(), connector_, DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::do_association_actions(), EWOULDBLOCK, find_datalink_i(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::TransportImpl::is_shut_down(), links_, links_lock_, LM_DEBUG, LM_ERROR, ACE_Time_Value::msec(), OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, OpenDDS::DCPS::DataLink::transport_priority(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), ACE_Synch_Options::USE_REACTOR, ACE_Synch_Options::USE_TIMEOUT, and VDBG_LVL.

77 {
78  DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);
79 
80  if (is_shut_down()) {
81  return AcceptConnectResult();
82  }
83 
84  const PriorityKey key =
85  blob_to_key(remote.blob_, attribs.priority_, true /*active*/);
86 
87  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
88  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
89  key.priority(), LogAddr(key.address()).c_str(), key.is_loopback(),
90  key.is_active()), 0);
91 
92  TcpDataLink_rch link;
93  {
94  GuardType guard(links_lock_);
95 
96  if (find_datalink_i(key, link)) {
97  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0);
98  link->add_on_start_callback(client, remote.repo_id_);
99  add_pending_connection(client, link);
100  link->do_association_actions();
101  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
102  }
103 
104  link = make_rch<TcpDataLink>(rchandle_from(this), key.address(), attribs.priority_,
105  key.is_loopback(), true /*active*/);
106  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
107  if (links_.bind(key, link) != 0 /*OK*/) {
108  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
109  "Unable to bind new TcpDataLink[%@] to "
110  "TcpTransport in links_ map.\n", link.in()));
111  return AcceptConnectResult();
112  }
113 
114  link->add_on_start_callback(client, remote.repo_id_);
115  add_pending_connection(client, link);
116  }
117 
118  int ret = -1; // Default to failure in case config() is returns an invalid RCH
119  errno = EINVAL; // Anything other than EWOULDBLOCK
120 
121  TcpInst_rch cfg = config();
122  if (cfg) {
123  TcpConnection_rch connection(make_rch<TcpConnection>(key.address(), link->transport_priority(), cfg));
124  connection->set_datalink(link);
125 
126  TcpConnection* pConn = connection.in();
127 
128  // Can't make this call while holding onto TransportClient::lock_
129  ACE_Time_Value conn_timeout;
130  conn_timeout.msec(cfg->active_conn_timeout_period_);
131 
133  }
134 
135  if (ret == -1 && errno != EWOULDBLOCK) {
136 
137  VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
138  ACE_ERROR((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"));
139  //If the connection fails and, in the interim between releasing
140  //lock and re-acquiring to remove the failed link, another association may have found
141  //the datalink in links_ (always using find_datalink_i) so must allow the other
142  //association to either try to connect again (might succeed for it)
143  //or try another transport. If find_datalink_i was called for this datalink, an
144  //on_start_callback will be registered and can be invoked.
145  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0);
146  {
147  GuardType guard(links_lock_);
148  if (links_.unbind(key, link) != 0 /*OK*/) {
149  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
150  "Unable to unbind failed TcpDataLink[%@] from "
151  "TcpTransport links_ map.\n", link.in()));
152  }
153  }
154  link->invoke_on_start_callbacks(false);
155 
156  return AcceptConnectResult();
157  }
158 
159  if (ret == 0) {
160  // connect() completed synchronously and called TcpConnection::active_open().
161  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink "
162  "completed synchronously.\n"), 0);
163  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
164  }
165 
166  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
167  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
168 }
#define ACE_ERROR(X)
virtual int connect(SVC_HANDLER *&svc_handler, const typename PEER_CONNECTOR::PEER_ADDR &remote_addr, const ACE_Synch_Options &synch_options=ACE_Synch_Options::defaults, const typename PEER_CONNECTOR::PEER_ADDR &local_addr=reinterpret_cast< const peer_addr_type & >(peer_addr_type::sap_any), int reuse_addr=0, int flags=O_RDWR, int perms=0)
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
sequence< octet > key
Connector connector_
Open TcpConnections using non-blocking connect.
Definition: TcpTransport.h:152
PriorityKey blob_to_key(const TransportBLOB &remote, Priority priority, bool active)
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
RcHandle< TcpConnection > TcpConnection_rch
bool find_datalink_i(const PriorityKey &key, TcpDataLink_rch &link)
TcpInst_rch config() const
int unbind(const EXT_ID &ext_id)
int bind(const EXT_ID &item, const INT_ID &int_id)
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
RcHandle< TcpDataLink > TcpDataLink_rch
unsigned long msec(void) const
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
#define VDBG_LVL(DBG_ARGS, LEVEL)
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)

◆ connect_tcp_datalink()

int OpenDDS::DCPS::TcpTransport::connect_tcp_datalink ( TcpDataLink link,
const TcpConnection_rch connection 
)
private

Common code used by accept_datalink(), passive_connection(), and active completion.

Code common to make_active_connection() and make_passive_connection().

Definition at line 674 of file TcpTransport.cpp.

References ACE_DEBUG, ACE_TEXT(), config(), OpenDDS::DCPS::TcpDataLink::connect(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, last_link_, LM_DEBUG, OpenDDS::DCPS::Atomic< T >::load(), OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::ref(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), and OpenDDS::DCPS::DataLink::transport_priority().

Referenced by accept_datalink(), OpenDDS::DCPS::TcpConnection::active_open(), passive_connection(), and OpenDDS::DCPS::TcpDataLink::reconnect().

676 {
677  DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6);
678 
679  if (link.reuse_existing_connection(connection) == 0) {
680  return 0;
681  }
682 
683  TcpInst_rch cfg = config();
684  if (!cfg) {
685  return -1;
686  }
687 
688  ++last_link_;
689 
690  if (DCPS_debug_level > 4) {
691  ACE_DEBUG((LM_DEBUG,
692  ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
693  ACE_TEXT("creating send strategy with priority %d.\n"),
694  last_link_.load(), link.transport_priority()));
695  }
696 
697  connection->id() = last_link_;
698 
699  TcpSendStrategy_rch send_strategy (
700  make_rch<TcpSendStrategy>(last_link_.load(), ref(link),
701  new TcpSynchResource(link,
702  cfg->max_output_pause_period_),
703  this->reactor_task(), link.transport_priority()));
704 
705  TcpReceiveStrategy_rch receive_strategy(
706  make_rch<TcpReceiveStrategy>(ref(link), this->reactor_task()));
707 
708  if (link.connect(connection, send_strategy, receive_strategy) != 0) {
709  return -1;
710  }
711 
712  return 0;
713 }
#define ACE_DEBUG(X)
ReactorTask_rch reactor_task()
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
Atomic< size_t > last_link_
Definition: TcpTransport.h:168
TcpInst_rch config() const
T load() const
Definition: Atomic.h:33
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RcHandle< TcpReceiveStrategy > TcpReceiveStrategy_rch
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ connection_info_i()

bool OpenDDS::DCPS::TcpTransport::connection_info_i ( TransportLocator local_info,
ConnectionInfoFlags  flags 
) const
privatevirtual

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 470 of file TcpTransport.cpp.

References config(), DBG_ENTRY_LVL, LM_DEBUG, and VDBG_LVL.

471 {
472  DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6);
473 
474  TcpInst_rch cfg = config();
475  if (cfg) {
476  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address string <%C>\n",
477  cfg->get_public_address().c_str()), 2);
478 
479  cfg->populate_locator(local_info, flags);
480  return true;
481  }
482 
483  return false;
484 }
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
TcpInst_rch config() const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ find_datalink_i()

bool OpenDDS::DCPS::TcpTransport::find_datalink_i ( const PriorityKey key,
TcpDataLink_rch link 
)
private

Definition at line 189 of file TcpTransport.cpp.

References ACE_TEXT(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), OpenDDS::DCPS::DataLink::cancel_release(), DBG_ENTRY_LVL, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), OpenDDS::DCPS::RcHandle< T >::in(), links_, LM_DEBUG, pending_release_links_, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::TcpDataLink::set_release_pending(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.

Referenced by accept_datalink(), and connect_datalink().

190 {
191  DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6);
192 
193  if (links_.find(key, link) == 0 /*OK*/) {
194  return true;
195  } else if (pending_release_links_.find(key, link) == 0 /*OK*/) {
196  if (link->cancel_release()) {
197  link->set_release_pending(false);
198 
199  if (pending_release_links_.unbind(key, link) == 0 /*OK*/
200  && links_.bind(key, link) == 0 /*OK*/) {
201  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
202  ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0);
203  return true;
204  }
205  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
206  ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0);
207  } else {
208  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
209  ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0);
210  }
211  link.reset(); // don't return link to TransportClient
212  return false;
213  }
214 
215  return false;
216 }
sequence< octet > key
AddrLinkMap pending_release_links_
Definition: TcpTransport.h:156
int unbind(const EXT_ID &ext_id)
int bind(const EXT_ID &item, const INT_ID &int_id)
int find(const EXT_ID &ext_id, INT_ID &int_id) const
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ fresh_link()

int OpenDDS::DCPS::TcpTransport::fresh_link ( TcpConnection_rch  connection)

This function is called by the TcpReconnectTask thread to check if the passively accepted connection is the re-established connection. If it is, then the "old" connection object in the datalink is replaced by the "new" connection object.

Definition at line 719 of file TcpTransport.cpp.

References config(), DBG_ENTRY_LVL, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), OpenDDS::DCPS::TcpDataLink::get_connection(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::is_shut_down(), links_, links_lock_, and OpenDDS::DCPS::TcpDataLink::reconnect().

Referenced by passive_connection().

720 {
721  DBG_ENTRY_LVL("TcpTransport","fresh_link",6);
722 
723  TcpDataLink_rch link;
724  GuardType guard(this->links_lock_);
725 
726  if (is_shut_down()) {
727  return 0;
728  }
729 
730  TcpInst_rch cfg = config();
731  if (!cfg) {
732  return -1;
733  }
734 
735  PriorityKey key(connection->transport_priority(),
736  connection->get_remote_address(),
737  connection->get_remote_address() == cfg->local_address(),
738  connection->is_connector());
739 
740  if (this->links_.find(key, link) == 0) {
741  TcpConnection_rch old_con = link->get_connection();
742 
743  // The connection is accepted but may not be associated with the datalink
744  // at this point. The thread calling add_associations() will associate
745  // the datalink with the connection in make_passive_connection().
746  if (old_con.is_nil()) {
747  return 0;
748  }
749 
750  if (old_con.in() != connection.in())
751  // Replace the "old" connection object with the "new" connection object.
752  {
753  return link->reconnect(connection);
754  }
755  }
756 
757  return 0;
758 }
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
sequence< octet > key
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
RcHandle< TcpConnection > TcpConnection_rch
TcpInst_rch config() const
int find(const EXT_ID &ext_id, INT_ID &int_id) const
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
RcHandle< TcpDataLink > TcpDataLink_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155

◆ OPENDDS_MAP() [1/2]

typedef OpenDDS::DCPS::TcpTransport::OPENDDS_MAP ( PriorityKey  ,
TcpDataLink_rch   
)
private

◆ OPENDDS_MAP() [2/2]

typedef OpenDDS::DCPS::TcpTransport::OPENDDS_MAP ( PriorityKey  ,
TcpConnection_rch   
)
private

◆ passive_connection()

void OpenDDS::DCPS::TcpTransport::passive_connection ( const ACE_INET_Addr remote_address,
const TcpConnection_rch connection 
)
private

Called by the TcpConnection object when it has been created by the acceptor and needs to be attached to a DataLink. The DataLink may or may not already be created and waiting for this passive connection to appear.

This method is called by a TcpConnection object that has been created and opened by our acceptor_ as a result of passively accepting a connection on our local address. Ultimately, the connection object needs to be paired with a DataLink object that is (or will be) expecting this passive connection to be established.

Definition at line 600 of file TcpTransport.cpp.

References ACE_ERROR, ACE_TEXT(), config(), connect_tcp_datalink(), connections_, connections_lock_, DBG_ENTRY_LVL, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), fresh_link(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::is_shut_down(), links_, links_lock_, LM_DEBUG, LM_ERROR, ACE_Guard< ACE_LOCK >::release(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::TcpConnection::handle_setup_input().

602 {
603  DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6);
604 
605  if (is_shut_down()) {
606  return;
607  }
608 
609  TcpInst_rch cfg = config();
610  if (!cfg) {
611  return;
612  }
613 
614  const PriorityKey key(connection->transport_priority(),
615  remote_address,
616  remote_address == cfg->local_address(),
617  connection->is_connector());
618 
619  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
620  ACE_TEXT("established with %C.\n"),
621  LogAddr(remote_address).c_str()), 2);
622 
623  GuardType connection_guard(connections_lock_);
624  TcpDataLink_rch link;
625  {
626  GuardType guard(links_lock_);
627  links_.find(key, link);
628  }
629 
630  if (!link.is_nil()) {
631  connection_guard.release();
632 
633  if (connect_tcp_datalink(*link, connection) == -1) {
634  VDBG_LVL((LM_ERROR,
635  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
636  ACE_TEXT("ERROR: connect_tcp_datalink failed\n")), 5);
637  GuardType guard(links_lock_);
638  links_.unbind(key);
639 
640  } else {
641  this->fresh_link(connection);
642  }
643 
644  return;
645  }
646 
647  // If we reach this point, this link was not in links_, so the
648  // accept_datalink() call hasn't happened yet. Store in connections_ for the
649  // accept_datalink() method to find.
650  VDBG_LVL((LM_DEBUG,
651  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - # of before connections: %d\n"),
652  connections_.size()), 5);
653  const ConnectionMap::iterator where = connections_.find(key);
654 
655  if (where != connections_.end()) {
656  ACE_ERROR((LM_ERROR,
657  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
658  ACE_TEXT("ERROR: connection with %C at priority %d already exists, ")
659  ACE_TEXT("overwriting previously established connection.\n"),
660  LogAddr(remote_address).c_str(),
661  connection->transport_priority()));
662  }
663 
664  connections_[key] = connection;
665  VDBG_LVL((LM_DEBUG,
666  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - # of after connections: %d\n"),
667  connections_.size()), 5);
668 
669  this->fresh_link(connection);
670 }
#define ACE_ERROR(X)
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
sequence< octet > key
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
TcpInst_rch config() const
int unbind(const EXT_ID &ext_id)
int find(const EXT_ID &ext_id, INT_ID &int_id) const
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
ACE_TEXT("TCP_Factory")
RcHandle< TcpDataLink > TcpDataLink_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
#define VDBG_LVL(DBG_ARGS, LEVEL)
int fresh_link(TcpConnection_rch connection)

◆ release_datalink()

void OpenDDS::DCPS::TcpTransport::release_datalink ( DataLink link)
privatevirtual

Called by the DataLink to release itself.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 487 of file TcpTransport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), OpenDDS::DCPS::DataLink::datalink_release_delay(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), links_, links_lock_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), pending_release_links_, OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::schedule_delayed_release(), OpenDDS::DCPS::DataLink::schedule_stop(), OpenDDS::DCPS::TcpDataLink::set_release_pending(), OpenDDS::DCPS::DataLink::set_scheduling_release(), OpenDDS::DCPS::TimeDuration::str(), OpenDDS::DCPS::DataLink::transport_priority(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), VDBG_LVL, and OpenDDS::DCPS::TimeDuration::zero_value.

488 {
489  DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6);
490 
491  TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
492 
493  if (tcp_link == 0) {
494  // Really an assertion failure
495  ACE_ERROR((LM_ERROR,
496  "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to "
497  "TcpDataLink.\n"));
498  return;
499  }
500 
501  TcpDataLink_rch released_link;
502 
503  // Possible actions that will be taken to release the link.
504  enum LinkAction { None, StopLink, ScheduleLinkRelease };
505  LinkAction linkAction = None;
506 
507  // Scope for locking to protect the links (and pending_release) containers.
508  GuardType guard(this->links_lock_);
509 
510  // Attempt to remove the TcpDataLink from our links_ map.
511  PriorityKey key(
512  tcp_link->transport_priority(),
513  tcp_link->remote_address(),
514  tcp_link->is_loopback(),
515  tcp_link->is_active());
516 
517  VDBG_LVL((LM_DEBUG,
518  "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey "
519  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
520  link,
521  tcp_link->transport_priority(),
522  LogAddr(tcp_link->remote_address()).c_str(),
523  (int)tcp_link->is_loopback(),
524  (int)tcp_link->is_active()), 2);
525 
526  if (this->links_.unbind(key, released_link) != 0) {
527  //No op
528  } else if (link->datalink_release_delay() > TimeDuration::zero_value) {
529  link->set_scheduling_release(true);
530 
531  VDBG_LVL((LM_DEBUG,
532  "(%P|%t) TcpTransport::release_datalink datalink_release_delay "
533  "is %C\n",
534  link->datalink_release_delay().str().c_str()), 4);
535 
536  // Atomic value update, safe to perform here.
537  released_link->set_release_pending(true);
538 
539  switch (this->pending_release_links_.bind(key, released_link)) {
540  case -1:
541  ACE_ERROR((LM_ERROR,
542  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
543  "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind")));
544  linkAction = StopLink;
545  break;
546 
547  case 1:
548  ACE_ERROR((LM_ERROR,
549  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
550  "pending_release_links_ map: already bound\n", released_link.in()));
551  linkAction = StopLink;
552  break;
553 
554  case 0:
555  linkAction = ScheduleLinkRelease;
556  break;
557 
558  default:
559  break;
560  }
561 
562  } else { // datalink_release_delay_ is 0
563  link->set_scheduling_release(true);
564 
565  linkAction = StopLink;
566  }
567 
568  // Actions are executed outside of the lock scope.
569  switch (linkAction) {
570  case StopLink:
571  link->schedule_stop(MonotonicTimePoint::now());
572  break;
573 
574  case ScheduleLinkRelease:
575  link->schedule_delayed_release();
576  break;
577 
578  case None:
579  break;
580  }
581 
582  if (DCPS_debug_level > 9) {
583  std::stringstream buffer;
584  buffer << *link;
585  ACE_DEBUG((LM_DEBUG,
586  ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ")
587  ACE_TEXT("link[%@] with priority %d released.\n%C"),
588  link,
589  link->transport_priority(),
590  buffer.str().c_str()));
591  }
592 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
sequence< octet > key
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
static const TimeDuration zero_value
Definition: TimeDuration.h:31
AddrLinkMap pending_release_links_
Definition: TcpTransport.h:156
int unbind(const EXT_ID &ext_id)
int bind(const EXT_ID &item, const INT_ID &int_id)
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< TcpDataLink > TcpDataLink_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ shutdown_i()

void OpenDDS::DCPS::TcpTransport::shutdown_i ( )
privatevirtual

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 406 of file TcpTransport.cpp.

References acceptor_, connections_, connections_lock_, DBG_ENTRY_LVL, ACE_Hash_Map_Entry< EXT_ID, INT_ID >::int_id_, links_, links_lock_, ACE_Hash_Map_Iterator_Base_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::next(), OpenDDS::DCPS::TransportImpl::pending_connections_, OpenDDS::DCPS::TransportImpl::pending_connections_lock_, pending_release_links_, and ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind_all().

407 {
408  DBG_ENTRY_LVL("TcpTransport","shutdown_i",6);
409 
410  {
411  GuardType guard(links_lock_);
412 
413  AddrLinkMap::ENTRY* entry;
414 
415  for (AddrLinkMap::ITERATOR itr(links_);
416  itr.next(entry);
417  itr.advance()) {
418  entry->int_id_->pre_stop_i();
419  }
420  }
421 
422  // Don't accept any more connections.
423  acceptor_->close();
424  acceptor_->transport_shutdown();
425 
426  {
427  {
429 
430  for (ConnectionMap::iterator it = connections_.begin(); it != connections_.end(); ++it) {
431  it->second->shutdown();
432  }
433  connections_.clear();
434  }
435  {
437  pending_connections_.clear();
438  }
439  }
440 
441  // Disconnect all of our DataLinks, and clear our links_ collection.
442  {
443  GuardType guard(links_lock_);
444 
445  AddrLinkMap::ENTRY* entry;
446 
447  for (AddrLinkMap::ITERATOR itr(links_);
448  itr.next(entry);
449  itr.advance()) {
450  entry->int_id_->transport_shutdown();
451  }
452 
453  links_.unbind_all();
454 
456  itr.next(entry);
457  itr.advance()) {
458  entry->int_id_->transport_shutdown();
459  }
460 
462  }
463 
464  // Tell our acceptor about this event so that it can drop its reference
465  // it holds to this TcpTransport object (via smart-pointer).
466  acceptor_->transport_shutdown();
467 }
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
AddrLinkMap pending_release_links_
Definition: TcpTransport.h:156
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
ACE_Hash_Map_Iterator_Ex< PriorityKey, TcpDataLink_rch, ACE_Hash< PriorityKey >, ACE_Equal_To< PriorityKey >, ACE_Null_Mutex > ITERATOR
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
unique_ptr< TcpAcceptor > acceptor_
Used to accept passive connections on our local_address_.
Definition: TcpTransport.h:144

◆ stop_accepting_or_connecting()

void OpenDDS::DCPS::TcpTransport::stop_accepting_or_connecting ( const TransportClient_wrch client,
const GUID_t remote_id,
bool  disassociate,
bool  association_failed 
)
privatevirtual

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 297 of file TcpTransport.cpp.

References LM_DEBUG, OpenDDS::DCPS::TransportImpl::pending_connections_, OpenDDS::DCPS::TransportImpl::pending_connections_lock_, and VDBG_LVL.

301 {
302  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting "
303  "stop connecting to remote: %C\n",
304  LogGuid(remote_id).c_str()), 5);
305 
307  typedef PendConnMap::iterator iter_t;
308  const std::pair<iter_t, iter_t> range =
309  pending_connections_.equal_range(client);
310 
311  for (iter_t iter = range.first; iter != range.second; ++iter) {
312  iter->second->remove_on_start_callback(client, remote_id);
313  }
314 
315  pending_connections_.erase(range.first, range.second);
316 }
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
#define VDBG_LVL(DBG_ARGS, LEVEL)
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.

◆ transport_type()

virtual std::string OpenDDS::DCPS::TcpTransport::transport_type ( ) const
inlineprivatevirtual

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 86 of file TcpTransport.h.

86 { return "tcp"; }

◆ unbind_link()

void OpenDDS::DCPS::TcpTransport::unbind_link ( DataLink link)
virtual

Remove any pending_release mappings.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 761 of file TcpTransport.cpp.

References ACE_ERROR, OpenDDS::DCPS::DataLink::datalink_release_delay(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), OpenDDS::DCPS::TimeDuration::is_zero(), links_lock_, LM_DEBUG, LM_ERROR, pending_release_links_, OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::transport_priority(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.

762 {
763  TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
764 
765  if (tcp_link == 0) {
766  // Really an assertion failure
767  ACE_ERROR((LM_ERROR,
768  "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
769  "Failed to downcast DataLink to TcpDataLink.\n"));
770  return;
771  }
772 
773  // Attempt to remove the TcpDataLink from our links_ map.
774  PriorityKey key(
775  tcp_link->transport_priority(),
776  tcp_link->remote_address(),
777  tcp_link->is_loopback(),
778  tcp_link->is_active());
779 
780  VDBG_LVL((LM_DEBUG,
781  "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey "
782  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
783  link,
784  tcp_link->transport_priority(),
785  LogAddr(tcp_link->remote_address()).c_str(),
786  (int)tcp_link->is_loopback(),
787  (int)tcp_link->is_active()), 2);
788 
789  GuardType guard(this->links_lock_);
790 
791  if (pending_release_links_.unbind(key) && !link->datalink_release_delay().is_zero()) {
792  ACE_ERROR((LM_ERROR,
793  "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
794  "Failed to find link %@ tcp_link %@ PriorityKey "
795  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
796  link,
797  tcp_link,
798  tcp_link->transport_priority(),
799  LogAddr(tcp_link->remote_address()).c_str(),
800  (int)tcp_link->is_loopback(),
801  (int)tcp_link->is_active()));
802  }
803 }
#define ACE_ERROR(X)
sequence< octet > key
ACE_Guard< LockType > GuardType
Definition: TcpTransport.h:127
AddrLinkMap pending_release_links_
Definition: TcpTransport.h:156
int unbind(const EXT_ID &ext_id)
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
#define VDBG_LVL(DBG_ARGS, LEVEL)

Friends And Related Function Documentation

◆ TcpConnection

friend class TcpConnection
friend

The TcpConnection is our friend. It tells us when it has been created (by our acceptor_), and is seeking the DataLink that should be (or will be) expecting the passive connection.

Definition at line 94 of file TcpTransport.h.

◆ TcpDataLink

friend class TcpDataLink
friend

Definition at line 95 of file TcpTransport.h.

Member Data Documentation

◆ acceptor_

unique_ptr<TcpAcceptor> OpenDDS::DCPS::TcpTransport::acceptor_
private

Used to accept passive connections on our local_address_.

Definition at line 144 of file TcpTransport.h.

Referenced by configure_i(), and shutdown_i().

◆ connections_

ConnectionMap OpenDDS::DCPS::TcpTransport::connections_
private

Map of passive connection objects that need to be paired with a DataLink.

Definition at line 163 of file TcpTransport.h.

Referenced by accept_datalink(), passive_connection(), and shutdown_i().

◆ connections_lock_

LockType OpenDDS::DCPS::TcpTransport::connections_lock_
private

This protects the connections_ and the pending_connections_ data members.

Definition at line 167 of file TcpTransport.h.

Referenced by accept_datalink(), passive_connection(), and shutdown_i().

◆ connector_

Connector OpenDDS::DCPS::TcpTransport::connector_
private

◆ last_link_

Atomic<size_t> OpenDDS::DCPS::TcpTransport::last_link_
private

Definition at line 168 of file TcpTransport.h.

Referenced by connect_tcp_datalink().

◆ links_

AddrLinkMap OpenDDS::DCPS::TcpTransport::links_
private

◆ links_lock_

LockType OpenDDS::DCPS::TcpTransport::links_lock_
private

This lock is used to protect the links_ data member.

Definition at line 159 of file TcpTransport.h.

Referenced by accept_datalink(), async_connect_failed(), client_stop(), connect_datalink(), fresh_link(), passive_connection(), release_datalink(), shutdown_i(), and unbind_link().

◆ pending_release_links_

AddrLinkMap OpenDDS::DCPS::TcpTransport::pending_release_links_
private

The documentation for this class was generated from the following files: