OpenDDS::DCPS::TransportClient Class Reference

Mix-in class for DDS entities which directly use the transport layer. More...

#include <TransportClient.h>

Inheritance diagram for OpenDDS::DCPS::TransportClient:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportClient:

Collaboration graph
[legend]
List of all members.

Public Types

 ASSOC_OK = 1
 ASSOC_ACTIVE = 2
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }

Public Member Functions

void use_datalink (const RepoId &remote_id, const DataLink_rch &link)

Protected Member Functions

 TransportClient ()
virtual ~TransportClient ()
void enable_transport (bool reliable, bool durable)
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
bool swap_bytes () const
bool cdr_encapsulation () const
const TransportLocatorSeqconnection_info () const
bool associate (const AssociationData &peer, bool active)
void disassociate (const RepoId &peerId)
void stop_associating ()
void stop_associating (const GUID_t *repos, CORBA::ULong length)
void send_final_acks ()
void register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
void unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid)
void register_for_writer (const RepoId &participant, const RepoId &readerid, const RepoId &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
void unregister_for_writer (const RepoId &participant, const RepoId &readerid, const RepoId &writerid)
bool send_response (const RepoId &peer, const DataSampleHeader &header, ACE_Message_Block *payload)
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, ACE_Message_Block *msg, const RepoId &destination)
SendControlStatus send_control (const DataSampleHeader &header, ACE_Message_Block *msg)
SendControlStatus send_control_to (const DataSampleHeader &header, ACE_Message_Block *msg, const RepoId &destination)
bool remove_sample (const DataSampleElement *sample)
bool remove_all_msgs ()
virtual void add_link (const DataLink_rch &link, const RepoId &peer)
void on_notification_of_connection_deletion (const RepoId &peerId)

Private Types

typedef ACE_Guard< ACE_Thread_Mutex > Guard
typedef ACE_Reverse_Lock<
ACE_Thread_Mutex > 
Reverse_Lock_t

Private Member Functions

virtual bool check_transport_qos (const TransportInst &inst)=0
virtual const RepoIdget_repo_id () const =0
virtual DDS::DomainId_t domain_id () const =0
virtual Priority get_priority_value (const AssociationData &data) const =0
virtual void transport_assoc_done (int, const RepoId &)
void transport_detached (TransportImpl *which)
void use_datalink_i (const RepoId &remote_id, const DataLink_rch &link, Guard &guard)
TransportSendListenerget_send_listener ()
TransportReceiveListenerget_receive_listener ()
bool initiate_connect_i (TransportImpl::AcceptConnectResult &result, const TransportImpl_rch impl, const TransportImpl::RemoteTransport &remote, const TransportImpl::ConnectionAttribs &attribs_, Guard &guard)
void send_i (SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
typedef OPENDDS_MAP_CMP (RepoId, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex
typedef OPENDDS_VECTOR (TransportImpl_rch) ImplsType
typedef OPENDDS_MAP_CMP (RepoId, PendingAssoc *, GUID_tKeyLessThan) PendingMap

Private Attributes

PendingAssocTimerpending_assoc_timer_
ImplsType impls_
PendingMap pending_
DataLinkSet links_
DataLinkIndex links_waiting_for_on_deleted_callback_
DataLinkSet send_links_
DataLinkIndex data_link_index_
ACE_Thread_Mutex send_transaction_lock_
ACE_UINT64 expected_transaction_id_
ACE_UINT64 max_transaction_id_seen_
DataSampleElementmax_transaction_tail_
bool swap_bytes_
bool cdr_encapsulation_
bool reliable_
bool durable_
ACE_Time_Value passive_connect_duration_
TransportLocatorSeq conn_info_
ACE_Thread_Mutex lock_
Reverse_Lock_t reverse_lock_
RepoId repo_id_

Friends

class TransportImpl
class ::DDS_TEST

Classes

struct  PendingAssoc
class  PendingAssocTimer

Detailed Description

Mix-in class for DDS entities which directly use the transport layer.

DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient class manages the TransportImpl objects that represent the available communication mechanisms and the DataLink objects that represent the currently active communication channels to peers.

Definition at line 49 of file TransportClient.h.


Member Typedef Documentation

typedef ACE_Guard<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Guard [private]

Definition at line 142 of file TransportClient.h.

typedef ACE_Reverse_Lock<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Reverse_Lock_t [private]

Reimplemented in OpenDDS::DCPS::DataReaderImpl.

Definition at line 309 of file TransportClient.h.


Member Enumeration Documentation

anonymous enum

Enumerator:
ASSOC_OK 
ASSOC_ACTIVE 

Definition at line 55 of file TransportClient.h.

00055 { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };


Constructor & Destructor Documentation

OpenDDS::DCPS::TransportClient::TransportClient (  )  [protected]

Definition at line 32 of file TransportClient.cpp.

00033   : pending_assoc_timer_(new PendingAssocTimer (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
00034   , expected_transaction_id_(1)
00035   , max_transaction_id_seen_(0)
00036   , max_transaction_tail_(0)
00037   , swap_bytes_(false)
00038   , cdr_encapsulation_(false)
00039   , reliable_(false)
00040   , durable_(false)
00041   , reverse_lock_(lock_)
00042   , repo_id_(GUID_UNKNOWN)
00043 {
00044 }

OpenDDS::DCPS::TransportClient::~TransportClient (  )  [protected, virtual]

Definition at line 46 of file TransportClient.cpp.

References OpenDDS::DCPS::TransportClient::PendingAssocTimer::cancel_timer(), OpenDDS::DCPS::ReactorInterceptor::destroy(), impls_, links_, links_waiting_for_on_deleted_callback_, lock_, OpenDDS::DCPS::DataLinkSet::map(), OPENDDS_STRING, OPENDDS_VECTOR(), pending_, pending_assoc_timer_, repo_id_, stop_associating(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::DCPS::ReactorInterceptor::wait().

00047 {
00048   if (Transport_debug_level > 5) {
00049     GuidConverter converter(repo_id_);
00050     ACE_DEBUG((LM_DEBUG,
00051                ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
00052                OPENDDS_STRING(converter).c_str()));
00053   }
00054 
00055   this->stop_associating();
00056 
00057   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00058 
00059   for (DataLinkIndex::iterator iter = links_waiting_for_on_deleted_callback_.begin();
00060        iter != links_waiting_for_on_deleted_callback_.end(); ++iter) {
00061     if (Transport_debug_level > 5) {
00062       GuidConverter converter(repo_id_);
00063       ACE_DEBUG((LM_DEBUG,
00064                  ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C from link waiting for callback\n"),
00065                  this,
00066                  OPENDDS_STRING(converter).c_str()));
00067     }
00068     iter->second->remove_listener(repo_id_);
00069   }
00070 
00071   for (DataLinkSet::MapType::iterator iter = links_.map().begin();
00072        iter != links_.map().end(); ++iter) {
00073     if (Transport_debug_level > 5) {
00074       GuidConverter converter(repo_id_);
00075       ACE_DEBUG((LM_DEBUG,
00076                  ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C\n"),
00077                  this,
00078                  OPENDDS_STRING(converter).c_str()));
00079     }
00080     iter->second->remove_listener(repo_id_);
00081   }
00082 
00083   for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
00084     for (size_t i = 0; i < impls_.size(); ++i) {
00085       impls_[i]->stop_accepting_or_connecting(this, it->second->data_.remote_id_);
00086     }
00087 
00088     pending_assoc_timer_->cancel_timer(this, it->second);
00089   }
00090 
00091   pending_assoc_timer_->wait();
00092   pending_assoc_timer_->destroy();
00093 
00094   for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin();
00095        it != impls_.end(); ++it) {
00096 
00097     (*it)->detach_client(this);
00098   }
00099 }


Member Function Documentation

void OpenDDS::DCPS::TransportClient::add_link ( const DataLink_rch link,
const RepoId peer 
) [protected, virtual]

Reimplemented in OpenDDS::DCPS::DataReaderImpl.

Definition at line 649 of file TransportClient.cpp.

References data_link_index_, get_receive_listener(), get_send_listener(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLinkSet::insert_link(), links_, and repo_id_.

Referenced by OpenDDS::DCPS::DataReaderImpl::add_link(), and use_datalink_i().

00650 {
00651   links_.insert_link(link.in());
00652   data_link_index_[peer] = link;
00653 
00654   TransportReceiveListener* trl = get_receive_listener();
00655 
00656   if (trl) {
00657     link->make_reservation(peer, repo_id_, trl);
00658 
00659   } else {
00660     link->make_reservation(peer, repo_id_, get_send_listener());
00661   }
00662 }

bool OpenDDS::DCPS::TransportClient::associate ( const AssociationData peer,
bool  active 
) [protected]

Definition at line 243 of file TransportClient.cpp.

References OpenDDS::DCPS::TransportClient::PendingAssoc::active_, OpenDDS::DCPS::TransportClient::PendingAssoc::attribs_, OpenDDS::DCPS::TransportClient::PendingAssoc::blob_index_, OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::DCPS_debug_level, durable_, get_priority_value(), get_repo_id(), OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, impls_, OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::link_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, lock_, OPENDDS_STRING, pending_, pending_assoc_timer_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, reliable_, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, repo_id_, reverse_lock_, OpenDDS::DCPS::TransportClient::PendingAssocTimer::schedule_timer(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, use_datalink_i(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::DataWriterImpl::add_association(), OpenDDS::DCPS::DataReaderImpl::add_association(), OpenDDS::RTPS::Sedp::Reader::assoc(), and OpenDDS::RTPS::Sedp::Writer::assoc().

00244 {
00245   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
00246 
00247   repo_id_ = get_repo_id();
00248 
00249   if (impls_.empty()) {
00250     if (DCPS_debug_level) {
00251       GuidConverter writer_converter(this->repo_id_);
00252       GuidConverter reader_converter(data.remote_id_);
00253       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00254                  ACE_TEXT("local %C remote %C no available impls\n"),
00255                  OPENDDS_STRING(writer_converter).c_str(),
00256                  OPENDDS_STRING(reader_converter).c_str()));
00257     }
00258     return false;
00259   }
00260 
00261   bool all_impls_shut_down = true;
00262   for (size_t i = 0; i < impls_.size(); ++i) {
00263     if (!impls_.at(i)->is_shut_down()) {
00264       all_impls_shut_down = false;
00265       break;
00266     }
00267   }
00268 
00269   if (all_impls_shut_down) {
00270     if (DCPS_debug_level) {
00271       GuidConverter writer_converter(this->repo_id_);
00272       GuidConverter reader_converter(data.remote_id_);
00273       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00274                  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
00275                  OPENDDS_STRING(writer_converter).c_str(),
00276                  OPENDDS_STRING(reader_converter).c_str()));
00277     }
00278     return false;
00279   }
00280 
00281   PendingMap::iterator iter = pending_.find(data.remote_id_);
00282 
00283   if (iter == pending_.end()) {
00284     RepoId remote_copy(data.remote_id_);
00285     iter = pending_.insert(std::make_pair(remote_copy, new PendingAssoc())).first;
00286 
00287     GuidConverter tc_assoc(this->repo_id_);
00288     GuidConverter remote_new(data.remote_id_);
00289     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
00290               "between %C and remote %C\n",
00291               OPENDDS_STRING(tc_assoc).c_str(),
00292               OPENDDS_STRING(remote_new).c_str()), 0);
00293   } else {
00294     if (iter->second->removed_) {
00295       iter->second->removed_ = false;
00296       GuidConverter tc_assoc(this->repo_id_);
00297       GuidConverter remote_new(data.remote_id_);
00298       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate found existing PendingAssoc "
00299                 "between %C and remote %C, set removed to false to continue using this pending association\n",
00300                 OPENDDS_STRING(tc_assoc).c_str(),
00301                 OPENDDS_STRING(remote_new).c_str()), 0);
00302     } else {
00303       ACE_ERROR((LM_ERROR,
00304                  ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
00305                  ACE_TEXT("already associating with remote.\n")));
00306 
00307       return false;
00308     }
00309   }
00310 
00311   PendingAssoc& pend = *(iter->second);
00312   pend.active_ = active;
00313   pend.impls_.clear();
00314   pend.blob_index_ = 0;
00315   pend.data_ = data;
00316   pend.attribs_.local_id_ = repo_id_;
00317   pend.attribs_.priority_ = get_priority_value(data);
00318   pend.attribs_.local_reliable_ = reliable_;
00319   pend.attribs_.local_durable_ = durable_;
00320 
00321   if (active) {
00322     pend.impls_.reserve(impls_.size());
00323     std::reverse_copy(impls_.begin(), impls_.end(),
00324                       std::back_inserter(pend.impls_));
00325 
00326     pend.initiate_connect(this, guard);
00327 
00328     //Revisit if this should be used instead of always returning true.
00329     //return pend.initiate_connect(this, guard);
00330     return true;
00331 
00332   } else { // passive
00333 
00334     // call accept_datalink for each impl / blob pair of the same type
00335     for (size_t i = 0; i < impls_.size(); ++i) {
00336       pend.impls_.push_back(impls_[i]);
00337       const OPENDDS_STRING type = impls_[i]->transport_type();
00338 
00339       for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
00340         if (data.remote_data_[j].transport_type.in() == type) {
00341           const TransportImpl::RemoteTransport remote = {
00342             data.remote_id_, data.remote_data_[j].data,
00343             data.publication_transport_priority_,
00344             data.remote_reliable_, data.remote_durable_};
00345 
00346           TransportImpl::AcceptConnectResult res;
00347           {
00348             // This thread acquired lock_ at the beginning of this method.  Calling accept_datalink might require getting the lock for the transport's reactor.
00349             // If the current thread is not an event handler for the transport's reactor, e.g., the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
00350             // Event handlers in the transport reactor may call passive_connection which calls use_datalink which acquires lock_.  The locking order in this case is transport reactor lock -> lock_.
00351             // To avoid deadlock, we must reverse the lock.
00352             ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
00353             res = impls_[i]->accept_datalink(remote, pend.attribs_, this);
00354           }
00355 
00356           //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
00357           PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_);
00358 
00359           if (iter_after_accept == pending_.end()) {
00360             //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
00361             //active side connection and completed, thus pend was removed from pending_.  Can return true.
00362             return true;
00363           }
00364 
00365           if (res.success_ && !res.link_.is_nil()) {
00366 
00367             use_datalink_i(data.remote_id_, res.link_, guard);
00368 
00369             return true;
00370           }
00371         }
00372       }
00373 
00374       //pend.impls_.push_back(impls_[i]);
00375     }
00376 
00377     pending_assoc_timer_->schedule_timer(this, iter->second);
00378   }
00379 
00380   return true;
00381 }

bool OpenDDS::DCPS::TransportClient::cdr_encapsulation (  )  const [inline, protected]

Definition at line 69 of file TransportClient.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), and OpenDDS::DCPS::DataWriterImpl::create_sample_data_message().

00069 { return cdr_encapsulation_; }

virtual bool OpenDDS::DCPS::TransportClient::check_transport_qos ( const TransportInst inst  )  [private, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Referenced by enable_transport_using_config().

const TransportLocatorSeq& OpenDDS::DCPS::TransportClient::connection_info (  )  const [inline, protected]

Definition at line 70 of file TransportClient.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().

00070 { return conn_info_; }

void OpenDDS::DCPS::TransportClient::disassociate ( const RepoId peerId  )  [protected]

Definition at line 737 of file TransportClient.cpp.

References data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), links_, links_waiting_for_on_deleted_callback_, lock_, OPENDDS_STRING, pending_, OpenDDS::DCPS::DataLinkSet::remove_link(), repo_id_, and VDBG_LVL.

Referenced by OpenDDS::RTPS::Sedp::disassociate().

00738 {
00739   GuidConverter peerId_conv(peerId);
00740   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
00741             "TransportClient(%@) disassociating from %C\n",
00742             this,
00743             OPENDDS_STRING(peerId_conv).c_str()), 5);
00744 
00745   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00746 
00747   const PendingMap::iterator iter = pending_.find(peerId);
00748 
00749   if (iter != pending_.end()) {
00750     iter->second->removed_ = true;
00751     return;
00752   }
00753 
00754   const DataLinkIndex::iterator found = data_link_index_.find(peerId);
00755 
00756   if (found == data_link_index_.end()) {
00757     if (DCPS_debug_level > 4) {
00758       const GuidConverter converter(peerId);
00759       ACE_DEBUG((LM_DEBUG,
00760                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00761                  ACE_TEXT("no link for remote peer %C\n"),
00762                  OPENDDS_STRING(converter).c_str()));
00763     }
00764 
00765     return;
00766   }
00767 
00768   const DataLink_rch link = found->second;
00769 
00770   //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
00771   //otherwise it could be removed in transport_detached()
00772   data_link_index_.erase(found);
00773   DataLinkSetMap released;
00774 
00775     if (DCPS_debug_level > 4) {
00776       ACE_DEBUG((LM_DEBUG,
00777                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00778                  ACE_TEXT("about to release_reservations for link[%@] \n"),
00779                  link.in()));
00780     }
00781 
00782     link->release_reservations(peerId, repo_id_, released);
00783 
00784   if (!released.empty()) {
00785 
00786     if (DCPS_debug_level > 4) {
00787       ACE_DEBUG((LM_DEBUG,
00788                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00789                  ACE_TEXT("about to remove_link[%@] from links_\n"),
00790                  link.in()));
00791     }
00792     links_.remove_link(link);
00793 
00794     if (link->issues_on_deleted_callback()) {
00795       if (DCPS_debug_level > 4) {
00796         GuidConverter converter(repo_id_);
00797         ACE_DEBUG((LM_DEBUG,
00798                    ACE_TEXT("(%P|%t) TransportClient::disassociate: wait for connection deleted callback for %C on link[%@]\n"),
00799                    OPENDDS_STRING(converter).c_str(),
00800                    link.in()));
00801       }
00802       links_waiting_for_on_deleted_callback_[peerId] = link;
00803     } else {
00804       if (DCPS_debug_level > 4) {
00805         GuidConverter converter(repo_id_);
00806         ACE_DEBUG((LM_DEBUG,
00807                    ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
00808                    OPENDDS_STRING(converter).c_str(),
00809                    link.in()));
00810       }
00811       // Datalink is no longer used for any remote peer by this TransportClient
00812       link->remove_listener(repo_id_);
00813     }
00814   }
00815 }

virtual DDS::DomainId_t OpenDDS::DCPS::TransportClient::domain_id (  )  const [private, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Referenced by enable_transport().

void OpenDDS::DCPS::TransportClient::enable_transport ( bool  reliable,
bool  durable 
) [protected]

Definition at line 102 of file TransportClient.cpp.

References OpenDDS::DCPS::TransportRegistry::DEFAULT_CONFIG_NAME, domain_id(), enable_transport_using_config(), OpenDDS::DCPS::TransportRegistry::instance(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

Referenced by OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().

00103 {
00104   // Search for a TransportConfig to use:
00105   TransportConfig_rch tc;
00106 
00107   // 1. If this object is an Entity, check if a TransportConfig has been
00108   //    bound either directly to this entity or to a parent entity.
00109   for (const EntityImpl* ent = dynamic_cast<const EntityImpl*>(this);
00110        ent && tc.is_nil(); ent = ent->parent()) {
00111     tc = ent->transport_config();
00112   }
00113 
00114   if (tc.is_nil()) {
00115     TransportRegistry* const reg = TransportRegistry::instance();
00116     // 2. Check for a TransportConfig that is the default for this Domain.
00117     tc = reg->domain_default_config(domain_id());
00118 
00119     if (tc.is_nil()) {
00120       // 3. Use the global_config if one has been set.
00121       tc = reg->global_config();
00122 
00123       if (!tc.is_nil() && tc->instances_.empty()
00124           && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
00125         // 4. Set the "fallback option" if the global_config is empty.
00126         //    (only applies if the user hasn't changed the global config)
00127         tc = reg->fix_empty_default();
00128       }
00129     }
00130   }
00131 
00132   if (tc.is_nil()) {
00133     ACE_ERROR((LM_ERROR,
00134                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00135                ACE_TEXT("No TransportConfig found.\n")));
00136     throw Transport::NotConfigured();
00137   }
00138 
00139   enable_transport_using_config(reliable, durable, tc);
00140 }

void OpenDDS::DCPS::TransportClient::enable_transport_using_config ( bool  reliable,
bool  durable,
const TransportConfig_rch tc 
) [protected]

Definition at line 143 of file TransportClient.cpp.

References cdr_encapsulation_, check_transport_qos(), conn_info_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION, durable_, impls_, OpenDDS::DCPS::RcHandle< T >::is_nil(), passive_connect_duration_, reliable_, and swap_bytes_.

Referenced by enable_transport().

00145 {
00146   swap_bytes_ = tc->swap_bytes_;
00147   cdr_encapsulation_ = false;
00148   reliable_ = reliable;
00149   durable_ = durable;
00150   unsigned long duration = tc->passive_connect_duration_;
00151   if (duration == 0) {
00152     duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
00153     if (DCPS_debug_level) {
00154       ACE_DEBUG((LM_WARNING,
00155         ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
00156         ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
00157         ACE_TEXT("default value\n")));
00158     }
00159   }
00160   passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000);
00161 
00162   const size_t n = tc->instances_.size();
00163 
00164   for (size_t i = 0; i < n; ++i) {
00165     TransportInst_rch inst = tc->instances_[i];
00166 
00167     if (check_transport_qos(*inst.in())) {
00168       TransportImpl_rch impl = inst->impl();
00169 
00170       if (!impl.is_nil()) {
00171         impl->attach_client(this);
00172         impls_.push_back(impl);
00173         const CORBA::ULong len = conn_info_.length();
00174         conn_info_.length(len + 1);
00175         impl->connection_info(conn_info_[len]);
00176         cdr_encapsulation_ |= inst->requires_cdr();
00177       }
00178     }
00179   }
00180 
00181   if (impls_.empty()) {
00182     ACE_ERROR((LM_ERROR,
00183                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00184                ACE_TEXT("No TransportImpl could be created.\n")));
00185     throw Transport::NotConfigured();
00186   }
00187 }

virtual Priority OpenDDS::DCPS::TransportClient::get_priority_value ( const AssociationData data  )  const [private, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Referenced by associate().

TransportReceiveListener * OpenDDS::DCPS::TransportClient::get_receive_listener (  )  [private]

Definition at line 1087 of file TransportClient.cpp.

Referenced by add_link().

01088 {
01089   return dynamic_cast<TransportReceiveListener*>(this);
01090 }

virtual const RepoId& OpenDDS::DCPS::TransportClient::get_repo_id (  )  const [private, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.

Referenced by associate(), and send_final_acks().

TransportSendListener * OpenDDS::DCPS::TransportClient::get_send_listener (  )  [private]

Definition at line 1081 of file TransportClient.cpp.

Referenced by add_link(), send_control(), and send_control_to().

01082 {
01083   return dynamic_cast<TransportSendListener*>(this);
01084 }

bool OpenDDS::DCPS::TransportClient::initiate_connect_i ( TransportImpl::AcceptConnectResult result,
const TransportImpl_rch  impl,
const TransportImpl::RemoteTransport remote,
const TransportImpl::ConnectionAttribs attribs_,
Guard guard 
) [private]

Definition at line 395 of file TransportClient.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, pending_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, repo_id_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect().

00400 {
00401   if (!guard.locked()) {
00402     //don't own the lock_ so can't release it...shouldn't happen
00403     GuidConverter local(repo_id_);
00404     GuidConverter remote_conv(remote.repo_id_);
00405     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00406                         "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n",
00407                         OPENDDS_STRING(local).c_str(),
00408                         OPENDDS_STRING(remote_conv).c_str()), 0);
00409     return false;
00410   }
00411 
00412   {
00413     //can't call connect while holding lock due to possible reactor deadlock
00414     guard.release();
00415     GuidConverter local(repo_id_);
00416     GuidConverter remote_conv(remote.repo_id_);
00417     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00418                         "attempt to connect_datalink between local %C and remote %C\n",
00419                         OPENDDS_STRING(local).c_str(),
00420                         OPENDDS_STRING(remote_conv).c_str()), 0);
00421     result = impl->connect_datalink(remote, attribs_, this);
00422     if (!result.success_) {
00423       if (DCPS_debug_level) {
00424         GuidConverter writer_converter(repo_id_);
00425         GuidConverter reader_converter(remote.repo_id_);
00426         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00427                    ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
00428                    OPENDDS_STRING(writer_converter).c_str(),
00429                    OPENDDS_STRING(reader_converter).c_str()));
00430       }
00431       return false;
00432     }
00433     guard.acquire();
00434   }
00435 
00436   //Check to make sure the pending assoc still exists in the map and hasn't been slated for removal
00437   //figure out how to respond to these possible results that occurred while lock was released to connect
00438   PendingMap::iterator iter = pending_.find(remote.repo_id_);
00439 
00440   if (iter == pending_.end()) {
00441     GuidConverter local(repo_id_);
00442     GuidConverter remote_conv(remote.repo_id_);
00443     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00444                         "cannot find pending association after connecting datalink between local %C and remote %C\n",
00445                         OPENDDS_STRING(local).c_str(),
00446                         OPENDDS_STRING(remote_conv).c_str()), 0);
00447     return false;
00448     //log some sort of error message...
00449     //PendingAssoc's are only erased from pending_ in use_datalink_i after
00450 
00451   } else {
00452     if (iter->second->removed_) {
00453       GuidConverter local(repo_id_);
00454       GuidConverter remote_conv(remote.repo_id_);
00455       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00456                           "pending association marked for removal already, after connecting datalink between local %C and remote %C\n",
00457                           OPENDDS_STRING(local).c_str(),
00458                           OPENDDS_STRING(remote_conv).c_str()), 0);
00459       //this occurs if the transport client was told to disassociate while connecting
00460       //disassociate cleans up everything except this local AcceptConnectResult whose destructor
00461       //should take care of it because link has not been shifted into links_ by use_datalink_i
00462       return false;
00463 
00464     }
00465 
00466   }
00467   GuidConverter local(repo_id_);
00468   GuidConverter remote_conv(remote.repo_id_);
00469   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00470                       "connection between local %C and remote %C initiation successful\n",
00471                       OPENDDS_STRING(local).c_str(),
00472                       OPENDDS_STRING(remote_conv).c_str()), 0);
00473   return true;
00474 }

void OpenDDS::DCPS::TransportClient::on_notification_of_connection_deletion ( const RepoId peerId  )  [protected]

Definition at line 665 of file TransportClient.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, links_waiting_for_on_deleted_callback_, lock_, OPENDDS_STRING, repo_id_, and VDBG_LVL.

Referenced by OpenDDS::DCPS::DataWriterImpl::notify_connection_deleted(), and OpenDDS::DCPS::DataReaderImpl::notify_connection_deleted().

00666 {
00667   DBG_ENTRY_LVL("TransportClient","on_notification_of_connection_deletion",6);
00668 
00669   GuidConverter peerId_conv(peerId);
00670   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::on_notification_of_connection_deletion "
00671             "TransportClient(%@) connection to %C deleted\n",
00672             this,
00673             OPENDDS_STRING(peerId_conv).c_str()), 5);
00674 
00675   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00676 
00677   const DataLinkIndex::iterator found = links_waiting_for_on_deleted_callback_.find(peerId);
00678 
00679   if (found == links_waiting_for_on_deleted_callback_.end()) {
00680     if (DCPS_debug_level > 4) {
00681       const GuidConverter converter(peerId);
00682       ACE_DEBUG((LM_DEBUG,
00683                  ACE_TEXT("(%P|%t) TransportClient::on_notification_of_connection_deletion: ")
00684                  ACE_TEXT("no link for remote peer %C\n"),
00685                  OPENDDS_STRING(converter).c_str()));
00686     }
00687 
00688     return;
00689   }
00690 
00691   const DataLink_rch link = found->second;
00692 
00693   //now that an _rch is created for the link, remove the iterator from links_waiting_for_on_deleted_callback_ while still holding lock
00694   links_waiting_for_on_deleted_callback_.erase(found);
00695 
00696   link->remove_listener(repo_id_);
00697 }

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( RepoId  ,
PendingAssoc ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP ( RepoId  ,
DataLink_rch  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR ( TransportImpl_rch   )  [private]

Referenced by OpenDDS::DCPS::RecorderImpl::signal_liveliness(), transport_detached(), and ~TransportClient().

void OpenDDS::DCPS::TransportClient::register_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid,
const TransportLocatorSeq locators,
OpenDDS::DCPS::DiscoveryListener listener 
) [protected]

Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.

Definition at line 818 of file TransportClient.cpp.

References impls_, and lock_.

Referenced by OpenDDS::DCPS::ReplayerImpl::register_for_reader(), and OpenDDS::DCPS::DataWriterImpl::register_for_reader().

00823 {
00824   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00825   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00826        pos != limit;
00827        ++pos) {
00828     (*pos)->register_for_reader(participant, writerid, readerid, locators, listener);
00829   }
00830 }

void OpenDDS::DCPS::TransportClient::register_for_writer ( const RepoId participant,
const RepoId readerid,
const RepoId writerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
) [protected]

Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.

Definition at line 846 of file TransportClient.cpp.

References impls_, and lock_.

Referenced by OpenDDS::DCPS::RecorderImpl::register_for_writer(), and OpenDDS::DCPS::DataReaderImpl::register_for_writer().

00851 {
00852   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00853   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00854        pos != limit;
00855        ++pos) {
00856     (*pos)->register_for_writer(participant, readerid, writerid, locators, listener);
00857   }
00858 }

bool OpenDDS::DCPS::TransportClient::remove_all_msgs (  )  [protected]

Definition at line 1129 of file TransportClient.cpp.

References links_, OpenDDS::DCPS::DataLinkSet::remove_all_msgs(), and repo_id_.

01130 {
01131   return links_.remove_all_msgs(repo_id_);
01132 }

bool OpenDDS::DCPS::TransportClient::remove_sample ( const DataSampleElement sample  )  [protected]

Definition at line 1123 of file TransportClient.cpp.

References links_, and OpenDDS::DCPS::DataLinkSet::remove_sample().

01124 {
01125   return links_.remove_sample(sample);
01126 }

void OpenDDS::DCPS::TransportClient::send ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id = 0 
) [protected]

Definition at line 902 of file TransportClient.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::head(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::DCPS::DataWriterImpl::send_suspended_data(), OpenDDS::DCPS::ReplayerImpl::write(), OpenDDS::DCPS::DataWriterImpl::write(), and OpenDDS::RTPS::Sedp::Writer::write_sample().

00903 {
00904   if (send_list.head() == 0) {
00905     return;
00906   }
00907   ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
00908   send_i(send_list, transaction_id);
00909 }

SendControlStatus OpenDDS::DCPS::TransportClient::send_control ( const DataSampleHeader header,
ACE_Message_Block *  msg 
) [protected]

Reimplemented in OpenDDS::DCPS::DataWriterImpl.

Definition at line 1093 of file TransportClient.cpp.

References get_send_listener(), header, links_, repo_id_, and OpenDDS::DCPS::DataLinkSet::send_control().

Referenced by OpenDDS::DCPS::DataWriterImpl::send_control(), and OpenDDS::RTPS::Sedp::Writer::write_control_msg().

01095 {
01096   TransportSendListener* listener = get_send_listener();
01097 
01098   return links_.send_control(repo_id_, listener, header, msg);
01099 }

SendControlStatus OpenDDS::DCPS::TransportClient::send_control_to ( const DataSampleHeader header,
ACE_Message_Block *  msg,
const RepoId destination 
) [protected]

Definition at line 1102 of file TransportClient.cpp.

References data_link_index_, get_send_listener(), header, OpenDDS::DCPS::DataLinkSet::insert_link(), links_, lock_, repo_id_, OpenDDS::DCPS::DataLinkSet::send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, and OpenDDS::DCPS::DataLinkSet::tsce_allocator().

Referenced by send_w_control().

01105 {
01106   DataLinkSet singular;
01107   {
01108     ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR);
01109     DataLinkIndex::iterator found = data_link_index_.find(destination);
01110 
01111     if (found == data_link_index_.end()) {
01112       msg->release();
01113       return SEND_CONTROL_ERROR;
01114     }
01115 
01116     singular.insert_link(found->second.in());
01117   }
01118   return singular.send_control(repo_id_, get_send_listener(), header, msg,
01119                                &links_.tsce_allocator());
01120 }

void OpenDDS::DCPS::TransportClient::send_final_acks (  )  [protected]

Definition at line 731 of file TransportClient.cpp.

References get_repo_id(), links_, and OpenDDS::DCPS::DataLinkSet::send_final_acks().

Referenced by OpenDDS::DCPS::DataReaderImpl::prepare_to_delete().

00732 {
00733   links_.send_final_acks (get_repo_id());
00734 }

void OpenDDS::DCPS::TransportClient::send_i ( SendStateDataSampleList  send_list,
ACE_UINT64  transaction_id 
) [private]

Definition at line 926 of file TransportClient.cpp.

References OpenDDS::DCPS::TransportSendListener::data_delivered(), OpenDDS::DCPS::DCPS_debug_level, expected_transaction_id_, OpenDDS::DCPS::DataSampleElement::filter_out_, OpenDDS::DCPS::DataSampleElement::filter_per_link_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_next_send_sample(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_ids(), OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, max_transaction_id_seen_, max_transaction_tail_, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataLinkSet::select_links(), OpenDDS::DCPS::DataLinkSet::send_start(), OpenDDS::DCPS::DataLinkSet::send_stop(), OpenDDS::DCPS::SendStateDataSampleList::tail(), OpenDDS::DCPS::DataSampleElement::transaction_id(), VDBG, and VDBG_LVL.

Referenced by send(), and send_w_control().

00927 {
00928   if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
00929     if (transaction_id > max_transaction_id_seen_) {
00930       max_transaction_id_seen_ = transaction_id;
00931       max_transaction_tail_ = send_list.tail();
00932     }
00933     return;
00934   } else /* transaction_id == expected_transaction_id */ {
00935 
00936     DataSampleElement* cur = send_list.head();
00937     if (max_transaction_tail_ == 0) {
00938       //Means no future transaction beat this transaction into send
00939       if (transaction_id != 0)
00940         max_transaction_id_seen_ = expected_transaction_id_;
00941       // Only send this current transaction
00942       max_transaction_tail_ = send_list.tail();
00943     }
00944     DataLinkSet send_links;
00945 
00946     while (true) {
00947       // VERY IMPORTANT NOTE:
00948       //
00949       // We have to be very careful in how we deal with the current
00950       // DataSampleElement.  The issue is that once we have invoked
00951       // data_delivered() on the send_listener_ object, or we have invoked
00952       // send() on the pub_links, we can no longer access the current
00953       // DataSampleElement!Thus, we need to get the next
00954       // DataSampleElement (pointer) from the current element now,
00955       // while it is safe.
00956       DataSampleElement* next_elem;
00957       if (cur != max_transaction_tail_) {
00958         next_elem = cur->get_next_send_sample();
00959       } else {
00960         next_elem = max_transaction_tail_;
00961       }
00962       DataLinkSet_rch pub_links =
00963         (cur->get_num_subs() > 0)
00964         ? links_.select_links(cur->get_sub_ids(), cur->get_num_subs())
00965         : DataLinkSet_rch(&links_, false);
00966 
00967       if (pub_links.is_nil() || pub_links->empty()) {
00968         // NOTE: This is the "local publisher id is not currently
00969         //       associated with any remote subscriber ids" case.
00970 
00971         if (DCPS_debug_level > 4) {
00972           GuidConverter converter(cur->get_pub_id());
00973           ACE_DEBUG((LM_DEBUG,
00974                      ACE_TEXT("(%P|%t) TransportClient::send_i: ")
00975                      ACE_TEXT("no links for publication %C, ")
00976                      ACE_TEXT("not sending element %@ for transaction: %d.\n"),
00977                      OPENDDS_STRING(converter).c_str(),
00978                      cur,
00979                      cur->transaction_id()));
00980         }
00981 
00982         // We tell the send_listener_ that all of the remote subscriber ids
00983         // that wanted the data (all zero of them) have indeed received
00984         // the data.
00985         cur->get_send_listener()->data_delivered(cur);
00986 
00987       } else {
00988         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
00989                   , cur), 5);
00990 
00991   #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00992 
00993         // Content-Filtering adjustment to the pub_links:
00994         // - If the sample should be filtered out of all subscriptions on a given
00995         //   DataLink, then exclude that link from the subset that we'll send to.
00996         // - If the sample should be filtered out of some (or none) of the subs,
00997         //   then record that information in the DataSampleElement so that the
00998         //   header's content_filter_entries_ can be marshaled before it's sent.
00999         if (cur->filter_out_.ptr()) {
01000           DataLinkSet_rch subset;
01001           DataLinkSet::GuardType guard(pub_links->lock());
01002           typedef DataLinkSet::MapType MapType;
01003           MapType& map = pub_links->map();
01004 
01005           for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
01006             size_t n_subs;
01007             GUIDSeq_var ti =
01008               itr->second->target_intersection(cur->get_pub_id(),
01009                                                cur->filter_out_.in(), n_subs);
01010 
01011             if (ti.ptr() == 0 || ti->length() != n_subs) {
01012               if (!subset.in()) {
01013                 subset = new DataLinkSet;
01014               }
01015 
01016               subset->insert_link(itr->second.in());
01017               cur->filter_per_link_[itr->first] = ti._retn();
01018 
01019             } else {
01020               VDBG((LM_DEBUG,
01021                     "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
01022                     itr->second.in()));
01023             }
01024           }
01025 
01026           if (!subset.in()) {
01027             VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
01028             // similar to the "if (pub_links.is_nil())" case above, no links
01029             cur->get_send_listener()->data_delivered(cur);
01030             if (cur != max_transaction_tail_) {
01031               // Move on to the next DataSampleElement to send.
01032               cur = next_elem;
01033               continue;
01034             } else {
01035               break;
01036             }
01037           }
01038 
01039           pub_links = subset;
01040         }
01041 
01042   #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
01043 
01044         // This will do several things, including adding to the membership
01045         // of the send_links set.  Any DataLinks added to the send_links
01046         // set will be also told about the send_start() event.  Those
01047         // DataLinks (in the pub_links set) that are already in the
01048         // send_links set will not be told about the send_start() event
01049         // since they heard about it when they were inserted into the
01050         // send_links set.
01051         send_links.send_start(pub_links.in());
01052         if (cur->get_header().message_id_ != SAMPLE_DATA) {
01053           pub_links->send_control(cur);
01054         } else {
01055           pub_links->send(cur);
01056         }
01057       }
01058       if (cur != max_transaction_tail_) {
01059         // Move on to the next DataSampleElement to send.
01060         cur = next_elem;
01061       } else {
01062         break;
01063       }
01064     }
01065 
01066     // This will inform each DataLink in the set about the stop_send() event.
01067     // It will then clear the send_links_ set.
01068     //
01069     // The reason that the send_links_ set is cleared is because we continually
01070     // reuse the same send_links_ object over and over for each call to this
01071     // send method.
01072     RepoId pub_id(this->repo_id_);
01073     send_links.send_stop(pub_id);
01074     if (transaction_id != 0)
01075       expected_transaction_id_ = max_transaction_id_seen_ + 1;
01076     max_transaction_tail_ = 0;
01077   }
01078 }

bool OpenDDS::DCPS::TransportClient::send_response ( const RepoId peer,
const DataSampleHeader header,
ACE_Message_Block *  payload 
) [protected]

Definition at line 874 of file TransportClient.cpp.

References data_link_index_, OpenDDS::DCPS::DCPS_debug_level, header, OpenDDS::DCPS::DataLinkSet::insert_link(), OPENDDS_STRING, and OpenDDS::DCPS::DataLinkSet::send_response().

00877 {
00878   DataLinkIndex::iterator found = data_link_index_.find(peer);
00879 
00880   if (found == data_link_index_.end()) {
00881     payload->release();
00882 
00883     if (DCPS_debug_level > 4) {
00884       GuidConverter converter(peer);
00885       ACE_DEBUG((LM_DEBUG,
00886                  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
00887                  ACE_TEXT("no link for publication %C, ")
00888                  ACE_TEXT("not sending response.\n"),
00889                  OPENDDS_STRING(converter).c_str()));
00890     }
00891 
00892     return false;
00893   }
00894 
00895   DataLinkSet singular;
00896   singular.insert_link(found->second.in());
00897   singular.send_response(peer, header, payload);
00898   return true;
00899 }

SendControlStatus OpenDDS::DCPS::TransportClient::send_w_control ( SendStateDataSampleList  send_list,
const DataSampleHeader header,
ACE_Message_Block *  msg,
const RepoId destination 
) [protected]

Definition at line 912 of file TransportClient.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::head(), header, OpenDDS::DCPS::SEND_CONTROL_ERROR, send_control_to(), send_i(), and send_transaction_lock_.

Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i().

00916 {
00917   ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
00918                    send_transaction_lock_, SEND_CONTROL_ERROR);
00919   if (send_list.head()) {
00920     send_i(send_list, 0);
00921   }
00922   return send_control_to(header, msg, destination);
00923 }

void OpenDDS::DCPS::TransportClient::stop_associating ( const GUID_t repos,
CORBA::ULong  length 
) [protected]

Definition at line 713 of file TransportClient.cpp.

References lock_, and pending_.

00714 {
00715   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00716 
00717   if (repos == 0 || length == 0) {
00718     return;
00719   } else {
00720     for (CORBA::ULong i = 0; i < length; ++i) {
00721       PendingMap::iterator iter = pending_.find(repos[i]);
00722 
00723       if (iter != pending_.end()) {
00724         iter->second->removed_ = true;
00725       }
00726     }
00727   }
00728 }

void OpenDDS::DCPS::TransportClient::stop_associating (  )  [protected]

Definition at line 700 of file TransportClient.cpp.

References lock_, and pending_.

Referenced by OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), OpenDDS::DCPS::DataReaderImpl::prepare_to_delete(), OpenDDS::DCPS::ReplayerImpl::remove_all_associations(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), OpenDDS::DCPS::DataReaderImpl::remove_all_associations(), OpenDDS::DCPS::ReplayerImpl::remove_associations(), and ~TransportClient().

00701 {
00702   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00703 
00704   PendingMap::iterator iter = pending_.begin();
00705 
00706   while (iter != pending_.end()) {
00707     iter->second->removed_ = true;
00708     ++iter;
00709   }
00710 }

bool OpenDDS::DCPS::TransportClient::swap_bytes (  )  const [inline, protected]

Definition at line 68 of file TransportClient.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), and OpenDDS::DCPS::DataWriterImpl::create_sample_data_message().

00068 { return swap_bytes_; }

virtual void OpenDDS::DCPS::TransportClient::transport_assoc_done ( int  ,
const RepoId  
) [inline, private, virtual]

Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::DataWriterImpl.

Definition at line 135 of file TransportClient.h.

Referenced by use_datalink_i().

00135 {}

void OpenDDS::DCPS::TransportClient::transport_detached ( TransportImpl which  )  [private]

Definition at line 190 of file TransportClient.cpp.

References data_link_index_, OpenDDS::DCPS::DCPS_debug_level, impls_, links_, lock_, OpenDDS::DCPS::DataLinkSet::map(), OPENDDS_STRING, OPENDDS_VECTOR(), pending_, repo_id_, and OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting().

00191 {
00192 
00193   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00194 
00195   // Remove any DataLinks created by the 'which' TransportImpl from our local
00196   // data structures (both links_ and data_link_index_).
00197   for (DataLinkSet::MapType::iterator iter = links_.map().begin();
00198        iter != links_.map().end();) {
00199     TransportImpl_rch impl = iter->second->impl();
00200 
00201     if (impl.in() == which) {
00202       for (DataLinkIndex::iterator it2 = data_link_index_.begin();
00203            it2 != data_link_index_.end();) {
00204         if (it2->second.in() == iter->second.in()) {
00205           data_link_index_.erase(it2++);
00206 
00207         } else {
00208           ++it2;
00209         }
00210       }
00211       if (DCPS_debug_level > 4) {
00212         GuidConverter converter(repo_id_);
00213         ACE_DEBUG((LM_DEBUG,
00214                    ACE_TEXT("(%P|%t) TransportClient::transport_detached: calling remove_listener %C on link[%@]\n"),
00215                    OPENDDS_STRING(converter).c_str(),
00216                    iter->second.in()));
00217       }
00218       iter->second->remove_listener(repo_id_);
00219       links_.map().erase(iter++);
00220 
00221     } else {
00222       ++iter;
00223     }
00224   }
00225 
00226   // Remove the 'which' TransportImpl from the impls_ list
00227   for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin();
00228        it != impls_.end(); ++it) {
00229     if (it->in() == which) {
00230       impls_.erase(it);
00231 
00232       for (PendingMap::iterator it2 = pending_.begin();
00233            it2 != pending_.end(); ++it2) {
00234         which->stop_accepting_or_connecting(this, it2->first);
00235       }
00236 
00237       return;
00238     }
00239   }
00240 }

void OpenDDS::DCPS::TransportClient::unregister_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid 
) [protected]

Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.

Definition at line 833 of file TransportClient.cpp.

References impls_, and lock_.

Referenced by OpenDDS::DCPS::ReplayerImpl::unregister_for_reader(), and OpenDDS::DCPS::DataWriterImpl::unregister_for_reader().

00836 {
00837   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00838   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00839        pos != limit;
00840        ++pos) {
00841     (*pos)->unregister_for_reader(participant, writerid, readerid);
00842   }
00843 }

void OpenDDS::DCPS::TransportClient::unregister_for_writer ( const RepoId participant,
const RepoId readerid,
const RepoId writerid 
) [protected]

Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.

Definition at line 861 of file TransportClient.cpp.

References impls_, and lock_.

Referenced by OpenDDS::DCPS::RecorderImpl::unregister_for_writer(), and OpenDDS::DCPS::DataReaderImpl::unregister_for_writer().

00864 {
00865   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00866   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00867        pos != limit;
00868        ++pos) {
00869     (*pos)->unregister_for_writer(participant, readerid, writerid);
00870   }
00871 }

void OpenDDS::DCPS::TransportClient::use_datalink ( const RepoId remote_id,
const DataLink_rch link 
)

Definition at line 557 of file TransportClient.cpp.

References lock_, and use_datalink_i().

Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::handle_timeout().

00559 {
00560   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00561 
00562   use_datalink_i(remote_id, link, guard);
00563 }

void OpenDDS::DCPS::TransportClient::use_datalink_i ( const RepoId remote_id,
const DataLink_rch link,
Guard guard 
) [private]

Definition at line 566 of file TransportClient.cpp.

References OpenDDS::DCPS::TransportClient::PendingAssoc::active_, add_link(), ASSOC_ACTIVE, ASSOC_OK, OpenDDS::DCPS::TransportClient::PendingAssocTimer::cancel_timer(), OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::TransportClient::PendingAssocTimer::delete_pending_assoc(), OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, pending_, pending_assoc_timer_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::TransportClient::PendingAssoc::removed_, transport_assoc_done(), and VDBG_LVL.

Referenced by associate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), and use_datalink().

00569 {
00570   //try to make a local copy of remote_id to use in calls
00571   //because the reference could be invalidated if the caller
00572   //reference location is deleted (i.e. in stop_accepting_or_connecting
00573   //if use_datalink_i was called from passive_connection)
00574   //Does changing this from a reference to a local affect anything going forward?
00575   RepoId remote_id(remote_id_ref);
00576 
00577   GuidConverter peerId_conv(remote_id);
00578   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00579             "TransportClient(%@) using datalink[%@] from %C\n",
00580             this,
00581             link.in(),
00582             OPENDDS_STRING(peerId_conv).c_str()), 0);
00583 
00584   PendingMap::iterator iter = pending_.find(remote_id);
00585 
00586   if (iter == pending_.end()) {
00587     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00588                         "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
00589                         this,
00590                         link.in(),
00591                         OPENDDS_STRING(peerId_conv).c_str()), 0);
00592     return;
00593   }
00594 
00595   PendingAssoc* pend = iter->second;
00596   const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
00597   bool ok = false;
00598 
00599   if (pend->removed_) { // no-op
00600     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00601                         "TransportClient(%@) using datalink[%@] pending association to remote %C was removed\n",
00602                         this,
00603                         link.in(),
00604                         OPENDDS_STRING(peerId_conv).c_str()), 0);
00605     return;
00606   } else if (link.is_nil()) {
00607 
00608     if (pend->active_ && pend->initiate_connect(this, guard)) {
00609       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00610                           "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n",
00611                           this,
00612                           link.in(),
00613                           OPENDDS_STRING(peerId_conv).c_str()), 0);
00614       return;
00615     }
00616 
00617   } else { // link is ready to use
00618     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00619               "TransportClient(%@) about to add_link[%@] to remote: %C\n",
00620               this,
00621               link.in(),
00622               OPENDDS_STRING(peerId_conv).c_str()), 0);
00623 
00624     add_link(link, remote_id);
00625     ok = true;
00626   }
00627 
00628   // either link is valid or assoc failed, clean up pending object
00629   // for passive side processing
00630   if (!pend->active_) {
00631 
00632     for (size_t i = 0; i < pend->impls_.size(); ++i) {
00633       pend->impls_[i]->stop_accepting_or_connecting(this, pend->data_.remote_id_);
00634     }
00635   }
00636 
00637   pending_.erase(iter);
00638   pend->removed_ = true;
00639 
00640   guard.release();
00641 
00642   pending_assoc_timer_->cancel_timer(this, pend);
00643   pending_assoc_timer_->delete_pending_assoc(pend);
00644 
00645   transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
00646 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::DataWriterImpl_T< MessageType >, OpenDDS::DCPS::RecorderImpl, and OpenDDS::DCPS::ReplayerImpl.

Definition at line 162 of file TransportClient.h.

friend class TransportImpl [friend]

Definition at line 138 of file TransportClient.h.


Member Data Documentation

bool OpenDDS::DCPS::TransportClient::cdr_encapsulation_ [private]

Definition at line 300 of file TransportClient.h.

Referenced by enable_transport_using_config().

TransportLocatorSeq OpenDDS::DCPS::TransportClient::conn_info_ [private]

Definition at line 304 of file TransportClient.h.

Referenced by enable_transport_using_config().

DataLinkIndex OpenDDS::DCPS::TransportClient::data_link_index_ [private]

Definition at line 282 of file TransportClient.h.

Referenced by add_link(), disassociate(), send_control_to(), send_response(), and transport_detached().

bool OpenDDS::DCPS::TransportClient::durable_ [private]

Definition at line 300 of file TransportClient.h.

Referenced by associate(), and enable_transport_using_config().

ACE_UINT64 OpenDDS::DCPS::TransportClient::expected_transaction_id_ [private]

Definition at line 288 of file TransportClient.h.

Referenced by send_i().

ImplsType OpenDDS::DCPS::TransportClient::impls_ [private]

Definition at line 273 of file TransportClient.h.

Referenced by associate(), enable_transport_using_config(), register_for_reader(), register_for_writer(), transport_detached(), unregister_for_reader(), unregister_for_writer(), and ~TransportClient().

DataLinkSet OpenDDS::DCPS::TransportClient::links_ [private]

Definition at line 275 of file TransportClient.h.

Referenced by add_link(), disassociate(), remove_all_msgs(), remove_sample(), send_control(), send_control_to(), send_final_acks(), send_i(), transport_detached(), and ~TransportClient().

DataLinkIndex OpenDDS::DCPS::TransportClient::links_waiting_for_on_deleted_callback_ [private]

Definition at line 276 of file TransportClient.h.

Referenced by disassociate(), on_notification_of_connection_deletion(), and ~TransportClient().

ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::lock_ [private]

Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.

Definition at line 307 of file TransportClient.h.

Referenced by associate(), disassociate(), on_notification_of_connection_deletion(), register_for_reader(), register_for_writer(), send_control_to(), stop_associating(), transport_detached(), unregister_for_reader(), unregister_for_writer(), use_datalink(), and ~TransportClient().

ACE_UINT64 OpenDDS::DCPS::TransportClient::max_transaction_id_seen_ [private]

Definition at line 289 of file TransportClient.h.

Referenced by send_i().

DataSampleElement* OpenDDS::DCPS::TransportClient::max_transaction_tail_ [private]

Definition at line 296 of file TransportClient.h.

Referenced by send_i().

ACE_Time_Value OpenDDS::DCPS::TransportClient::passive_connect_duration_ [private]

Definition at line 302 of file TransportClient.h.

Referenced by enable_transport_using_config().

PendingMap OpenDDS::DCPS::TransportClient::pending_ [private]

Definition at line 274 of file TransportClient.h.

Referenced by associate(), disassociate(), initiate_connect_i(), stop_associating(), transport_detached(), use_datalink_i(), and ~TransportClient().

PendingAssocTimer* OpenDDS::DCPS::TransportClient::pending_assoc_timer_ [private]

Definition at line 269 of file TransportClient.h.

Referenced by associate(), use_datalink_i(), and ~TransportClient().

bool OpenDDS::DCPS::TransportClient::reliable_ [private]

Definition at line 300 of file TransportClient.h.

Referenced by associate(), and enable_transport_using_config().

RepoId OpenDDS::DCPS::TransportClient::repo_id_ [private]

Reimplemented in OpenDDS::RTPS::Sedp::Endpoint.

Definition at line 312 of file TransportClient.h.

Referenced by OpenDDS::DCPS::UdpTransport::accept_datalink(), OpenDDS::DCPS::TcpTransport::accept_datalink(), OpenDDS::DCPS::MulticastTransport::accept_datalink(), add_link(), associate(), OpenDDS::DCPS::TcpTransport::connect_datalink(), OpenDDS::DCPS::RtpsUdpTransport::connect_datalink(), disassociate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), initiate_connect_i(), on_notification_of_connection_deletion(), remove_all_msgs(), send_control(), send_control_to(), transport_detached(), and ~TransportClient().

Reverse_Lock_t OpenDDS::DCPS::TransportClient::reverse_lock_ [private]

Definition at line 310 of file TransportClient.h.

Referenced by associate().

DataLinkSet OpenDDS::DCPS::TransportClient::send_links_ [private]

These are the links being used during the call to send(). This is made a member of the class to minimize allocation/deallocations of the data link set.

Definition at line 280 of file TransportClient.h.

ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::send_transaction_lock_ [private]

Definition at line 287 of file TransportClient.h.

Referenced by send(), and send_w_control().

bool OpenDDS::DCPS::TransportClient::swap_bytes_ [private]

Definition at line 300 of file TransportClient.h.

Referenced by enable_transport_using_config().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:29 2016 for OpenDDS by  doxygen 1.4.7