22 #include <dds/DdsDcpsInfoUtilsC.h> 36 , expected_transaction_id_(1)
37 , max_transaction_id_seen_(0)
38 , max_transaction_tail_(0)
40 , cdr_encapsulation_(false)
43 , reverse_lock_(
lock_)
53 ACE_TEXT(
"(%P|%t) TransportClient::~TransportClient: %C\n"),
62 for (
size_t i = 0; i <
impls_.size(); ++i) {
75 if (it->second->safe_to_remove()) {
92 ent && tc.
is_nil(); ent = ent->parent()) {
93 tc = ent->transport_config();
116 ACE_TEXT(
"(%P|%t) ERROR: TransportClient::enable_transport ")
117 ACE_TEXT(
"No TransportConfig found.\n")));
137 ACE_TEXT(
"(%P|%t) TransportClient::enable_transport_using_config ")
138 ACE_TEXT(
"passive_connect_duration_ configured as 0, changing to ")
148 for (
size_t i = 0; i < n; ++i) {
157 #if defined(OPENDDS_SECURITY) 168 ACE_TEXT(
"(%P|%t) ERROR: TransportClient::enable_transport ")
169 ACE_TEXT(
"No TransportImpl could be created.\n")));
180 for (
size_t i = 0; i < n; ++i) {
193 ACE_TEXT(
"(%P|%t) TransportClient::populate_connection_info: ")
213 ACE_TEXT(
"local %C remote %C no available impls\n"),
215 reader_log.
c_str()));
220 bool all_impls_shut_down =
true;
221 for (
size_t i = 0; i <
impls_.size(); ++i) {
224 all_impls_shut_down =
false;
229 if (all_impls_shut_down) {
234 ACE_TEXT(
"local %C remote %C all available impls previously shutdown\n"),
236 reader_log.
c_str()));
258 iter =
pending_.insert(std::make_pair(remote_copy, pa)).first;
262 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TransportClient::associate added PendingAssoc " 263 "between %C and remote %C\n",
264 tc_assoc_log.
c_str(),
265 remote_log.
c_str()), 0);
269 ACE_TEXT(
"(%P|%t) ERROR: TransportClient::associate ")
270 ACE_TEXT(
"already associating with remote.\n")));
289 for (
size_t i = 0; i <
impls_.size(); ++i) {
298 pend->
impls_.push_back(impl);
363 return !client_ && !scheduled_;
375 client = client_.lock();
379 if (client && client.get() ==
static_cast<TransportClient*
>(
const_cast<void*
>(arg))) {
397 ACE_TEXT(
"between local %C and remote %C unsuccessful because ")
400 remote_log.
c_str()), 0);
409 "attempt to connect_datalink between local %C and remote %C\n",
411 remote_log.
c_str()), 0);
423 ACE_TEXT(
"connect_datalink between local %C remote %C not successful\n"),
425 reader_log.
c_str()));
434 "connection between local %C and remote %C initiation successful\n",
436 remote_log.
c_str()), 0);
445 LogGuid remote_log(data_.remote_id_);
447 "between %C and remote %C\n",
449 remote_log.
c_str()), 0);
459 for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
460 if (data_.remote_data_[blob_index_].transport_type.in() == type) {
462 data_.remote_id_, data_.remote_data_[blob_index_].data, data_.discovery_locator_.data,
464 data_.publication_transport_priority_, data_.remote_reliable_, data_.remote_durable_};
479 ACE_TEXT(
"between %C and remote %C success\n"),
481 remote_log.
c_str()), 0);
486 "between %C and remote %C unsuccessful\n",
488 remote_log.
c_str()), 0);
505 "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
507 remote_log.
c_str()), 0);
513 "result of initiate_connect_i (local: %C to remote: %C) was not success\n",
515 remote_log.
c_str()), 0);
546 GUID_t remote_id(remote_id_ref);
550 "TransportClient(%@) using datalink[%@] from %C\n",
553 peerId_log.
c_str()), 0);
555 PendingMap::iterator iter =
pending_.find(remote_id);
559 "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
562 peerId_log.
c_str()), 0);
575 "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect to remote %C\n",
578 peerId_log.
c_str()), 0);
583 "TransportClient(%@) using datalink[%@] link is nil, since this is passive side, connection to remote %C timed out\n",
586 peerId_log.
c_str()), 0);
589 "TransportClient(%@) about to add_link[%@] to remote: %C\n",
592 peerId_log.
c_str()), 0);
599 for (
size_t i = 0; i < pend->
impls_.size(); ++i) {
606 pend_guard.release();
609 prev_pending_.insert(std::make_pair(iter->first, iter->second));
638 for (PendingMap::iterator it =
pending_.begin(); it !=
pending_.end(); ++it) {
642 for (
size_t i = 0; i < it->second->impls_.size(); ++i) {
649 it->second->reset_client();
661 if (repos == 0 || length == 0) {
665 PendingMap::iterator iter =
pending_.find(repos[i]);
670 for (
size_t i = 0; i < iter->second->impls_.size(); ++i) {
677 iter->second->reset_client();
679 prev_pending_.insert(std::make_pair(iter->first, iter->second));
697 "TransportClient(%@) disassociating from %C\n",
699 peerId_log.
c_str()), 5);
703 PendingMap::iterator iter =
pending_.find(peerId);
708 for (
size_t i = 0; i < iter->second->impls_.size(); ++i) {
715 iter->second->reset_client();
717 prev_pending_.insert(std::make_pair(iter->first, iter->second));
728 ACE_TEXT(
"(%P|%t) TransportClient::disassociate: ")
729 ACE_TEXT(
"no link for remote peer %C\n"),
741 DataLinkSetMap released;
745 ACE_TEXT(
"(%P|%t) TransportClient::disassociate: ")
746 ACE_TEXT(
"about to release_reservations for link[%@]\n"),
753 if (!released.empty()) {
757 ACE_TEXT(
"(%P|%t) TransportClient::disassociate: ")
758 ACE_TEXT(
"about to remove_link[%@] from links_\n"),
766 ACE_TEXT(
"(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
779 const ImplsType impls =
impls_;
788 for (
size_t i = 0; i < impls.size(); ++i) {
804 for (ImplsType::iterator pos =
impls_.begin(), limit =
impls_.end();
820 for (ImplsType::iterator pos =
impls_.begin(), limit =
impls_.end();
838 for (ImplsType::iterator pos =
impls_.begin(), limit =
impls_.end();
854 for (ImplsType::iterator pos =
impls_.begin(), limit =
impls_.end();
869 for (ImplsType::iterator pos =
impls_.begin(), limit =
impls_.end();
887 for (ImplsType::iterator pos =
impls_.begin(), limit =
impls_.end();
893 if (endpoint) {
return endpoint; }
911 ACE_TEXT(
"(%P|%t) TransportClient::send_response: ")
912 ACE_TEXT(
"no link for publication %C, ")
913 ACE_TEXT(
"not sending response.\n"),
929 if (send_list.
head() == 0) {
933 send_i(send_list, transaction_id);
940 const GUID_t& destination)
944 if (send_list.
head()) {
964 if (transaction_id != 0)
992 if (pub_links.
is_nil() || pub_links->empty()) {
999 ACE_TEXT(
"(%P|%t) TransportClient::send_i: ")
1000 ACE_TEXT(
"no links for publication %C, ")
1001 ACE_TEXT(
"not sending element %@ for transaction: %d.\n"),
1016 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 1027 typedef DataLinkSet::MapType MapType;
1028 MapType& map = pub_links->map();
1030 for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
1033 itr->second->target_intersection(cur->
get_pub_id(),
1036 if (ti.ptr() == 0 || ti->length() != n_subs) {
1038 subset = make_rch<DataLinkSet>();
1041 subset->insert_link(itr->second);
1046 "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
1053 VDBG((
LM_DEBUG,
"(%P|%t) DBG: filtered-out of all DataLinks.\n"));
1079 pub_links->send_control(cur);
1081 pub_links->send(cur);
1100 if (transaction_id != 0) {
1110 return rchandle_from(dynamic_cast<TransportSendListener*>(
this));
1116 return rchandle_from(dynamic_cast<TransportReceiveListener*>(
this));
1132 const GUID_t& destination)
1187 ACE_ERROR((
LM_ERROR,
"(%P|%t) ERROR: TransportClient::pending_association_with: " 1206 send_listener->data_acked(remote);
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
CORBA::ULong get_num_subs() const
DataSampleElement * tail() const
bool connection_info(TransportLocator &local_info, ConnectionInfoFlags flags) const
RcHandle< T > rchandle_from(T *pointer)
TransportImpl_rch get_or_create_impl()
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_UINT64 max_transaction_id_seen_
const DataSampleHeader & get_header() const
virtual SequenceNumber get_max_sn() const
DataLinkIndex data_link_index_
SendControlStatus send_control_to(const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
char message_id_
The enum MessageId.
SequenceBackInsertIterator< Sequence > back_inserter(Sequence &seq)
virtual GUID_t get_guid() const =0
void enable_transport(bool reliable, bool durable)
TransportSendListener * get_send_listener() const
void remove_link(const DataLink_rch &link)
PrevPendingMap prev_pending_
virtual bool check_transport_qos(const TransportInst &inst)=0
SendControlStatus send_w_control(SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
void send_final_acks(const GUID_t &readerid)
bool remove_sample(const DataSampleElement *sample)
TransportLocatorSeq remote_data_
GUIDSeq_var filter_out_
tracking for Content-Filtering data
const GUID_t GUID_UNKNOWN
Nil value for GUID.
TransportConfig_rch config_
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0
DataSampleElement * head() const
static const char DEFAULT_CONFIG_NAME[]
GUID_t get_pub_id() const
MonotonicTime_t participant_discovered_at_
const char * c_str() const
virtual void client_stop(const GUID_t &)
void send_stop(GUID_t repoId)
#define OPENDDS_ASSERT(C)
TransportLocator discovery_locator_
ACE_CDR::ULong remote_transport_context_
static const ConnectionInfoFlags CONNINFO_ALL
void populate_connection_info()
void domain_default_config(DDS::DomainId_t domain, const TransportConfig_rch &cfg)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
sequence< TransportLocator > TransportLocatorSeq
T::rv_reference move(T &p)
DataSampleElement * get_next_send_sample() const
ACE_UINT64 expected_transaction_id_
bool is_leading(const GUID_t &writer_id, const GUID_t &reader_id) const
virtual void local_crypto_handle(DDS::Security::ParticipantCryptoHandle)
void disassociate(const GUID_t &peerId)
ACE_Guard< ACE_Thread_Mutex > lock_
TransportImpl::ConnectionAttribs attribs_
TransportConfig_rch global_config() const
void terminate_send_if_suspended()
TransportReceiveListener_rch get_receive_listener()
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
virtual ~TransportClient()
DataSampleElement * max_transaction_tail_
virtual Priority get_priority_value(const AssociationData &data) const =0
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
MonotonicTime_t participant_discovered_at_
bool remove_sample(const DataSampleElement *sample)
bool send_response(const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
int insert_link(const DataLink_rch &link)
static TimeDuration from_msec(const ACE_UINT64 &ms)
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
RcHandle< DataLinkSet > DataLinkSet_rch
The type definition for the smart-pointer to the underlying type.
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
DataLinkSet_rch select_links(const GUID_t *remoteIds, const CORBA::ULong num_targets)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool is_shut_down() const
void terminate_send_if_suspended()
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
bool associated_with(const GUID_t &remote) const
RcHandle< PendingAssocTimer > pending_assoc_timer_
TransportLocatorSeq conn_info_
bool pending_association_with(const GUID_t &remote) const
virtual OPENDDS_STRING transport_type() const =0
unsigned long passive_connect_duration_
void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
Seq::size_type grow(Seq &seq)
bool initiate_connect_i(TransportImpl::AcceptConnectResult &result, TransportImpl_rch impl, const TransportImpl::RemoteTransport &remote, const TransportImpl::ConnectionAttribs &attribs_, Guard &guard)
virtual DDS::DomainId_t domain_id() const =0
void data_acked(const GUID_t &remote)
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
virtual void transport_assoc_done(int, const GUID_t &)
DataLinkIdTypeGUIDMap filter_per_link_
OPENDDS_STRING name() const
bool is_leading(const GUID_t &reader_id) const
void use_datalink(const GUID_t &remote_id, const DataLink_rch &link)
ACE_UINT64 transaction_id() const
Mix-in class for DDS entities which directly use the transport layer.
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
bool associate(const AssociationData &peer, bool active)
virtual bool requires_cdr_encapsulation() const
Does the transport require a CDR-encapsulated data payload?
unsigned long long ACE_UINT64
void remove_listener(const GUID_t &local_id)
TransportConfig_rch fix_empty_default()
Priority publication_transport_priority_
Reverse_Lock_t reverse_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
virtual void unregister_for_reader(const GUID_t &, const GUID_t &, const GUID_t &)
static const unsigned long DEFAULT_PASSIVE_CONNECT_DURATION
virtual void register_for_reader(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, OpenDDS::DCPS::DiscoveryListener *)
bool initiate_connect(TransportClient *tc, Guard &guard)
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static TransportRegistry * instance()
Return a singleton instance of this class.
int handle_timeout(const ACE_Time_Value &time, const void *arg)
bool remove_all_msgs(const GUID_t &pub_id)
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const OpenDDS::DCPS::GUID_t * get_sub_ids() const
void enable_transport_using_config(bool reliable, bool durable, const TransportConfig_rch &tc)
TimeDuration passive_connect_duration_
void send_control(DataSampleElement *sample)
Send a control message that is wrapped in a DataSampleElement.
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)=0
void send_response(GUID_t sub_id, const DataSampleHeader &header, Message_Block_Ptr response)
#define TheServiceParticipant
bool success_
If false, the accept or connect has failed and link_ is ignored.
void release_reservations(GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
void send_start(DataLinkSet *link_set)
The Internal API and Implementation of OpenDDS.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
ACE_Thread_Mutex send_transaction_lock_
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)=0
virtual void data_delivered(const DataSampleElement *sample)
TransportSendListener_rch get_send_listener()
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
void clean_prev_pending()
SendControlStatus
Return code type for send_control() operations.
void use_datalink_i(const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)