69 const bool is_loopback = cfg && (remote_address == cfg->local_address());
70 return PriorityKey(priority, remote_address, is_loopback, active);
88 "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
97 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.
in()), 0);
106 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.
in()), 0);
109 "Unable to bind new TcpDataLink[%@] to " 110 "TcpTransport in links_ map.\n", link.
in()));
124 connection->set_datalink(link);
130 conn_timeout.
msec(cfg->active_conn_timeout_period_);
137 VDBG_LVL((
LM_ERROR,
"(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
145 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.
in()), 0);
150 "Unable to unbind failed TcpDataLink[%@] from " 151 "TcpTransport links_ map.\n", link.
in()));
162 "completed synchronously.\n"), 0);
166 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
202 ACE_TEXT(
"found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.
in()), 0);
206 ACE_TEXT(
"found link[%@] in pending release list but was unable to shift back to links_.\n"), link.
in()), 0);
209 ACE_TEXT(
"found link[%@] in pending release list but was unable to cancel release.\n"), link.
in()), 0);
231 "accepting connection from remote %C\n",
239 "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n", attribs.
priority_,
247 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TcpTransport::accept_datalink found datalink link[%@]\n", link.
in()), 0);
257 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TcpTransport::accept_datalink create new link[%@]\n", link.
in()), 0);
260 "(%P|%t) ERROR: TcpTransport::accept_datalink " 261 "Unable to bind new TcpDataLink[%@] to " 262 "TcpTransport in links_ map.\n", link.
in()));
273 const ConnectionMap::iterator iter =
connections_.find(key);
276 connection = iter->second;
281 if (connection.
is_nil()) {
292 "connected link %@.\n", link.
in()), 2);
303 "stop connecting to remote: %C\n",
304 LogGuid(remote_id).c_str()), 5);
307 typedef PendConnMap::iterator iter_t;
308 const std::pair<iter_t, iter_t> range =
311 for (iter_t iter = range.first; iter != range.second; ++iter) {
312 iter->second->remove_on_start_callback(client, remote_id);
335 ACE_TEXT(
"(%P|%t) TcpTransport::configure_i overriding with DCPSDefaultAddress\n")), 2);
341 config->local_address_string().c_str(),
LogAddr(config->local_address()).c_str()), 2);
345 if (this->
acceptor_->open(config->local_address(),
354 if (this->
acceptor_->acceptor().get_local_addr(address) != 0) {
356 ACE_TEXT(
"(%P|%t) ERROR: TcpTransport::configure_i ")
358 ACE_TEXT(
"cannot get local addr\n")));
368 if (config->local_address().is_any()) {
370 config->local_address(port, hostname.c_str());
373 ACE_TEXT(
"(%P|%t) ERROR: Failed to resolve a local address using fully qualified hostname '%C'\n"),
381 else if (config->local_address().get_port_number() == 0) {
382 config->local_address_set_port(port);
397 entry->
int_id_->client_stop(local_id);
401 entry->
int_id_->client_stop(local_id);
431 it->second->shutdown();
450 entry->
int_id_->transport_shutdown();
458 entry->
int_id_->transport_shutdown();
477 cfg->get_public_address().c_str()), 2);
479 cfg->populate_locator(local_info, flags);
496 "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to " 504 enum LinkAction { None, StopLink, ScheduleLinkRelease };
505 LinkAction linkAction = None;
518 "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey " 519 "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
532 "(%P|%t) TcpTransport::release_datalink datalink_release_delay " 542 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to " 543 "pending_release_links_ map: %p\n", released_link.
in(),
ACE_TEXT(
"bind")));
544 linkAction = StopLink;
549 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to " 550 "pending_release_links_ map: already bound\n", released_link.
in()));
551 linkAction = StopLink;
555 linkAction = ScheduleLinkRelease;
565 linkAction = StopLink;
569 switch (linkAction) {
574 case ScheduleLinkRelease:
583 std::stringstream buffer;
586 ACE_TEXT(
"(%P|%t) TcpTransport::release_datalink() - ")
587 ACE_TEXT(
"link[%@] with priority %d released.\n%C"),
589 link->transport_priority(),
590 buffer.str().c_str()));
616 remote_address == cfg->local_address(),
617 connection->is_connector());
621 LogAddr(remote_address).c_str()), 2);
635 ACE_TEXT(
"(%P|%t) TcpTransport::passive_connection() - ")
636 ACE_TEXT(
"ERROR: connect_tcp_datalink failed\n")), 5);
651 ACE_TEXT(
"(%P|%t) TcpTransport::passive_connection() - # of before connections: %d\n"),
657 ACE_TEXT(
"(%P|%t) TcpTransport::passive_connection() - ")
658 ACE_TEXT(
"ERROR: connection with %C at priority %d already exists, ")
659 ACE_TEXT(
"overwriting previously established connection.\n"),
660 LogAddr(remote_address).c_str(),
661 connection->transport_priority()));
666 ACE_TEXT(
"(%P|%t) TcpTransport::passive_connection() - # of after connections: %d\n"),
692 ACE_TEXT(
"(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
693 ACE_TEXT(
"creating send strategy with priority %d.\n"),
702 cfg->max_output_pause_period_),
708 if (link.
connect(connection, send_strategy, receive_strategy) != 0) {
736 connection->get_remote_address(),
737 connection->get_remote_address() == cfg->local_address(),
738 connection->is_connector());
750 if (old_con.
in() != connection.
in())
768 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - " 769 "Failed to downcast DataLink to TcpDataLink.\n"));
781 "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey " 782 "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
793 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - " 794 "Failed to find link %@ tcp_link %@ PriorityKey " 795 "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
AddrLinkMap links_
This is the map of connected DataLinks.
RcHandle< T > rchandle_from(T *pointer)
PendConnMap pending_connections_
const TimeDuration & datalink_release_delay() const
virtual void release_datalink(DataLink *link)
Called by the DataLink to release itself.
virtual int connect(SVC_HANDLER *&svc_handler, const typename PEER_CONNECTOR::PEER_ADDR &remote_addr, const ACE_Synch_Options &synch_options=ACE_Synch_Options::defaults, const typename PEER_CONNECTOR::PEER_ADDR &local_addr=reinterpret_cast< const peer_addr_type & >(peer_addr_type::sap_any), int reuse_addr=0, int flags=O_RDWR, int perms=0)
bool find_datalink_i(const PriorityKey &key, TcpDataLink_rch &link)
LockType links_lock_
This lock is used to protect the links_ data member.
void passive_connection(const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection)
TcpTransport(const TcpInst_rch &inst)
Encapsulate a priority value and internet address as a key.
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
void set_scheduling_release(bool scheduling_release)
const char * c_str() const
void schedule_stop(const MonotonicTimePoint &schedule_to_stop_at)
int fresh_link(TcpConnection_rch connection)
unique_ptr< TcpAcceptor > acceptor_
Used to accept passive connections on our local_address_.
reference_wrapper< T > ref(T &r)
TcpConnection_rch get_connection()
virtual int open(ACE_Reactor *r=ACE_Reactor::instance(), int flags=0)
ACE_INET_Addr & address()
static TimePoint_T< MonotonicClock > now()
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
void async_connect_failed(const PriorityKey &key)
int connect(const TcpConnection_rch &connection, const RcHandle< TcpSendStrategy > &send_strategy, const RcHandle< TcpReceiveStrategy > &receive_strategy)
Priority & transport_priority()
int reconnect(const TcpConnection_rch &connection)
int hostname(char name[], size_t maxnamelen)
virtual void unbind_link(DataLink *link)
Remove any pending_release mappings.
bool is_shut_down() const
int unbind(const EXT_ID &ext_id)
int bind(const EXT_ID &item, const INT_ID &int_id)
PriorityKey blob_to_key(const TransportBLOB &remote, Priority priority, bool active)
void schedule_delayed_release()
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
void set_release_pending(bool flag)
Set release pending flag.
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
ConnectionMap connections_
int find(const EXT_ID &ext_id, INT_ID &int_id) const
Atomic< size_t > last_link_
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
static const TimeDuration zero_value
virtual void shutdown_i()
ReactorTask_rch reactor_task()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
void do_association_actions()
virtual void client_stop(const GUID_t &local_id)
static ACE_INET_Addr get_remote_address(const TransportBLOB &remote)
u_short get_port_number(void) const
unsigned long msec(void) const
virtual bool configure_i(const TcpInst_rch &config)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
String str(unsigned decimal_places=3, bool just_sec=false) const
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Reactor * get_reactor()
String get_fully_qualified_hostname(ACE_INET_Addr *addr)
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
Connector connector_
Open TcpConnections using non-blocking connect.
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
LockType connections_lock_
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
AddrLinkMap pending_release_links_
int next(ACE_Hash_Map_Entry< EXT_ID, INT_ID > *&next_entry) const
int reuse_existing_connection(const TcpConnection_rch &connection)
TcpInst_rch config() const
#define TheServiceParticipant
DDS::OctetSeq TransportBLOB
const ACE_INET_Addr & remote_address() const
Accessor for the remote address.
The Internal API and Implementation of OpenDDS.
virtual bool connection_info_i(TransportLocator &local_info, ConnectionInfoFlags flags) const
size_t ConnectionInfoFlags
TransportInst_rch config() const
void invoke_on_start_callbacks(bool success)