OpenDDS::DCPS::TransportClient Class Reference

Mix-in class for DDS entities which directly use the transport layer.

#include <TransportClient.h>

Classes

struct  PendingAssoc
class  PendingAssocTimer

Public Types

enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }

Public Member Functions

void use_datalink (const RepoId &remote_id, const DataLink_rch &link)
 TransportClient ()
virtual ~TransportClient ()
void enable_transport (bool reliable, bool durable)
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
bool swap_bytes () const
bool cdr_encapsulation () const
const TransportLocatorSeq & connection_info () const
bool associate (const AssociationData &peer, bool active)
void disassociate (const RepoId &peerId)
void stop_associating ()
void stop_associating (const GUID_t *repos, CORBA::ULong length)
void send_final_acks ()
void register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
void unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid)
void register_for_writer (const RepoId &participant, const RepoId &readerid, const RepoId &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
void unregister_for_writer (const RepoId &participant, const RepoId &readerid, const RepoId &writerid)
bool send_response (const RepoId &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const RepoId &destination)
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const RepoId &destination)
bool remove_sample (const DataSampleElement *sample)
bool remove_all_msgs ()
virtual void add_link (const DataLink_rch &link, const RepoId &peer)

Private Types

typedef ACE_Guard< ACE_Thread_Mutex > Guard
typedef RcHandle< PendingAssoc > PendingAssoc_rch
typedef ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t

Private Member Functions

virtual bool check_transport_qos (const TransportInst &inst)=0
virtual const RepoId & get_repo_id () const =0
virtual DDS::DomainId_t domain_id () const =0
virtual Priority get_priority_value (const AssociationData &data) const =0
virtual void transport_assoc_done (int, const RepoId &)
void use_datalink_i (const RepoId &remote_id, const DataLink_rch &link, Guard &guard)
TransportSendListener_rch get_send_listener ()
TransportReceiveListener_rch get_receive_listener ()
bool initiate_connect_i (TransportImpl::AcceptConnectResult &result, TransportImpl *impl, const TransportImpl::RemoteTransport &remote, const TransportImpl::ConnectionAttribs &attribs_, Guard &guard)
void send_i (SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
typedef OPENDDS_MAP_CMP (RepoId, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex
typedef OPENDDS_VECTOR (TransportImpl *) ImplsType
typedef OPENDDS_MAP_CMP (RepoId, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap

Private Attributes

RcHandle< PendingAssocTimer > pending_assoc_timer_
ImplsType impls_
PendingMap pending_
DataLinkSet links_
DataLinkSet send_links_
DataLinkIndex data_link_index_
ACE_Thread_Mutex send_transaction_lock_
ACE_UINT64 expected_transaction_id_
ACE_UINT64 max_transaction_id_seen_
DataSampleElement * max_transaction_tail_
bool swap_bytes_
bool cdr_encapsulation_
bool reliable_
bool durable_
ACE_Time_Value passive_connect_duration_
TransportLocatorSeq conn_info_
ACE_Thread_Mutex lock_
 Seems to protect accesses to impls_, pending_, links_, data_link_index_.
Reverse_Lock_t reverse_lock_
RepoId repo_id_

Friends

class ::DDS_TEST

Detailed Description

Mix-in class for DDS entities which directly use the transport layer.

DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient class manages the TransportImpl objects that represent the available communication mechanisms and the DataLink objects that represent the currently active communication channels to peers.

Definition at line 52 of file TransportClient.h.


Member Typedef Documentation

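typedef ACE_Guard<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Guard [private]

Definition at line 146 of file TransportClient.h.

typedef RcHandle<PendingAssoc> OpenDDS::DCPS::TransportClient::PendingAssoc_rch [private]

Definition at line 188 of file TransportClient.h.

typedef ACE_Reverse_Lock<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Reverse_Lock_t [private]

Reimplemented in OpenDDS::DCPS::DataReaderImpl.

Definition at line 302 of file TransportClient.h.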


Member Enumeration Documentation

anonymous enum
Enumerator:
ASSOC_OK 
ASSOC_ACTIVE 

Definition at line 60 of file TransportClient.h.

00060 { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
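
The enumerators have distinct bit values (1 and 2), which suggests they are combined into the int argument of transport_assoc_done(). The following is a minimal sketch under that assumption; describe_assoc is a hypothetical helper that is not part of OpenDDS.

```cpp
#include <cassert>
#include <string>

// Values copied from the anonymous enum in TransportClient.
enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };

// Hypothetical helper: decode a flags word, assuming the enumerators
// are independent bit flags.
std::string describe_assoc(int flags)
{
  std::string out = (flags & ASSOC_OK) ? "ok" : "failed";
  out += (flags & ASSOC_ACTIVE) ? ", active side" : ", passive side";
  return out;
}
```

Under this reading, a successful association started by the active side would be reported as ASSOC_OK | ASSOC_ACTIVE.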


Constructor & Destructor Documentation

OpenDDS::DCPS::TransportClient::TransportClient (  ) 

Definition at line 33 of file TransportClient.cpp.

00034   : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
00035   , expected_transaction_id_(1)
00036   , max_transaction_id_seen_(0)
00037   , max_transaction_tail_(0)
00038   , swap_bytes_(false)
00039   , cdr_encapsulation_(false)
00040   , reliable_(false)
00041   , durable_(false)
00042   , reverse_lock_(lock_)
00043   , repo_id_(GUID_UNKNOWN)
00044 {
00045 }

OpenDDS::DCPS::TransportClient::~TransportClient (  )  [virtual]

Definition at line 47 of file TransportClient.cpp.

References ACE_TEXT(), impls_, LM_DEBUG, lock_, OPENDDS_STRING, pending_, pending_assoc_timer_, repo_id_, stop_associating(), and OpenDDS::DCPS::Transport_debug_level.

00048 {
00049   if (Transport_debug_level > 5) {
00050     GuidConverter converter(repo_id_);
00051     ACE_DEBUG((LM_DEBUG,
00052                ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
00053                OPENDDS_STRING(converter).c_str()));
00054   }
00055 
00056   stop_associating();
00057 
00058   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00059 
00060   for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
00061     for (size_t i = 0; i < impls_.size(); ++i) {
00062       impls_[i]->stop_accepting_or_connecting(*this, it->second->data_.remote_id_);
00063     }
00064 
00065     pending_assoc_timer_->cancel_timer(this, it->second);
00066   }
00067 
00068   pending_assoc_timer_->wait();
00069 
00070 }


Member Function Documentation

void OpenDDS::DCPS::TransportClient::add_link ( const DataLink_rch & link,
const RepoId & peer 
) [virtual]

Reimplemented in OpenDDS::DCPS::DataReaderImpl.

Definition at line 524 of file TransportClient.cpp.

References data_link_index_, get_receive_listener(), get_send_listener(), OpenDDS::DCPS::DataLinkSet::insert_link(), links_, and repo_id_.

Referenced by use_datalink_i().

00525 {
00526   links_.insert_link(link);
00527   data_link_index_[peer] = link;
00528 
00529   TransportReceiveListener_rch trl = get_receive_listener();
00530 
00531   if (trl) {
00532     link->make_reservation(peer, repo_id_, trl);
00533 
00534   } else {
00535     link->make_reservation(peer, repo_id_, get_send_listener());
00536   }
00537 }
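
add_link() picks the listener by asking whether this object is also a TransportReceiveListener; get_receive_listener() and get_send_listener() do so with a dynamic_cast on this. The sketch below illustrates that cross-cast with hypothetical stand-in types (ClientMixin, Reader, Writer); it assumes, as the casts suggest, that the concrete class inherits from both the mix-in and one listener interface.

```cpp
#include <cassert>

struct ReceiveListener { virtual ~ReceiveListener() {} };
struct SendListener { virtual ~SendListener() {} };

// Hypothetical mix-in: it derives from neither listener interface itself.
class ClientMixin {
public:
  virtual ~ClientMixin() {}
  // Cross-cast: non-null only if the most-derived object is also a ReceiveListener.
  ReceiveListener* receive_listener() { return dynamic_cast<ReceiveListener*>(this); }
  // Cross-cast: non-null only if the most-derived object is also a SendListener.
  SendListener* send_listener() { return dynamic_cast<SendListener*>(this); }
};

// Stand-ins for a reader-like and a writer-like entity.
class Reader : public ClientMixin, public ReceiveListener {};
class Writer : public ClientMixin, public SendListener {};
```

The cast fails cleanly (null pointer) for the interface the object does not implement, which is what lets a single add_link() body serve both kinds of client.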

bool OpenDDS::DCPS::TransportClient::associate ( const AssociationData & peer,
bool  active 
)

Definition at line 166 of file TransportClient.cpp.

References ACE_TEXT(), OpenDDS::DCPS::back_inserter(), OpenDDS::DCPS::DCPS_debug_level, durable_, get_priority_value(), get_repo_id(), impls_, OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::link_, LM_DEBUG, LM_ERROR, lock_, OPENDDS_STRING, pending_, pending_assoc_timer_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, OpenDDS::DCPS::rchandle_from(), reliable_, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, repo_id_, reverse_lock_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, use_datalink_i(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::ReplayerImpl::add_association(), OpenDDS::DCPS::RecorderImpl::add_association(), OpenDDS::DCPS::DataWriterImpl::add_association(), OpenDDS::DCPS::DataReaderImpl::add_association(), OpenDDS::RTPS::Sedp::Reader::assoc(), and OpenDDS::RTPS::Sedp::Writer::assoc().

00167 {
00168   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
00169 
00170   repo_id_ = get_repo_id();
00171 
00172   if (impls_.empty()) {
00173     if (DCPS_debug_level) {
00174       GuidConverter writer_converter(repo_id_);
00175       GuidConverter reader_converter(data.remote_id_);
00176       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00177                  ACE_TEXT("local %C remote %C no available impls\n"),
00178                  OPENDDS_STRING(writer_converter).c_str(),
00179                  OPENDDS_STRING(reader_converter).c_str()));
00180     }
00181     return false;
00182   }
00183 
00184   bool all_impls_shut_down = true;
00185   for (size_t i = 0; i < impls_.size(); ++i) {
00186     if (!impls_.at(i)->is_shut_down()) {
00187       all_impls_shut_down = false;
00188       break;
00189     }
00190   }
00191 
00192   if (all_impls_shut_down) {
00193     if (DCPS_debug_level) {
00194       GuidConverter writer_converter(repo_id_);
00195       GuidConverter reader_converter(data.remote_id_);
00196       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00197                  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
00198                  OPENDDS_STRING(writer_converter).c_str(),
00199                  OPENDDS_STRING(reader_converter).c_str()));
00200     }
00201     return false;
00202   }
00203 
00204   PendingMap::iterator iter = pending_.find(data.remote_id_);
00205 
00206   if (iter == pending_.end()) {
00207     RepoId remote_copy(data.remote_id_);
00208     iter = pending_.insert(std::make_pair(remote_copy, make_rch<PendingAssoc>())).first;
00209 
00210     GuidConverter tc_assoc(repo_id_);
00211     GuidConverter remote_new(data.remote_id_);
00212     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
00213               "between %C and remote %C\n",
00214               OPENDDS_STRING(tc_assoc).c_str(),
00215               OPENDDS_STRING(remote_new).c_str()), 0);
00216   } else {
00217 
00218     ACE_ERROR((LM_ERROR,
00219                ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
00220                ACE_TEXT("already associating with remote.\n")));
00221 
00222     return false;
00223 
00224   }
00225 
00226   PendingAssoc_rch pend = iter->second;
00227   pend->active_ = active;
00228   pend->impls_.clear();
00229   pend->blob_index_ = 0;
00230   pend->data_ = data;
00231   pend->attribs_.local_id_ = repo_id_;
00232   pend->attribs_.priority_ = get_priority_value(data);
00233   pend->attribs_.local_reliable_ = reliable_;
00234   pend->attribs_.local_durable_ = durable_;
00235 
00236   if (active) {
00237     pend->impls_.reserve(impls_.size());
00238     std::reverse_copy(impls_.begin(), impls_.end(),
00239                       std::back_inserter(pend->impls_));
00240 
00241     return pend->initiate_connect(this, guard);
00242 
00243   } else { // passive
00244 
00245     // call accept_datalink for each impl / blob pair of the same type
00246     for (size_t i = 0; i < impls_.size(); ++i) {
00247       pend->impls_.push_back(impls_[i]);
00248       const OPENDDS_STRING type = impls_[i]->transport_type();
00249 
00250       for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
00251         if (data.remote_data_[j].transport_type.in() == type) {
00252           const TransportImpl::RemoteTransport remote = {
00253             data.remote_id_, data.remote_data_[j].data,
00254             data.publication_transport_priority_,
00255             data.remote_reliable_, data.remote_durable_};
00256 
00257           TransportImpl::AcceptConnectResult res;
00258           {
00259             // This thread acquired lock_ at the beginning of this method.  Calling accept_datalink might require getting the lock for the transport's reactor.
00260             // If the current thread is not an event handler for the transport's reactor, e.g., the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
00261             // Event handlers in the transport reactor may call passive_connection which calls use_datalink which acquires lock_.  The locking order in this case is transport reactor lock -> lock_.
00262             // To avoid deadlock, we must reverse the lock.
00263             ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
00264             res = impls_[i]->accept_datalink(remote, pend->attribs_, rchandle_from(this));
00265           }
00266 
00267           //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
00268           PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_);
00269 
00270           if (iter_after_accept == pending_.end()) {
00271             //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
00272             //active side connection and completed, thus pend was removed from pending_.  Can return true.
00273             return true;
00274           }
00275 
00276           if (res.success_) {
00277             if (res.link_.is_nil()) {
00278                 // In this case, it may be waiting for the TCP connection to be established.  Just wait without trying other transports.
00279                 pending_assoc_timer_->schedule_timer(this, iter->second);
00280             }
00281             else {
00282                 use_datalink_i(data.remote_id_, res.link_, guard);
00283             }
00284             return true;
00285           }
00286         }
00287       }
00288 
00289       //pend->impls_.push_back(impls_[i]);
00290     }
00291 
00292     pending_assoc_timer_->schedule_timer(this, iter->second);
00293   }
00294 
00295   return true;
00296 }
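
The passive branch above drops lock_ for the duration of accept_datalink() by guarding on reverse_lock_, so the two locks are never taken in opposite orders by different threads. The following sketch reproduces the idea with standard C++11 primitives instead of ACE; ReverseGuard, client_lock, reactor_lock, accept_link and associate_passive are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for a guard on ACE_Reverse_Lock: constructing it
// unlocks the mutex that is already held, destroying it locks it again.
class ReverseGuard {
public:
  explicit ReverseGuard(std::unique_lock<std::mutex>& held) : held_(held) { held_.unlock(); }
  ~ReverseGuard() { held_.lock(); }
  ReverseGuard(const ReverseGuard&) = delete;
  ReverseGuard& operator=(const ReverseGuard&) = delete;
private:
  std::unique_lock<std::mutex>& held_;
};

std::mutex client_lock;   // plays the role of lock_
std::mutex reactor_lock;  // plays the role of the transport reactor lock

// Stand-in for accept_datalink(): it needs the reactor lock.
bool accept_link()
{
  std::lock_guard<std::mutex> g(reactor_lock);
  return true;
}

// Returns true if client_lock was released while accept_link() ran
// and is held again afterwards.
bool associate_passive()
{
  std::unique_lock<std::mutex> guard(client_lock);
  bool ok = false;
  {
    ReverseGuard unlock_guard(guard);          // client_lock released here
    ok = accept_link() && !guard.owns_lock();
  }                                            // client_lock re-acquired here
  return ok && guard.owns_lock();
}
```

Because the lock is released in the middle of the function, any state read before the inner scope has to be re-checked afterwards, which is why associate() looks up pending_ again after accept_datalink() returns.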

bool OpenDDS::DCPS::TransportClient::cdr_encapsulation (  )  const [inline]

Definition at line 72 of file TransportClient.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), and OpenDDS::DCPS::DataWriterImpl::create_sample_data_message().

00072 { return cdr_encapsulation_; }

virtual bool OpenDDS::DCPS::TransportClient::check_transport_qos ( const TransportInst & inst  )  [private, pure virtual]

const TransportLocatorSeq& OpenDDS::DCPS::TransportClient::connection_info (  )  const [inline]

void OpenDDS::DCPS::TransportClient::disassociate ( const RepoId & peerId  ) 

Definition at line 567 of file TransportClient.cpp.

References ACE_TEXT(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), links_, LM_DEBUG, lock_, OPENDDS_STRING, pending_, OpenDDS::DCPS::DataLinkSet::remove_link(), repo_id_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::ReplayerImpl::remove_associations(), OpenDDS::DCPS::RecorderImpl::remove_associations_i(), and OpenDDS::DCPS::DataReaderImpl::remove_associations_i().

00568 {
00569   GuidConverter peerId_conv(peerId);
00570   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
00571             "TransportClient(%@) disassociating from %C\n",
00572             this,
00573             OPENDDS_STRING(peerId_conv).c_str()), 5);
00574 
00575   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00576 
00577   if (pending_.erase(peerId)) {
00578     return;
00579   }
00580 
00581   const DataLinkIndex::iterator found = data_link_index_.find(peerId);
00582 
00583   if (found == data_link_index_.end()) {
00584     if (DCPS_debug_level > 4) {
00585       const GuidConverter converter(peerId);
00586       ACE_DEBUG((LM_DEBUG,
00587                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00588                  ACE_TEXT("no link for remote peer %C\n"),
00589                  OPENDDS_STRING(converter).c_str()));
00590     }
00591 
00592     return;
00593   }
00594 
00595   const DataLink_rch link = found->second;
00596 
00597   //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
00598   //otherwise it could be removed in transport_detached()
00599   data_link_index_.erase(found);
00600   DataLinkSetMap released;
00601 
00602     if (DCPS_debug_level > 4) {
00603       ACE_DEBUG((LM_DEBUG,
00604                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00605                  ACE_TEXT("about to release_reservations for link[%@] \n"),
00606                  link.in()));
00607     }
00608 
00609     link->release_reservations(peerId, repo_id_, released);
00610 
00611   if (!released.empty()) {
00612 
00613     if (DCPS_debug_level > 4) {
00614       ACE_DEBUG((LM_DEBUG,
00615                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00616                  ACE_TEXT("about to remove_link[%@] from links_\n"),
00617                  link.in()));
00618     }
00619     links_.remove_link(link);
00620 
00621     if (DCPS_debug_level > 4) {
00622       GuidConverter converter(repo_id_);
00623       ACE_DEBUG((LM_DEBUG,
00624                  ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
00625                  OPENDDS_STRING(converter).c_str(),
00626                  link.in()));
00627     }
00628     // Datalink is no longer used for any remote peer by this TransportClient
00629     link->remove_listener(repo_id_);
00630 
00631   }
00632 }

virtual DDS::DomainId_t OpenDDS::DCPS::TransportClient::domain_id (  )  const [private, pure virtual]

void OpenDDS::DCPS::TransportClient::enable_transport ( bool  reliable,
bool  durable 
)

Definition at line 73 of file TransportClient.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportRegistry::DEFAULT_CONFIG_NAME, OpenDDS::DCPS::TransportRegistry::domain_default_config(), domain_id(), enable_transport_using_config(), OpenDDS::DCPS::TransportRegistry::fix_empty_default(), OpenDDS::DCPS::TransportRegistry::global_config(), OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_ERROR, and OpenDDS::DCPS::rchandle_from().

Referenced by OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().

00074 {
00075   // Search for a TransportConfig to use:
00076   TransportConfig_rch tc;
00077 
00078   // 1. If this object is an Entity, check if a TransportConfig has been
00079   //    bound either directly to this entity or to a parent entity.
00080   for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
00081        ent && tc.is_nil(); ent = ent->parent()) {
00082     tc = ent->transport_config();
00083   }
00084 
00085   if (tc.is_nil()) {
00086     TransportRegistry* const reg = TransportRegistry::instance();
00087     // 2. Check for a TransportConfig that is the default for this Domain.
00088     tc = reg->domain_default_config(domain_id());
00089 
00090     if (tc.is_nil()) {
00091       // 3. Use the global_config if one has been set.
00092       tc = reg->global_config();
00093 
00094       if (!tc.is_nil() && tc->instances_.empty()
00095           && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
00096         // 4. Set the "fallback option" if the global_config is empty.
00097         //    (only applies if the user hasn't changed the global config)
00098         tc = reg->fix_empty_default();
00099       }
00100     }
00101   }
00102 
00103   if (tc.is_nil()) {
00104     ACE_ERROR((LM_ERROR,
00105                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00106                ACE_TEXT("No TransportConfig found.\n")));
00107     throw Transport::NotConfigured();
00108   }
00109 
00110   enable_transport_using_config(reliable, durable, tc);
00111 }
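
The search order in the listing is: a config bound to the entity or one of its parents, then the domain default, then the global config. The sketch below models steps 1 to 3 with plain strings; Entity and find_config are hypothetical, and step 4 (fix_empty_default) is deliberately left out.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, simplified model: a config is just a name; empty means "none".
struct Entity {
  std::string config;     // config bound directly to this entity, if any
  const Entity* parent;   // enclosing entity, or nullptr at the root
};

std::string find_config(const Entity* ent,
                        const std::map<int, std::string>& domain_defaults,
                        int domain,
                        const std::string& global)
{
  // 1. Walk from the entity up through its parents.
  for (; ent; ent = ent->parent) {
    if (!ent->config.empty()) return ent->config;
  }
  // 2. Fall back to the default registered for the domain.
  const std::map<int, std::string>::const_iterator it = domain_defaults.find(domain);
  if (it != domain_defaults.end()) return it->second;
  // 3. Fall back to the global config (possibly empty: nothing configured).
  return global;
}
```

An empty result corresponds to the case where the real function logs an error and throws Transport::NotConfigured.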

void OpenDDS::DCPS::TransportClient::enable_transport_using_config ( bool  reliable,
bool  durable,
const TransportConfig_rch & tc 
)

Definition at line 114 of file TransportClient.cpp.

References ACE_TEXT(), cdr_encapsulation_, check_transport_qos(), conn_info_, OpenDDS::DCPS::TransportImpl::connection_info(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION, durable_, impls_, len, LM_ERROR, LM_WARNING, passive_connect_duration_, reliable_, ACE_Time_Value::set(), and swap_bytes_.

Referenced by enable_transport().

00116 {
00117   swap_bytes_ = tc->swap_bytes_;
00118   cdr_encapsulation_ = false;
00119   reliable_ = reliable;
00120   durable_ = durable;
00121   unsigned long duration = tc->passive_connect_duration_;
00122   if (duration == 0) {
00123     duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
00124     if (DCPS_debug_level) {
00125       ACE_DEBUG((LM_WARNING,
00126         ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
00127         ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
00128         ACE_TEXT("default value\n")));
00129     }
00130   }
00131   passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000);
00132 
00133   const size_t n = tc->instances_.size();
00134 
00135   for (size_t i = 0; i < n; ++i) {
00136     TransportInst_rch inst = tc->instances_[i];
00137 
00138     if (check_transport_qos(*inst)) {
00139       TransportImpl* impl = inst->impl();
00140 
00141       if (impl) {
00142         impls_.push_back(impl);
00143         const CORBA::ULong len = conn_info_.length();
00144         conn_info_.length(len + 1);
00145         impl->connection_info(conn_info_[len]);
00146 
00147 #if defined(OPENDDS_SECURITY)
00148         impl->local_crypto_handle(get_crypto_handle());
00149 #endif
00150 
00151         cdr_encapsulation_ |= inst->requires_cdr();
00152       }
00153     }
00154   }
00155 
00156   if (impls_.empty()) {
00157     ACE_ERROR((LM_ERROR,
00158                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00159                ACE_TEXT("No TransportImpl could be created.\n")));
00160     throw Transport::NotConfigured();
00161   }
00162 }
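
The call passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000) treats the configured value as milliseconds and splits it into whole seconds plus microseconds. A small worked sketch of that arithmetic follows; split_milliseconds is a hypothetical helper, not an OpenDDS or ACE function.

```cpp
#include <cassert>
#include <utility>

// Split a millisecond count into (seconds, microseconds), using the same
// arithmetic as the set() call in enable_transport_using_config().
std::pair<unsigned long, unsigned long> split_milliseconds(unsigned long ms)
{
  return std::make_pair(ms / 1000, (ms % 1000) * 1000);
}
```

For example, 2500 ms becomes 2 seconds and 500000 microseconds.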

virtual Priority OpenDDS::DCPS::TransportClient::get_priority_value ( const AssociationData & data  )  const [private, pure virtual]

TransportReceiveListener_rch OpenDDS::DCPS::TransportClient::get_receive_listener (  )  [private]

Definition at line 902 of file TransportClient.cpp.

References OpenDDS::DCPS::rchandle_from().

Referenced by add_link().

00903 {
00904   return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
00905 }

virtual const RepoId& OpenDDS::DCPS::TransportClient::get_repo_id (  )  const [private, pure virtual]

TransportSendListener_rch OpenDDS::DCPS::TransportClient::get_send_listener (  )  [private]

Definition at line 896 of file TransportClient.cpp.

References OpenDDS::DCPS::rchandle_from().

Referenced by add_link(), send_control(), and send_control_to().

00897 {
00898   return rchandle_from(dynamic_cast<TransportSendListener*>(this));
00899 }

bool OpenDDS::DCPS::TransportClient::initiate_connect_i ( TransportImpl::AcceptConnectResult & result,
TransportImpl * impl,
const TransportImpl::RemoteTransport & remote,
const TransportImpl::ConnectionAttribs & attribs_,
Guard & guard 
) [private]

Definition at line 310 of file TransportClient.cpp.

References ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), OpenDDS::DCPS::TransportImpl::connect_datalink(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, ACE_Guard< ACE_LOCK >::locked(), OPENDDS_STRING, OpenDDS::DCPS::rchandle_from(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, repo_id_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect().

00315 {
00316   if (!guard.locked()) {
00317     //don't own the lock_ so can't release it...shouldn't happen
00318     GuidConverter local(repo_id_);
00319     GuidConverter remote_conv(remote.repo_id_);
00320     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00321                         "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n",
00322                         OPENDDS_STRING(local).c_str(),
00323                         OPENDDS_STRING(remote_conv).c_str()), 0);
00324     return false;
00325   }
00326 
00327   {
00328     //can't call connect while holding lock due to possible reactor deadlock
00329     guard.release();
00330     GuidConverter local(repo_id_);
00331     GuidConverter remote_conv(remote.repo_id_);
00332     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00333                         "attempt to connect_datalink between local %C and remote %C\n",
00334                         OPENDDS_STRING(local).c_str(),
00335                         OPENDDS_STRING(remote_conv).c_str()), 0);
00336     result = impl->connect_datalink(remote, attribs_, rchandle_from(this));
00337     guard.acquire();
00338     if (!result.success_) {
00339       if (DCPS_debug_level) {
00340         GuidConverter writer_converter(repo_id_);
00341         GuidConverter reader_converter(remote.repo_id_);
00342         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00343                    ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
00344                    OPENDDS_STRING(writer_converter).c_str(),
00345                    OPENDDS_STRING(reader_converter).c_str()));
00346       }
00347       return false;
00348     }
00349   }
00350 
00351   GuidConverter local(repo_id_);
00352   GuidConverter remote_conv(remote.repo_id_);
00353   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00354                       "connection between local %C and remote %C initiation successful\n",
00355                       OPENDDS_STRING(local).c_str(),
00356                       OPENDDS_STRING(remote_conv).c_str()), 0);
00357   return true;
00358 }

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( RepoId  ,
PendingAssoc_rch  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( RepoId  ,
DataLink_rch  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR ( TransportImpl *  )  [private]

Referenced by OpenDDS::DCPS::RecorderImpl::signal_liveliness().

void OpenDDS::DCPS::TransportClient::register_for_reader ( const RepoId & participant,
const RepoId & writerid,
const RepoId & readerid,
const TransportLocatorSeq & locators,
OpenDDS::DCPS::DiscoveryListener * listener 
)

Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.

Definition at line 635 of file TransportClient.cpp.

References impls_, and lock_.

00640 {
00641   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00642   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00643        pos != limit;
00644        ++pos) {
00645     (*pos)->register_for_reader(participant, writerid, readerid, locators, listener);
00646   }
00647 }

void OpenDDS::DCPS::TransportClient::register_for_writer ( const RepoId & participant,
const RepoId & readerid,
const RepoId & writerid,
const TransportLocatorSeq & locators,
DiscoveryListener * listener 
)

Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.

Definition at line 663 of file TransportClient.cpp.

References impls_, and lock_.

00668 {
00669   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00670   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00671        pos != limit;
00672        ++pos) {
00673     (*pos)->register_for_writer(participant, readerid, writerid, locators, listener);
00674   }
00675 }

bool OpenDDS::DCPS::TransportClient::remove_all_msgs (  ) 

Definition at line 940 of file TransportClient.cpp.

References links_, OpenDDS::DCPS::DataLinkSet::remove_all_msgs(), and repo_id_.

Referenced by OpenDDS::DCPS::WriteDataContainer::unregister_all().

00941 {
00942   return links_.remove_all_msgs(repo_id_);
00943 }

bool OpenDDS::DCPS::TransportClient::remove_sample ( const DataSampleElement * sample  ) 

Definition at line 934 of file TransportClient.cpp.

References links_, and OpenDDS::DCPS::DataLinkSet::remove_sample().

Referenced by OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample().

00935 {
00936   return links_.remove_sample(sample);
00937 }

void OpenDDS::DCPS::TransportClient::send ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id = 0 
)

Definition at line 717 of file TransportClient.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::head(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::DCPS::DataWriterImpl::send_suspended_data(), OpenDDS::DCPS::ReplayerImpl::write(), and OpenDDS::DCPS::DataWriterImpl::write().

00718 {
00719   if (send_list.head() == 0) {
00720     return;
00721   }
00722   ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
00723   send_i(send_list, transaction_id);
00724 }

SendControlStatus OpenDDS::DCPS::TransportClient::send_control ( const DataSampleHeader & header,
Message_Block_Ptr  msg 
)

Reimplemented in OpenDDS::DCPS::DataWriterImpl.

Definition at line 908 of file TransportClient.cpp.

References get_send_listener(), links_, OpenDDS::DCPS::move(), repo_id_, and OpenDDS::DCPS::DataLinkSet::send_control().

Referenced by OpenDDS::RTPS::Sedp::Writer::write_control_msg().

00910 {
00911   return links_.send_control(repo_id_, get_send_listener(), header, move(msg));
00912 }

SendControlStatus OpenDDS::DCPS::TransportClient::send_control_to ( const DataSampleHeader & header,
Message_Block_Ptr  msg,
const RepoId & destination 
)
Definition at line 915 of file TransportClient.cpp.

References data_link_index_, get_send_listener(), OpenDDS::DCPS::DataLinkSet::insert_link(), lock_, OpenDDS::DCPS::move(), repo_id_, OpenDDS::DCPS::DataLinkSet::send_control(), and OpenDDS::DCPS::SEND_CONTROL_ERROR.

Referenced by send_w_control().

00918 {
00919   DataLinkSet singular;
00920   {
00921     ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR);
00922     DataLinkIndex::iterator found = data_link_index_.find(destination);
00923 
00924     if (found == data_link_index_.end()) {
00925       return SEND_CONTROL_ERROR;
00926     }
00927 
00928     singular.insert_link(found->second);
00929   }
00930   return singular.send_control(repo_id_, get_send_listener(), header, move(msg));
00931 }
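
The listing above looks up the link inside a nested scope so that lock_ is released before the send. A minimal sketch of that pattern follows. `Client`, `Link`, the integer peer ids and `SendStatus` are hypothetical stand-ins invented for the example, and `std::shared_ptr` plays the role of the reference-counted DataLink handle.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-ins; none of these are OpenDDS classes.
enum SendStatus { SEND_OK, SEND_ERROR };

struct Link {
  int sent = 0;
  SendStatus send_control(const std::string& /*msg*/) {
    ++sent;
    return SEND_OK;
  }
};

class Client {
public:
  void add_link(int peer, std::shared_ptr<Link> link) {
    std::lock_guard<std::mutex> guard(lock_);
    index_[peer] = std::move(link);
  }

  SendStatus send_control_to(const std::string& msg, int destination) {
    std::shared_ptr<Link> link;
    {
      // Hold the lock only while reading the index.
      std::lock_guard<std::mutex> guard(lock_);
      auto found = index_.find(destination);
      if (found == index_.end()) {
        return SEND_ERROR;
      }
      link = found->second;  // the copy keeps the link alive after unlock
    }
    // The lock is released here, so a slow send does not block index users.
    return link->send_control(msg);
  }

private:
  std::mutex lock_;
  std::map<int, std::shared_ptr<Link>> index_;
};
```

Copying the handle out of the map before unlocking is what makes the unlocked send safe: the entry may be erased concurrently, but the link itself stays alive until the local copy goes out of scope.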

void OpenDDS::DCPS::TransportClient::send_final_acks (  ) 

Definition at line 561 of file TransportClient.cpp.

References get_repo_id(), links_, and OpenDDS::DCPS::DataLinkSet::send_final_acks().

Referenced by OpenDDS::DCPS::DataReaderImpl::prepare_to_delete().

00562 {
00563   links_.send_final_acks (get_repo_id());
00564 }

void OpenDDS::DCPS::TransportClient::send_i ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id 
) [private]

Definition at line 741 of file TransportClient.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportSendListener::data_delivered(), OpenDDS::DCPS::DCPS_debug_level, expected_transaction_id_, OpenDDS::DCPS::DataSampleElement::filter_out_, OpenDDS::DCPS::DataSampleElement::filter_per_link_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_next_send_sample(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_ids(), OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, LM_DEBUG, max_transaction_id_seen_, max_transaction_tail_, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, repo_id_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataLinkSet::select_links(), OpenDDS::DCPS::DataLinkSet::send_start(), OpenDDS::DCPS::DataLinkSet::send_stop(), OpenDDS::DCPS::SendStateDataSampleList::tail(), OpenDDS::DCPS::DataSampleElement::transaction_id(), VDBG, and VDBG_LVL.

Referenced by send(), and send_w_control().

00742 {
00743   if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
00744     if (transaction_id > max_transaction_id_seen_) {
00745       max_transaction_id_seen_ = transaction_id;
00746       max_transaction_tail_ = send_list.tail();
00747     }
00748     return;
00749   } else /* transaction_id == expected_transaction_id */ {
00750 
00751     DataSampleElement* cur = send_list.head();
00752     if (max_transaction_tail_ == 0) {
00753       //Means no future transaction beat this transaction into send
00754       if (transaction_id != 0)
00755         max_transaction_id_seen_ = expected_transaction_id_;
00756       // Only send this current transaction
00757       max_transaction_tail_ = send_list.tail();
00758     }
00759     DataLinkSet send_links;
00760 
00761     while (true) {
00762       // VERY IMPORTANT NOTE:
00763       //
00764       // We have to be very careful in how we deal with the current
00765       // DataSampleElement.  The issue is that once we have invoked
00766       // data_delivered() on the send_listener_ object, or we have invoked
00767       // send() on the pub_links, we can no longer access the current
00768       // DataSampleElement!  Thus, we need to get the next
00769       // DataSampleElement (pointer) from the current element now,
00770       // while it is safe.
00771       DataSampleElement* next_elem;
00772       if (cur != max_transaction_tail_) {
00773         next_elem = cur->get_next_send_sample();
00774       } else {
00775         next_elem = max_transaction_tail_;
00776       }
00777       DataLinkSet_rch pub_links =
00778         (cur->get_num_subs() > 0)
00779         ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs()))
00780         : DataLinkSet_rch(&links_, inc_count());
00781 
00782       if (pub_links.is_nil() || pub_links->empty()) {
00783         // NOTE: This is the "local publisher id is not currently
00784         //       associated with any remote subscriber ids" case.
00785 
00786         if (DCPS_debug_level > 4) {
00787           GuidConverter converter(cur->get_pub_id());
00788           ACE_DEBUG((LM_DEBUG,
00789                      ACE_TEXT("(%P|%t) TransportClient::send_i: ")
00790                      ACE_TEXT("no links for publication %C, ")
00791                      ACE_TEXT("not sending element %@ for transaction: %d.\n"),
00792                      OPENDDS_STRING(converter).c_str(),
00793                      cur,
00794                      cur->transaction_id()));
00795         }
00796 
00797         // We tell the send_listener_ that all of the remote subscriber ids
00798         // that wanted the data (all zero of them) have indeed received
00799         // the data.
00800         cur->get_send_listener()->data_delivered(cur);
00801 
00802       } else {
00803         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
00804                   , cur), 5);
00805 
00806   #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00807 
00808         // Content-Filtering adjustment to the pub_links:
00809         // - If the sample should be filtered out of all subscriptions on a given
00810         //   DataLink, then exclude that link from the subset that we'll send to.
00811         // - If the sample should be filtered out of some (or none) of the subs,
00812         //   then record that information in the DataSampleElement so that the
00813         //   header's content_filter_entries_ can be marshaled before it's sent.
00814         if (cur->filter_out_.ptr()) {
00815           DataLinkSet_rch subset;
00816           DataLinkSet::GuardType guard(pub_links->lock());
00817           typedef DataLinkSet::MapType MapType;
00818           MapType& map = pub_links->map();
00819 
00820           for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
00821             size_t n_subs;
00822             GUIDSeq_var ti =
00823               itr->second->target_intersection(cur->get_pub_id(),
00824                                                cur->filter_out_.in(), n_subs);
00825 
00826             if (ti.ptr() == 0 || ti->length() != n_subs) {
00827               if (!subset.in()) {
00828                 subset = make_rch<DataLinkSet>();
00829               }
00830 
00831               subset->insert_link(itr->second);
00832               cur->filter_per_link_[itr->first] = ti._retn();
00833 
00834             } else {
00835               VDBG((LM_DEBUG,
00836                     "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
00837                     itr->second.in()));
00838             }
00839           }
00840 
00841           if (!subset.in()) {
00842             VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
00843             // similar to the "if (pub_links.is_nil())" case above, no links
00844             cur->get_send_listener()->data_delivered(cur);
00845             if (cur != max_transaction_tail_) {
00846               // Move on to the next DataSampleElement to send.
00847               cur = next_elem;
00848               continue;
00849             } else {
00850               break;
00851             }
00852           }
00853 
00854           pub_links = subset;
00855         }
00856 
00857   #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00858 
00859         // This will do several things, including adding to the membership
00860         // of the send_links set.  Any DataLinks added to the send_links
00861         // set will be also told about the send_start() event.  Those
00862         // DataLinks (in the pub_links set) that are already in the
00863         // send_links set will not be told about the send_start() event
00864         // since they heard about it when they were inserted into the
00865         // send_links set.
00866         send_links.send_start(pub_links.in());
00867         if (cur->get_header().message_id_ != SAMPLE_DATA) {
00868           pub_links->send_control(cur);
00869         } else {
00870           pub_links->send(cur);
00871         }
00872       }
00873       if (cur != max_transaction_tail_) {
00874         // Move on to the next DataSampleElement to send.
00875         cur = next_elem;
00876       } else {
00877         break;
00878       }
00879     }
00880 
00881     // This will inform each DataLink in the set about the send_stop() event.
00882     // It will then clear the send_links_ set.
00883     //
00884     // The reason that the send_links_ set is cleared is because we continually
00885     // reuse the same send_links_ object over and over for each call to this
00886     // send method.
00887     RepoId pub_id(repo_id_);
00888     send_links.send_stop(pub_id);
00889     if (transaction_id != 0)
00890       expected_transaction_id_ = max_transaction_id_seen_ + 1;
00891     max_transaction_tail_ = 0;
00892   }
00893 }
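
The transaction bookkeeping in send_i() is easier to follow in isolation. The sketch below reproduces only that part: a transaction that arrives before its turn is not sent, but its tail is remembered, and the expected transaction later walks the list through to that tail. `Sample` and `Sender` are hypothetical stand-ins. The initial values of the counters, and the assumption that the lists of consecutive transactions are chained through the same next pointers, are inferred from the listing rather than confirmed by it.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for DataSampleElement: a singly linked list node.
struct Sample {
  int value;
  Sample* next;
};

// Hypothetical stand-in for the transaction bookkeeping in send_i().
class Sender {
public:
  void send_i(Sample* head, Sample* tail, std::uint64_t transaction_id) {
    assert(head != nullptr && tail != nullptr);
    if (transaction_id != 0 && transaction_id != expected_) {
      // Arrived early: remember how far the chain extends, send nothing yet.
      if (transaction_id > max_seen_) {
        max_seen_ = transaction_id;
        max_tail_ = tail;
      }
      return;
    }
    if (max_tail_ == nullptr) {
      // No later transaction is waiting, so send only this list.
      if (transaction_id != 0) {
        max_seen_ = expected_;
      }
      max_tail_ = tail;
    }
    Sample* cur = head;
    while (true) {
      // Read the successor before handing cur to the "transport".
      Sample* next = (cur != max_tail_) ? cur->next : max_tail_;
      sent_.push_back(cur->value);
      if (cur == max_tail_) {
        break;
      }
      cur = next;
    }
    if (transaction_id != 0) {
      expected_ = max_seen_ + 1;
    }
    max_tail_ = nullptr;
  }

  const std::vector<int>& sent() const { return sent_; }
  std::uint64_t expected() const { return expected_; }

private:
  std::uint64_t expected_ = 1;  // assumed initial value
  std::uint64_t max_seen_ = 0;  // assumed initial value
  Sample* max_tail_ = nullptr;
  std::vector<int> sent_;
};
```

With four chained samples, calling send_i() for transaction 2 (the last two samples) first sends nothing, and the later call for transaction 1 (the first two samples) sends all four in order and advances the expected id to 3.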

bool OpenDDS::DCPS::TransportClient::send_response ( const RepoId & peer,
const DataSampleHeader & header,
Message_Block_Ptr  payload 
)

Definition at line 691 of file TransportClient.cpp.

References ACE_TEXT(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataLinkSet::insert_link(), LM_DEBUG, OpenDDS::DCPS::move(), OPENDDS_STRING, and OpenDDS::DCPS::DataLinkSet::send_response().

00694 {
00695   DataLinkIndex::iterator found = data_link_index_.find(peer);
00696 
00697   if (found == data_link_index_.end()) {
00698     if (DCPS_debug_level > 4) {
00699       GuidConverter converter(peer);
00700       ACE_DEBUG((LM_DEBUG,
00701                  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
00702                  ACE_TEXT("no link for publication %C, ")
00703                  ACE_TEXT("not sending response.\n"),
00704                  OPENDDS_STRING(converter).c_str()));
00705     }
00706 
00707     return false;
00708   }
00709 
00710   DataLinkSet singular;
00711   singular.insert_link(found->second);
00712   singular.send_response(peer, header, move(payload));
00713   return true;
00714 }

SendControlStatus OpenDDS::DCPS::TransportClient::send_w_control ( SendStateDataSampleList  send_list,
const DataSampleHeader & header,
Message_Block_Ptr  msg,
const RepoId & destination 
)

Definition at line 727 of file TransportClient.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::move(), OpenDDS::DCPS::SEND_CONTROL_ERROR, send_control_to(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i().

00731 {
00732   ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
00733                    send_transaction_lock_, SEND_CONTROL_ERROR);
00734   if (send_list.head()) {
00735     send_i(send_list, 0);
00736   }
00737   return send_control_to(header, move(msg), destination);
00738 }
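
send_w_control() holds send_transaction_lock_ across both the data flush and the control send, so no other send() call can interleave between them. The ordering guarantee can be sketched as follows. `Writer` and its string "wire" are hypothetical stand-ins that only record the order in which items would be handed to the transport.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in; records what would go on the wire, in order.
class Writer {
public:
  // Mirrors send_w_control(): pending samples first, then the control
  // message, both under one lock so no other send can slip in between.
  bool send_w_control(const std::vector<std::string>& pending,
                      const std::string& control) {
    std::lock_guard<std::mutex> guard(send_transaction_lock_);
    if (!pending.empty()) {
      for (const std::string& sample : pending) {
        wire_.push_back(sample);
      }
    }
    wire_.push_back(control);
    return true;
  }

  const std::vector<std::string>& wire() const { return wire_; }

private:
  std::mutex send_transaction_lock_;
  std::vector<std::string> wire_;
};
```

An empty pending list skips the flush but still sends the control message, matching the `if (send_list.head())` check in the listing.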

void OpenDDS::DCPS::TransportClient::stop_associating ( const GUID_t * repos,
CORBA::ULong  length 
)

Definition at line 547 of file TransportClient.cpp.

References lock_, and pending_.

00548 {
00549   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00550 
00551   if (repos == 0 || length == 0) {
00552     return;
00553   } else {
00554     for (CORBA::ULong i = 0; i < length; ++i) {
00555       pending_.erase(repos[i]);
00556     }
00557   }
00558 }
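
The array overload of stop_associating() amounts to a guarded bulk erase from the pending map. A minimal sketch, assuming integer ids in place of GUID_t and a hypothetical `Pending` class in place of TransportClient:

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for the pending-association map and its lock.
class Pending {
public:
  void add(int id, const std::string& what) {
    std::lock_guard<std::mutex> guard(lock_);
    pending_[id] = what;
  }

  // Mirrors stop_associating(repos, length): a null array or a zero
  // length is a no-op, and ids that are not pending are ignored by erase().
  void stop_associating(const int* repos, std::size_t length) {
    std::lock_guard<std::mutex> guard(lock_);
    if (repos == nullptr || length == 0) {
      return;
    }
    for (std::size_t i = 0; i < length; ++i) {
      pending_.erase(repos[i]);
    }
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(lock_);
    return pending_.size();
  }

private:
  mutable std::mutex lock_;
  std::map<int, std::string> pending_;
};
```

Erasing by key rather than by iterator means an unknown id costs a lookup and nothing else, so callers do not need to check membership first.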

void OpenDDS::DCPS::TransportClient::stop_associating (  ) 

bool OpenDDS::DCPS::TransportClient::swap_bytes (  )  const [inline]

virtual void OpenDDS::DCPS::TransportClient::transport_assoc_done ( int  ,
const RepoId &  
) [inline, private, virtual]

Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::DataWriterImpl.

Definition at line 136 of file TransportClient.h.

Referenced by use_datalink_i().

00136 {}

void OpenDDS::DCPS::TransportClient::unregister_for_reader ( const RepoId & participant,
const RepoId & writerid,
const RepoId & readerid 
)

Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.

Definition at line 650 of file TransportClient.cpp.

References impls_, and lock_.

00653 {
00654   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00655   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00656        pos != limit;
00657        ++pos) {
00658     (*pos)->unregister_for_reader(participant, writerid, readerid);
00659   }
00660 }

void OpenDDS::DCPS::TransportClient::unregister_for_writer ( const RepoId & participant,
const RepoId & readerid,
const RepoId & writerid 
)

Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.

Definition at line 678 of file TransportClient.cpp.

References impls_, and lock_.

00681 {
00682   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00683   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00684        pos != limit;
00685        ++pos) {
00686     (*pos)->unregister_for_writer(participant, readerid, writerid);
00687   }
00688 }
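
Both unregister functions forward the call to every transport implementation while holding lock_. The fan-out can be sketched as below, shown for the reader variant only. `Impl`, `Client` and the integer ids are hypothetical stand-ins, and `std::shared_ptr` takes the place of the reference-counted handles stored in impls_.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical transport implementation that counts unregister calls.
struct Impl {
  int unregistered = 0;
  void unregister_for_reader(int /*participant*/, int /*writerid*/, int /*readerid*/) {
    ++unregistered;
  }
};

// Hypothetical stand-in for TransportClient.
class Client {
public:
  void add_impl(std::shared_ptr<Impl> impl) {
    std::lock_guard<std::mutex> guard(lock_);
    impls_.push_back(std::move(impl));
  }

  // Mirrors the listing: hold the lock for the whole loop so the set of
  // implementations cannot change while it is being walked.
  void unregister_for_reader(int participant, int writerid, int readerid) {
    std::lock_guard<std::mutex> guard(lock_);
    for (auto pos = impls_.begin(), limit = impls_.end(); pos != limit; ++pos) {
      (*pos)->unregister_for_reader(participant, writerid, readerid);
    }
  }

private:
  std::mutex lock_;
  std::vector<std::shared_ptr<Impl>> impls_;
};
```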

void OpenDDS::DCPS::TransportClient::use_datalink ( const RepoId & remote_id,
const DataLink_rch & link 
)

Definition at line 441 of file TransportClient.cpp.

References lock_, and use_datalink_i().

Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::handle_timeout().

00443 {
00444   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00445 
00446   use_datalink_i(remote_id, link, guard);
00447 }
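
use_datalink() acquires lock_ and hands the guard to use_datalink_i(), which releases it before invoking the transport_assoc_done() callback. A minimal sketch of that hand-off, using `std::unique_lock` in place of the ACE guard, is given here. `Client`, `use_link()` and the `std::function` callback are hypothetical names introduced for the example.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for TransportClient.
class Client {
public:
  explicit Client(std::function<void()> done) : done_(std::move(done)) {}

  // Public entry point: take the lock, then delegate with the guard.
  void use_link() {
    std::unique_lock<std::mutex> guard(lock_);
    use_link_i(guard);
  }

  // Helper for observing the lock state: true if lock_ is not held.
  bool lock_is_free() {
    if (lock_.try_lock()) {
      lock_.unlock();
      return true;
    }
    return false;
  }

  int links() const { return links_; }

private:
  // The guard is passed by reference so this function can drop the
  // caller's lock before running code that might call back into Client.
  void use_link_i(std::unique_lock<std::mutex>& guard) {
    assert(guard.owns_lock());
    ++links_;        // state change happens under the lock
    guard.unlock();  // release before the callback
    done_();
  }

  std::mutex lock_;
  int links_ = 0;
  std::function<void()> done_;
};
```

Releasing the lock before the callback lets the callee re-enter the client without deadlocking on a non-recursive mutex.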

void OpenDDS::DCPS::TransportClient::use_datalink_i ( const RepoId & remote_id,
const DataLink_rch & link,
Guard & guard 
) [private]

Definition at line 450 of file TransportClient.cpp.

References add_link(), ASSOC_ACTIVE, ASSOC_OK, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pending_, pending_assoc_timer_, ACE_Guard< ACE_LOCK >::release(), transport_assoc_done(), and VDBG_LVL.

Referenced by associate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), and use_datalink().

00453 {
00454   // Try to make a local copy of remote_id to use in calls
00455   // because the reference could be invalidated if the caller
00456   // reference location is deleted (i.e. in stop_accepting_or_connecting
00457   // if use_datalink_i was called from passive_connection)
00458   // Does changing this from a reference to a local affect anything going forward?
00459   RepoId remote_id(remote_id_ref);
00460 
00461   GuidConverter peerId_conv(remote_id);
00462   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00463             "TransportClient(%@) using datalink[%@] from %C\n",
00464             this,
00465             link.in(),
00466             OPENDDS_STRING(peerId_conv).c_str()), 0);
00467 
00468   PendingMap::iterator iter = pending_.find(remote_id);
00469 
00470   if (iter == pending_.end()) {
00471     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00472                         "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
00473                         this,
00474                         link.in(),
00475                         OPENDDS_STRING(peerId_conv).c_str()), 0);
00476     return;
00477   }
00478 
00479   PendingAssoc_rch pend = iter->second;
00480   const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
00481   bool ok = false;
00482 
00483   if (link.is_nil()) {
00484 
00485     if (pend->active_ && pend->initiate_connect(this, guard)) {
00486       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00487                           "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n",
00488                           this,
00489                           link.in(),
00490                           OPENDDS_STRING(peerId_conv).c_str()), 0);
00491       return;
00492     }
00493 
00494   } else { // link is ready to use
00495     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00496               "TransportClient(%@) about to add_link[%@] to remote: %C\n",
00497               this,
00498               link.in(),
00499               OPENDDS_STRING(peerId_conv).c_str()), 0);
00500 
00501     add_link(link, remote_id);
00502     ok = true;
00503   }
00504 
00505   // either link is valid or assoc failed, clean up pending object
00506   // for passive side processing
00507   if (!pend->active_) {
00508 
00509     for (size_t i = 0; i < pend->impls_.size(); ++i) {
00510       pend->impls_[i]->stop_accepting_or_connecting(*this, pend->data_.remote_id_);
00511     }
00512   }
00513 
00514   pending_.erase(iter);
00515 
00516   guard.release();
00517 
00518   pending_assoc_timer_->cancel_timer(this, pend);
00519 
00520   transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
00521 }
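
The last line of the listing packs two facts into the int passed to transport_assoc_done(): whether this side was the active one and whether the association succeeded. The sketch below composes the value the same way and shows one way an override might decode it. The enum values are those listed under Public Types. `make_assoc_flags`, `decode_assoc_flags` and `AssocResult` are hypothetical helpers, not OpenDDS functions.

```cpp
#include <cassert>

// Values as listed in the class's Public Types.
enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };

// Composes the flags the way use_datalink_i() does before the callback.
inline int make_assoc_flags(bool active, bool ok) {
  const int active_flag = active ? ASSOC_ACTIVE : 0;
  return active_flag | (ok ? ASSOC_OK : 0);
}

// Hypothetical decoder, as an override of transport_assoc_done() might use.
struct AssocResult {
  bool active;
  bool ok;
};

inline AssocResult decode_assoc_flags(int flags) {
  return AssocResult{(flags & ASSOC_ACTIVE) != 0, (flags & ASSOC_OK) != 0};
}
```

Because the two values occupy separate bits, a failed active association (2) and a successful passive one (1) remain distinguishable.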

Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Member Data Documentation

Definition at line 293 of file TransportClient.h.

Referenced by enable_transport_using_config().

Definition at line 297 of file TransportClient.h.

Referenced by enable_transport_using_config().

Definition at line 275 of file TransportClient.h.

Referenced by add_link(), disassociate(), send_control_to(), and send_response().

Definition at line 293 of file TransportClient.h.

Referenced by associate(), and enable_transport_using_config().

Definition at line 281 of file TransportClient.h.

Referenced by send_i().

Definition at line 282 of file TransportClient.h.

Referenced by send_i().

Definition at line 289 of file TransportClient.h.

Referenced by send_i().

Definition at line 295 of file TransportClient.h.

Referenced by enable_transport_using_config().

Definition at line 263 of file TransportClient.h.

Referenced by associate(), use_datalink_i(), and ~TransportClient().

Definition at line 293 of file TransportClient.h.

Referenced by associate(), and enable_transport_using_config().

Definition at line 303 of file TransportClient.h.

Referenced by associate().

These are the links being used during the call to send(). They are held in a class member to minimize allocations and deallocations of the data link set.

Definition at line 273 of file TransportClient.h.

Definition at line 280 of file TransportClient.h.

Referenced by send(), and send_w_control().

Definition at line 293 of file TransportClient.h.

Referenced by enable_transport_using_config().


The documentation for this class was generated from the following files:

TransportClient.h
TransportClient.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1