#include <TransportClient.h>
Inheritance diagram for OpenDDS::DCPS::TransportClient:
DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient class manages the TransportImpl objects that represent the available communication mechanisms and the DataLink objects that represent the currently active communication channels to peers.
Definition at line 49 of file TransportClient.h.
typedef ACE_Guard<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Guard [private]
Definition at line 142 of file TransportClient.h.
typedef ACE_Reverse_Lock<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Reverse_Lock_t [private]
anonymous enum
OpenDDS::DCPS::TransportClient::TransportClient() [protected]
Definition at line 32 of file TransportClient.cpp.
00033 : pending_assoc_timer_(new PendingAssocTimer (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner())) 00034 , expected_transaction_id_(1) 00035 , max_transaction_id_seen_(0) 00036 , max_transaction_tail_(0) 00037 , swap_bytes_(false) 00038 , cdr_encapsulation_(false) 00039 , reliable_(false) 00040 , durable_(false) 00041 , reverse_lock_(lock_) 00042 , repo_id_(GUID_UNKNOWN) 00043 { 00044 }
OpenDDS::DCPS::TransportClient::~TransportClient() [protected, virtual]
Definition at line 46 of file TransportClient.cpp.
References OpenDDS::DCPS::TransportClient::PendingAssocTimer::cancel_timer(), OpenDDS::DCPS::ReactorInterceptor::destroy(), impls_, links_, links_waiting_for_on_deleted_callback_, lock_, OpenDDS::DCPS::DataLinkSet::map(), OPENDDS_STRING, OPENDDS_VECTOR(), pending_, pending_assoc_timer_, repo_id_, stop_associating(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::DCPS::ReactorInterceptor::wait().
00047 { 00048 if (Transport_debug_level > 5) { 00049 GuidConverter converter(repo_id_); 00050 ACE_DEBUG((LM_DEBUG, 00051 ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"), 00052 OPENDDS_STRING(converter).c_str())); 00053 } 00054 00055 this->stop_associating(); 00056 00057 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00058 00059 for (DataLinkIndex::iterator iter = links_waiting_for_on_deleted_callback_.begin(); 00060 iter != links_waiting_for_on_deleted_callback_.end(); ++iter) { 00061 if (Transport_debug_level > 5) { 00062 GuidConverter converter(repo_id_); 00063 ACE_DEBUG((LM_DEBUG, 00064 ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C from link waiting for callback\n"), 00065 this, 00066 OPENDDS_STRING(converter).c_str())); 00067 } 00068 iter->second->remove_listener(repo_id_); 00069 } 00070 00071 for (DataLinkSet::MapType::iterator iter = links_.map().begin(); 00072 iter != links_.map().end(); ++iter) { 00073 if (Transport_debug_level > 5) { 00074 GuidConverter converter(repo_id_); 00075 ACE_DEBUG((LM_DEBUG, 00076 ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C\n"), 00077 this, 00078 OPENDDS_STRING(converter).c_str())); 00079 } 00080 iter->second->remove_listener(repo_id_); 00081 } 00082 00083 for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) { 00084 for (size_t i = 0; i < impls_.size(); ++i) { 00085 impls_[i]->stop_accepting_or_connecting(this, it->second->data_.remote_id_); 00086 } 00087 00088 pending_assoc_timer_->cancel_timer(this, it->second); 00089 } 00090 00091 pending_assoc_timer_->wait(); 00092 pending_assoc_timer_->destroy(); 00093 00094 for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin(); 00095 it != impls_.end(); ++it) { 00096 00097 (*it)->detach_client(this); 00098 } 00099 }
void OpenDDS::DCPS::TransportClient::add_link(const DataLink_rch& link, const RepoId& peer) [protected, virtual]
Reimplemented in OpenDDS::DCPS::DataReaderImpl.
Definition at line 649 of file TransportClient.cpp.
References data_link_index_, get_receive_listener(), get_send_listener(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLinkSet::insert_link(), links_, and repo_id_.
Referenced by OpenDDS::DCPS::DataReaderImpl::add_link(), and use_datalink_i().
00650 { 00651 links_.insert_link(link.in()); 00652 data_link_index_[peer] = link; 00653 00654 TransportReceiveListener* trl = get_receive_listener(); 00655 00656 if (trl) { 00657 link->make_reservation(peer, repo_id_, trl); 00658 00659 } else { 00660 link->make_reservation(peer, repo_id_, get_send_listener()); 00661 } 00662 }
bool OpenDDS::DCPS::TransportClient::associate(const AssociationData& peer, bool active) [protected]
Definition at line 243 of file TransportClient.cpp.
References OpenDDS::DCPS::TransportClient::PendingAssoc::active_, OpenDDS::DCPS::TransportClient::PendingAssoc::attribs_, OpenDDS::DCPS::TransportClient::PendingAssoc::blob_index_, OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::DCPS_debug_level, durable_, get_priority_value(), get_repo_id(), OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, impls_, OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::link_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, lock_, OPENDDS_STRING, pending_, pending_assoc_timer_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, reliable_, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, repo_id_, reverse_lock_, OpenDDS::DCPS::TransportClient::PendingAssocTimer::schedule_timer(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, use_datalink_i(), and VDBG_LVL.
Referenced by OpenDDS::DCPS::DataWriterImpl::add_association(), OpenDDS::DCPS::DataReaderImpl::add_association(), OpenDDS::RTPS::Sedp::Reader::assoc(), and OpenDDS::RTPS::Sedp::Writer::assoc().
00244 { 00245 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false); 00246 00247 repo_id_ = get_repo_id(); 00248 00249 if (impls_.empty()) { 00250 if (DCPS_debug_level) { 00251 GuidConverter writer_converter(this->repo_id_); 00252 GuidConverter reader_converter(data.remote_id_); 00253 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ") 00254 ACE_TEXT("local %C remote %C no available impls\n"), 00255 OPENDDS_STRING(writer_converter).c_str(), 00256 OPENDDS_STRING(reader_converter).c_str())); 00257 } 00258 return false; 00259 } 00260 00261 bool all_impls_shut_down = true; 00262 for (size_t i = 0; i < impls_.size(); ++i) { 00263 if (!impls_.at(i)->is_shut_down()) { 00264 all_impls_shut_down = false; 00265 break; 00266 } 00267 } 00268 00269 if (all_impls_shut_down) { 00270 if (DCPS_debug_level) { 00271 GuidConverter writer_converter(this->repo_id_); 00272 GuidConverter reader_converter(data.remote_id_); 00273 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ") 00274 ACE_TEXT("local %C remote %C all available impls previously shutdown\n"), 00275 OPENDDS_STRING(writer_converter).c_str(), 00276 OPENDDS_STRING(reader_converter).c_str())); 00277 } 00278 return false; 00279 } 00280 00281 PendingMap::iterator iter = pending_.find(data.remote_id_); 00282 00283 if (iter == pending_.end()) { 00284 RepoId remote_copy(data.remote_id_); 00285 iter = pending_.insert(std::make_pair(remote_copy, new PendingAssoc())).first; 00286 00287 GuidConverter tc_assoc(this->repo_id_); 00288 GuidConverter remote_new(data.remote_id_); 00289 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc " 00290 "between %C and remote %C\n", 00291 OPENDDS_STRING(tc_assoc).c_str(), 00292 OPENDDS_STRING(remote_new).c_str()), 0); 00293 } else { 00294 if (iter->second->removed_) { 00295 iter->second->removed_ = false; 00296 GuidConverter tc_assoc(this->repo_id_); 00297 GuidConverter remote_new(data.remote_id_); 00298 VDBG_LVL((LM_DEBUG, "(%P|%t) 
TransportClient::associate found existing PendingAssoc " 00299 "between %C and remote %C, set removed to false to continue using this pending association\n", 00300 OPENDDS_STRING(tc_assoc).c_str(), 00301 OPENDDS_STRING(remote_new).c_str()), 0); 00302 } else { 00303 ACE_ERROR((LM_ERROR, 00304 ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ") 00305 ACE_TEXT("already associating with remote.\n"))); 00306 00307 return false; 00308 } 00309 } 00310 00311 PendingAssoc& pend = *(iter->second); 00312 pend.active_ = active; 00313 pend.impls_.clear(); 00314 pend.blob_index_ = 0; 00315 pend.data_ = data; 00316 pend.attribs_.local_id_ = repo_id_; 00317 pend.attribs_.priority_ = get_priority_value(data); 00318 pend.attribs_.local_reliable_ = reliable_; 00319 pend.attribs_.local_durable_ = durable_; 00320 00321 if (active) { 00322 pend.impls_.reserve(impls_.size()); 00323 std::reverse_copy(impls_.begin(), impls_.end(), 00324 std::back_inserter(pend.impls_)); 00325 00326 pend.initiate_connect(this, guard); 00327 00328 //Revisit if this should be used instead of always returning true. 00329 //return pend.initiate_connect(this, guard); 00330 return true; 00331 00332 } else { // passive 00333 00334 // call accept_datalink for each impl / blob pair of the same type 00335 for (size_t i = 0; i < impls_.size(); ++i) { 00336 pend.impls_.push_back(impls_[i]); 00337 const OPENDDS_STRING type = impls_[i]->transport_type(); 00338 00339 for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) { 00340 if (data.remote_data_[j].transport_type.in() == type) { 00341 const TransportImpl::RemoteTransport remote = { 00342 data.remote_id_, data.remote_data_[j].data, 00343 data.publication_transport_priority_, 00344 data.remote_reliable_, data.remote_durable_}; 00345 00346 TransportImpl::AcceptConnectResult res; 00347 { 00348 // This thread acquired lock_ at the beginning of this method. Calling accept_datalink might require getting the lock for the transport's reactor. 
00349 // If the current thread is not an event handler for the transport's reactor, e.g., the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock. 00350 // Event handlers in the transport reactor may call passive_connection which calls use_datalink which acquires lock_. The locking order in this case is transport reactor lock -> lock_. 00351 // To avoid deadlock, we must reverse the lock. 00352 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false); 00353 res = impls_[i]->accept_datalink(remote, pend.attribs_, this); 00354 } 00355 00356 //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink 00357 PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_); 00358 00359 if (iter_after_accept == pending_.end()) { 00360 //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an 00361 //active side connection and completed, thus pend was removed from pending_. Can return true. 00362 return true; 00363 } 00364 00365 if (res.success_ && !res.link_.is_nil()) { 00366 00367 use_datalink_i(data.remote_id_, res.link_, guard); 00368 00369 return true; 00370 } 00371 } 00372 } 00373 00374 //pend.impls_.push_back(impls_[i]); 00375 } 00376 00377 pending_assoc_timer_->schedule_timer(this, iter->second); 00378 } 00379 00380 return true; 00381 }
bool OpenDDS::DCPS::TransportClient::cdr_encapsulation() const [inline, protected]
Definition at line 69 of file TransportClient.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), and OpenDDS::DCPS::DataWriterImpl::create_sample_data_message().
00069 { return cdr_encapsulation_; }
virtual bool OpenDDS::DCPS::TransportClient::check_transport_qos(const TransportInst& inst) [private, pure virtual]
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by enable_transport_using_config().
const TransportLocatorSeq& OpenDDS::DCPS::TransportClient::connection_info() const [inline, protected]
Definition at line 70 of file TransportClient.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().
00070 { return conn_info_; }
void OpenDDS::DCPS::TransportClient::disassociate(const RepoId& peerId) [protected]
Definition at line 737 of file TransportClient.cpp.
References data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), links_, links_waiting_for_on_deleted_callback_, lock_, OPENDDS_STRING, pending_, OpenDDS::DCPS::DataLinkSet::remove_link(), repo_id_, and VDBG_LVL.
Referenced by OpenDDS::RTPS::Sedp::disassociate().
00738 { 00739 GuidConverter peerId_conv(peerId); 00740 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate " 00741 "TransportClient(%@) disassociating from %C\n", 00742 this, 00743 OPENDDS_STRING(peerId_conv).c_str()), 5); 00744 00745 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00746 00747 const PendingMap::iterator iter = pending_.find(peerId); 00748 00749 if (iter != pending_.end()) { 00750 iter->second->removed_ = true; 00751 return; 00752 } 00753 00754 const DataLinkIndex::iterator found = data_link_index_.find(peerId); 00755 00756 if (found == data_link_index_.end()) { 00757 if (DCPS_debug_level > 4) { 00758 const GuidConverter converter(peerId); 00759 ACE_DEBUG((LM_DEBUG, 00760 ACE_TEXT("(%P|%t) TransportClient::disassociate: ") 00761 ACE_TEXT("no link for remote peer %C\n"), 00762 OPENDDS_STRING(converter).c_str())); 00763 } 00764 00765 return; 00766 } 00767 00768 const DataLink_rch link = found->second; 00769 00770 //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock 00771 //otherwise it could be removed in transport_detached() 00772 data_link_index_.erase(found); 00773 DataLinkSetMap released; 00774 00775 if (DCPS_debug_level > 4) { 00776 ACE_DEBUG((LM_DEBUG, 00777 ACE_TEXT("(%P|%t) TransportClient::disassociate: ") 00778 ACE_TEXT("about to release_reservations for link[%@] \n"), 00779 link.in())); 00780 } 00781 00782 link->release_reservations(peerId, repo_id_, released); 00783 00784 if (!released.empty()) { 00785 00786 if (DCPS_debug_level > 4) { 00787 ACE_DEBUG((LM_DEBUG, 00788 ACE_TEXT("(%P|%t) TransportClient::disassociate: ") 00789 ACE_TEXT("about to remove_link[%@] from links_\n"), 00790 link.in())); 00791 } 00792 links_.remove_link(link); 00793 00794 if (link->issues_on_deleted_callback()) { 00795 if (DCPS_debug_level > 4) { 00796 GuidConverter converter(repo_id_); 00797 ACE_DEBUG((LM_DEBUG, 00798 ACE_TEXT("(%P|%t) TransportClient::disassociate: wait for connection deleted callback for 
%C on link[%@]\n"), 00799 OPENDDS_STRING(converter).c_str(), 00800 link.in())); 00801 } 00802 links_waiting_for_on_deleted_callback_[peerId] = link; 00803 } else { 00804 if (DCPS_debug_level > 4) { 00805 GuidConverter converter(repo_id_); 00806 ACE_DEBUG((LM_DEBUG, 00807 ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"), 00808 OPENDDS_STRING(converter).c_str(), 00809 link.in())); 00810 } 00811 // Datalink is no longer used for any remote peer by this TransportClient 00812 link->remove_listener(repo_id_); 00813 } 00814 } 00815 }
virtual DDS::DomainId_t OpenDDS::DCPS::TransportClient::domain_id() const [private, pure virtual]
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by enable_transport().
void OpenDDS::DCPS::TransportClient::enable_transport(bool reliable, bool durable) [protected]
Definition at line 102 of file TransportClient.cpp.
References OpenDDS::DCPS::TransportRegistry::DEFAULT_CONFIG_NAME, domain_id(), enable_transport_using_config(), OpenDDS::DCPS::TransportRegistry::instance(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
Referenced by OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().
00103 { 00104 // Search for a TransportConfig to use: 00105 TransportConfig_rch tc; 00106 00107 // 1. If this object is an Entity, check if a TransportConfig has been 00108 // bound either directly to this entity or to a parent entity. 00109 for (const EntityImpl* ent = dynamic_cast<const EntityImpl*>(this); 00110 ent && tc.is_nil(); ent = ent->parent()) { 00111 tc = ent->transport_config(); 00112 } 00113 00114 if (tc.is_nil()) { 00115 TransportRegistry* const reg = TransportRegistry::instance(); 00116 // 2. Check for a TransportConfig that is the default for this Domain. 00117 tc = reg->domain_default_config(domain_id()); 00118 00119 if (tc.is_nil()) { 00120 // 3. Use the global_config if one has been set. 00121 tc = reg->global_config(); 00122 00123 if (!tc.is_nil() && tc->instances_.empty() 00124 && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) { 00125 // 4. Set the "fallback option" if the global_config is empty. 00126 // (only applies if the user hasn't changed the global config) 00127 tc = reg->fix_empty_default(); 00128 } 00129 } 00130 } 00131 00132 if (tc.is_nil()) { 00133 ACE_ERROR((LM_ERROR, 00134 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ") 00135 ACE_TEXT("No TransportConfig found.\n"))); 00136 throw Transport::NotConfigured(); 00137 } 00138 00139 enable_transport_using_config(reliable, durable, tc); 00140 }
void OpenDDS::DCPS::TransportClient::enable_transport_using_config(bool reliable, bool durable, const TransportConfig_rch& tc) [protected]
Definition at line 143 of file TransportClient.cpp.
References cdr_encapsulation_, check_transport_qos(), conn_info_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION, durable_, impls_, OpenDDS::DCPS::RcHandle< T >::is_nil(), passive_connect_duration_, reliable_, and swap_bytes_.
Referenced by enable_transport().
00145 { 00146 swap_bytes_ = tc->swap_bytes_; 00147 cdr_encapsulation_ = false; 00148 reliable_ = reliable; 00149 durable_ = durable; 00150 unsigned long duration = tc->passive_connect_duration_; 00151 if (duration == 0) { 00152 duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION; 00153 if (DCPS_debug_level) { 00154 ACE_DEBUG((LM_WARNING, 00155 ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ") 00156 ACE_TEXT("passive_connect_duration_ configured as 0, changing to ") 00157 ACE_TEXT("default value\n"))); 00158 } 00159 } 00160 passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000); 00161 00162 const size_t n = tc->instances_.size(); 00163 00164 for (size_t i = 0; i < n; ++i) { 00165 TransportInst_rch inst = tc->instances_[i]; 00166 00167 if (check_transport_qos(*inst.in())) { 00168 TransportImpl_rch impl = inst->impl(); 00169 00170 if (!impl.is_nil()) { 00171 impl->attach_client(this); 00172 impls_.push_back(impl); 00173 const CORBA::ULong len = conn_info_.length(); 00174 conn_info_.length(len + 1); 00175 impl->connection_info(conn_info_[len]); 00176 cdr_encapsulation_ |= inst->requires_cdr(); 00177 } 00178 } 00179 } 00180 00181 if (impls_.empty()) { 00182 ACE_ERROR((LM_ERROR, 00183 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ") 00184 ACE_TEXT("No TransportImpl could be created.\n"))); 00185 throw Transport::NotConfigured(); 00186 } 00187 }
virtual Priority OpenDDS::DCPS::TransportClient::get_priority_value(const AssociationData& data) const [private, pure virtual]
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by associate().
TransportReceiveListener* OpenDDS::DCPS::TransportClient::get_receive_listener() [private]
virtual const RepoId& OpenDDS::DCPS::TransportClient::get_repo_id() const [private, pure virtual]
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by associate(), and send_final_acks().
TransportSendListener* OpenDDS::DCPS::TransportClient::get_send_listener() [private]
Definition at line 1081 of file TransportClient.cpp.
Referenced by add_link(), send_control(), and send_control_to().
bool OpenDDS::DCPS::TransportClient::initiate_connect_i(TransportImpl::AcceptConnectResult& result, const TransportImpl_rch impl, const TransportImpl::RemoteTransport& remote, const TransportImpl::ConnectionAttribs& attribs_, Guard& guard) [private]
Definition at line 395 of file TransportClient.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, pending_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, repo_id_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect().
00400 { 00401 if (!guard.locked()) { 00402 //don't own the lock_ so can't release it...shouldn't happen 00403 GuidConverter local(repo_id_); 00404 GuidConverter remote_conv(remote.repo_id_); 00405 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00406 "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n", 00407 OPENDDS_STRING(local).c_str(), 00408 OPENDDS_STRING(remote_conv).c_str()), 0); 00409 return false; 00410 } 00411 00412 { 00413 //can't call connect while holding lock due to possible reactor deadlock 00414 guard.release(); 00415 GuidConverter local(repo_id_); 00416 GuidConverter remote_conv(remote.repo_id_); 00417 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00418 "attempt to connect_datalink between local %C and remote %C\n", 00419 OPENDDS_STRING(local).c_str(), 00420 OPENDDS_STRING(remote_conv).c_str()), 0); 00421 result = impl->connect_datalink(remote, attribs_, this); 00422 if (!result.success_) { 00423 if (DCPS_debug_level) { 00424 GuidConverter writer_converter(repo_id_); 00425 GuidConverter reader_converter(remote.repo_id_); 00426 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ") 00427 ACE_TEXT("connect_datalink between local %C remote %C not successful\n"), 00428 OPENDDS_STRING(writer_converter).c_str(), 00429 OPENDDS_STRING(reader_converter).c_str())); 00430 } 00431 return false; 00432 } 00433 guard.acquire(); 00434 } 00435 00436 //Check to make sure the pending assoc still exists in the map and hasn't been slated for removal 00437 //figure out how to respond to these possible results that occurred while lock was released to connect 00438 PendingMap::iterator iter = pending_.find(remote.repo_id_); 00439 00440 if (iter == pending_.end()) { 00441 GuidConverter local(repo_id_); 00442 GuidConverter remote_conv(remote.repo_id_); 00443 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00444 "cannot find pending association 
after connecting datalink between local %C and remote %C\n", 00445 OPENDDS_STRING(local).c_str(), 00446 OPENDDS_STRING(remote_conv).c_str()), 0); 00447 return false; 00448 //log some sort of error message... 00449 //PendingAssoc's are only erased from pending_ in use_datalink_i after 00450 00451 } else { 00452 if (iter->second->removed_) { 00453 GuidConverter local(repo_id_); 00454 GuidConverter remote_conv(remote.repo_id_); 00455 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00456 "pending association marked for removal already, after connecting datalink between local %C and remote %C\n", 00457 OPENDDS_STRING(local).c_str(), 00458 OPENDDS_STRING(remote_conv).c_str()), 0); 00459 //this occurs if the transport client was told to disassociate while connecting 00460 //disassociate cleans up everything except this local AcceptConnectResult whose destructor 00461 //should take care of it because link has not been shifted into links_ by use_datalink_i 00462 return false; 00463 00464 } 00465 00466 } 00467 GuidConverter local(repo_id_); 00468 GuidConverter remote_conv(remote.repo_id_); 00469 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00470 "connection between local %C and remote %C initiation successful\n", 00471 OPENDDS_STRING(local).c_str(), 00472 OPENDDS_STRING(remote_conv).c_str()), 0); 00473 return true; 00474 }
void OpenDDS::DCPS::TransportClient::on_notification_of_connection_deletion(const RepoId& peerId) [protected]
Definition at line 665 of file TransportClient.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, links_waiting_for_on_deleted_callback_, lock_, OPENDDS_STRING, repo_id_, and VDBG_LVL.
Referenced by OpenDDS::DCPS::DataWriterImpl::notify_connection_deleted(), and OpenDDS::DCPS::DataReaderImpl::notify_connection_deleted().
00666 { 00667 DBG_ENTRY_LVL("TransportClient","on_notification_of_connection_deletion",6); 00668 00669 GuidConverter peerId_conv(peerId); 00670 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::on_notification_of_connection_deletion " 00671 "TransportClient(%@) connection to %C deleted\n", 00672 this, 00673 OPENDDS_STRING(peerId_conv).c_str()), 5); 00674 00675 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00676 00677 const DataLinkIndex::iterator found = links_waiting_for_on_deleted_callback_.find(peerId); 00678 00679 if (found == links_waiting_for_on_deleted_callback_.end()) { 00680 if (DCPS_debug_level > 4) { 00681 const GuidConverter converter(peerId); 00682 ACE_DEBUG((LM_DEBUG, 00683 ACE_TEXT("(%P|%t) TransportClient::on_notification_of_connection_deletion: ") 00684 ACE_TEXT("no link for remote peer %C\n"), 00685 OPENDDS_STRING(converter).c_str())); 00686 } 00687 00688 return; 00689 } 00690 00691 const DataLink_rch link = found->second; 00692 00693 //now that an _rch is created for the link, remove the iterator from links_waiting_for_on_deleted_callback_ while still holding lock 00694 links_waiting_for_on_deleted_callback_.erase(found); 00695 00696 link->remove_listener(repo_id_); 00697 }
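typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP(RepoId, PendingAssoc*, GUID_tKeyLessThan) [private] (typedef name not rendered; presumably PendingMap, which the listings on this page use as a RepoId-to-PendingAssoc* map. Confirm against TransportClient.h.)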
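typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP(RepoId, DataLink_rch, GUID_tKeyLessThan) [private] (typedef name not rendered; presumably DataLinkIndex, which the listings on this page use as a RepoId-to-DataLink_rch map. Confirm against TransportClient.h.)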
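typedef OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR(TransportImpl_rch) [private] (typedef name not rendered; presumably ImplsType, which the listings on this page use to iterate impls_. Confirm against TransportClient.h.)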
Referenced by OpenDDS::DCPS::RecorderImpl::signal_liveliness(), transport_detached(), and ~TransportClient().
void OpenDDS::DCPS::TransportClient::register_for_reader(const RepoId& participant, const RepoId& writerid, const RepoId& readerid, const TransportLocatorSeq& locators, OpenDDS::DCPS::DiscoveryListener* listener) [protected]
Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 818 of file TransportClient.cpp.
Referenced by OpenDDS::DCPS::ReplayerImpl::register_for_reader(), and OpenDDS::DCPS::DataWriterImpl::register_for_reader().
00823 { 00824 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00825 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00826 pos != limit; 00827 ++pos) { 00828 (*pos)->register_for_reader(participant, writerid, readerid, locators, listener); 00829 } 00830 }
void OpenDDS::DCPS::TransportClient::register_for_writer(const RepoId& participant, const RepoId& readerid, const RepoId& writerid, const TransportLocatorSeq& locators, DiscoveryListener* listener) [protected]
Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.
Definition at line 846 of file TransportClient.cpp.
Referenced by OpenDDS::DCPS::RecorderImpl::register_for_writer(), and OpenDDS::DCPS::DataReaderImpl::register_for_writer().
00851 { 00852 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00853 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00854 pos != limit; 00855 ++pos) { 00856 (*pos)->register_for_writer(participant, readerid, writerid, locators, listener); 00857 } 00858 }
bool OpenDDS::DCPS::TransportClient::remove_all_msgs() [protected]
Definition at line 1129 of file TransportClient.cpp.
References links_, OpenDDS::DCPS::DataLinkSet::remove_all_msgs(), and repo_id_.
bool OpenDDS::DCPS::TransportClient::remove_sample(const DataSampleElement* sample) [protected]
Definition at line 1123 of file TransportClient.cpp.
References links_, and OpenDDS::DCPS::DataLinkSet::remove_sample().
01124 { 01125 return links_.remove_sample(sample); 01126 }
void OpenDDS::DCPS::TransportClient::send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0) [protected]
Definition at line 902 of file TransportClient.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::head(), send_i(), and send_transaction_lock_.
Referenced by OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::DCPS::DataWriterImpl::send_suspended_data(), OpenDDS::DCPS::ReplayerImpl::write(), OpenDDS::DCPS::DataWriterImpl::write(), and OpenDDS::RTPS::Sedp::Writer::write_sample().
00903 { 00904 if (send_list.head() == 0) { 00905 return; 00906 } 00907 ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_); 00908 send_i(send_list, transaction_id); 00909 }
SendControlStatus OpenDDS::DCPS::TransportClient::send_control(const DataSampleHeader& header, ACE_Message_Block* msg) [protected]
Reimplemented in OpenDDS::DCPS::DataWriterImpl.
Definition at line 1093 of file TransportClient.cpp.
References get_send_listener(), header, links_, repo_id_, and OpenDDS::DCPS::DataLinkSet::send_control().
Referenced by OpenDDS::DCPS::DataWriterImpl::send_control(), and OpenDDS::RTPS::Sedp::Writer::write_control_msg().
01095 { 01096 TransportSendListener* listener = get_send_listener(); 01097 01098 return links_.send_control(repo_id_, listener, header, msg); 01099 }
SendControlStatus OpenDDS::DCPS::TransportClient::send_control_to(const DataSampleHeader& header, ACE_Message_Block* msg, const RepoId& destination) [protected]
Definition at line 1102 of file TransportClient.cpp.
References data_link_index_, get_send_listener(), header, OpenDDS::DCPS::DataLinkSet::insert_link(), links_, lock_, repo_id_, OpenDDS::DCPS::DataLinkSet::send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, and OpenDDS::DCPS::DataLinkSet::tsce_allocator().
Referenced by send_w_control().
01105 { 01106 DataLinkSet singular; 01107 { 01108 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR); 01109 DataLinkIndex::iterator found = data_link_index_.find(destination); 01110 01111 if (found == data_link_index_.end()) { 01112 msg->release(); 01113 return SEND_CONTROL_ERROR; 01114 } 01115 01116 singular.insert_link(found->second.in()); 01117 } 01118 return singular.send_control(repo_id_, get_send_listener(), header, msg, 01119 &links_.tsce_allocator()); 01120 }
void OpenDDS::DCPS::TransportClient::send_final_acks() [protected]
Definition at line 731 of file TransportClient.cpp.
References get_repo_id(), links_, and OpenDDS::DCPS::DataLinkSet::send_final_acks().
Referenced by OpenDDS::DCPS::DataReaderImpl::prepare_to_delete().
00732 { 00733 links_.send_final_acks (get_repo_id()); 00734 }
void OpenDDS::DCPS::TransportClient::send_i | ( | SendStateDataSampleList | send_list, | |
ACE_UINT64 | transaction_id | |||
) | [private] |
Definition at line 926 of file TransportClient.cpp.
References OpenDDS::DCPS::TransportSendListener::data_delivered(), OpenDDS::DCPS::DCPS_debug_level, expected_transaction_id_, OpenDDS::DCPS::DataSampleElement::filter_out_, OpenDDS::DCPS::DataSampleElement::filter_per_link_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_next_send_sample(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_ids(), OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, max_transaction_id_seen_, max_transaction_tail_, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataLinkSet::select_links(), OpenDDS::DCPS::DataLinkSet::send_start(), OpenDDS::DCPS::DataLinkSet::send_stop(), OpenDDS::DCPS::SendStateDataSampleList::tail(), OpenDDS::DCPS::DataSampleElement::transaction_id(), VDBG, and VDBG_LVL.
Referenced by send(), and send_w_control().
// send_i(send_list, transaction_id) -- private worker behind send() and
// send_w_control(); both visible callers invoke it while holding
// send_transaction_lock_.
//
// Transaction ordering:
//  - A non-zero transaction_id that differs from expected_transaction_id_ is
//    not sent now. If it is the largest id seen so far, the id and
//    send_list.tail() are remembered in max_transaction_id_seen_ /
//    max_transaction_tail_, and the function returns.
//  - Otherwise (transaction_id is 0 or equals expected_transaction_id_; note
//    the inline "else" comment mentions only the second case) elements are
//    processed from send_list.head() up to and including
//    max_transaction_tail_, which is this list's own tail unless a later
//    transaction recorded one earlier.
//
// Per element:
//  - Links are chosen with links_.select_links() when the element names
//    subscribers (get_num_subs() > 0), otherwise links_ itself is used.
//  - With no usable links, the element's send listener gets data_delivered().
//  - Unless OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE is defined, links whose
//    target_intersection() covers all n_subs are dropped from the set; if
//    every link is dropped the element is reported via data_delivered().
//  - Remaining links get send_start() (through the local send_links set) and
//    then send_control() for non-SAMPLE_DATA headers or send() otherwise.
//  - The next element pointer is fetched BEFORE any of the above, because the
//    current element must not be touched afterwards.
//
// On completion send_links.send_stop(repo_id_) is called; for a non-zero
// transaction_id, expected_transaction_id_ becomes
// max_transaction_id_seen_ + 1; max_transaction_tail_ is reset to 0.
//
// NOTE(review): the debug message prints cur->transaction_id() with "%d";
// if that accessor returns ACE_UINT64 (as the transaction_id parameter
// suggests) the ACE "%Q" specifier would be the matching one -- confirm.
// NOTE(review): the closing comments talk about a reused "send_links_"
// object, but the code shown here uses a local "send_links" -- the comments
// look stale; confirm.
00927 { 00928 if (transaction_id != 0 && transaction_id != expected_transaction_id_) { 00929 if (transaction_id > max_transaction_id_seen_) { 00930 max_transaction_id_seen_ = transaction_id; 00931 max_transaction_tail_ = send_list.tail(); 00932 } 00933 return; 00934 } else /* transaction_id == expected_transaction_id */ { 00935 00936 DataSampleElement* cur = send_list.head(); 00937 if (max_transaction_tail_ == 0) { 00938 //Means no future transaction beat this transaction into send 00939 if (transaction_id != 0) 00940 max_transaction_id_seen_ = expected_transaction_id_; 00941 // Only send this current transaction 00942 max_transaction_tail_ = send_list.tail(); 00943 } 00944 DataLinkSet send_links; 00945 00946 while (true) { 00947 // VERY IMPORTANT NOTE: 00948 // 00949 // We have to be very careful in how we deal with the current 00950 // DataSampleElement. The issue is that once we have invoked 00951 // data_delivered() on the send_listener_ object, or we have invoked 00952 // send() on the pub_links, we can no longer access the current 00953 // DataSampleElement! Thus, we need to get the next 00954 // DataSampleElement (pointer) from the current element now, 00955 // while it is safe. 00956 DataSampleElement* next_elem; 00957 if (cur != max_transaction_tail_) { 00958 next_elem = cur->get_next_send_sample(); 00959 } else { 00960 next_elem = max_transaction_tail_; 00961 } 00962 DataLinkSet_rch pub_links = 00963 (cur->get_num_subs() > 0) 00964 ? links_.select_links(cur->get_sub_ids(), cur->get_num_subs()) 00965 : DataLinkSet_rch(&links_, false); 00966 00967 if (pub_links.is_nil() || pub_links->empty()) { 00968 // NOTE: This is the "local publisher id is not currently 00969 // associated with any remote subscriber ids" case. 
00970 00971 if (DCPS_debug_level > 4) { 00972 GuidConverter converter(cur->get_pub_id()); 00973 ACE_DEBUG((LM_DEBUG, 00974 ACE_TEXT("(%P|%t) TransportClient::send_i: ") 00975 ACE_TEXT("no links for publication %C, ") 00976 ACE_TEXT("not sending element %@ for transaction: %d.\n"), 00977 OPENDDS_STRING(converter).c_str(), 00978 cur, 00979 cur->transaction_id())); 00980 } 00981 00982 // We tell the send_listener_ that all of the remote subscriber ids 00983 // that wanted the data (all zero of them) have indeed received 00984 // the data. 00985 cur->get_send_listener()->data_delivered(cur); 00986 00987 } else { 00988 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n" 00989 , cur), 5); 00990 00991 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00992 00993 // Content-Filtering adjustment to the pub_links: 00994 // - If the sample should be filtered out of all subscriptions on a given 00995 // DataLink, then exclude that link from the subset that we'll send to. 00996 // - If the sample should be filtered out of some (or none) of the subs, 00997 // then record that information in the DataSampleElement so that the 00998 // header's content_filter_entries_ can be marshaled before it's sent. 
00999 if (cur->filter_out_.ptr()) { 01000 DataLinkSet_rch subset; 01001 DataLinkSet::GuardType guard(pub_links->lock()); 01002 typedef DataLinkSet::MapType MapType; 01003 MapType& map = pub_links->map(); 01004 01005 for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) { 01006 size_t n_subs; 01007 GUIDSeq_var ti = 01008 itr->second->target_intersection(cur->get_pub_id(), 01009 cur->filter_out_.in(), n_subs); 01010 01011 if (ti.ptr() == 0 || ti->length() != n_subs) { 01012 if (!subset.in()) { 01013 subset = new DataLinkSet; 01014 } 01015 01016 subset->insert_link(itr->second.in()); 01017 cur->filter_per_link_[itr->first] = ti._retn(); 01018 01019 } else { 01020 VDBG((LM_DEBUG, 01021 "(%P|%t) DBG: DataLink completely filtered-out %@.\n", 01022 itr->second.in())); 01023 } 01024 } 01025 01026 if (!subset.in()) { 01027 VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n")); 01028 // similar to the "if (pub_links.is_nil())" case above, no links 01029 cur->get_send_listener()->data_delivered(cur); 01030 if (cur != max_transaction_tail_) { 01031 // Move on to the next DataSampleElement to send. 01032 cur = next_elem; 01033 continue; 01034 } else { 01035 break; 01036 } 01037 } 01038 01039 pub_links = subset; 01040 } 01041 01042 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 01043 01044 // This will do several things, including adding to the membership 01045 // of the send_links set. Any DataLinks added to the send_links 01046 // set will be also told about the send_start() event. Those 01047 // DataLinks (in the pub_links set) that are already in the 01048 // send_links set will not be told about the send_start() event 01049 // since they heard about it when they were inserted into the 01050 // send_links set. 
01051 send_links.send_start(pub_links.in()); 01052 if (cur->get_header().message_id_ != SAMPLE_DATA) { 01053 pub_links->send_control(cur); 01054 } else { 01055 pub_links->send(cur); 01056 } 01057 } 01058 if (cur != max_transaction_tail_) { 01059 // Move on to the next DataSampleElement to send. 01060 cur = next_elem; 01061 } else { 01062 break; 01063 } 01064 } 01065 01066 // This will inform each DataLink in the set about the stop_send() event. 01067 // It will then clear the send_links_ set. 01068 // 01069 // The reason that the send_links_ set is cleared is because we continually 01070 // reuse the same send_links_ object over and over for each call to this 01071 // send method. 01072 RepoId pub_id(this->repo_id_); 01073 send_links.send_stop(pub_id); 01074 if (transaction_id != 0) 01075 expected_transaction_id_ = max_transaction_id_seen_ + 1; 01076 max_transaction_tail_ = 0; 01077 } 01078 }
bool OpenDDS::DCPS::TransportClient::send_response | ( | const RepoId & | peer, | |
const DataSampleHeader & | header, | |||
ACE_Message_Block * | payload | |||
) | [protected] |
Definition at line 874 of file TransportClient.cpp.
References data_link_index_, OpenDDS::DCPS::DCPS_debug_level, header, OpenDDS::DCPS::DataLinkSet::insert_link(), OPENDDS_STRING, and OpenDDS::DCPS::DataLinkSet::send_response().
00877 { 00878 DataLinkIndex::iterator found = data_link_index_.find(peer); 00879 00880 if (found == data_link_index_.end()) { 00881 payload->release(); 00882 00883 if (DCPS_debug_level > 4) { 00884 GuidConverter converter(peer); 00885 ACE_DEBUG((LM_DEBUG, 00886 ACE_TEXT("(%P|%t) TransportClient::send_response: ") 00887 ACE_TEXT("no link for publication %C, ") 00888 ACE_TEXT("not sending response.\n"), 00889 OPENDDS_STRING(converter).c_str())); 00890 } 00891 00892 return false; 00893 } 00894 00895 DataLinkSet singular; 00896 singular.insert_link(found->second.in()); 00897 singular.send_response(peer, header, payload); 00898 return true; 00899 }
SendControlStatus OpenDDS::DCPS::TransportClient::send_w_control | ( | SendStateDataSampleList | send_list, | |
const DataSampleHeader & | header, | |||
ACE_Message_Block * | msg, | |||
const RepoId & | destination | |||
) | [protected] |
Definition at line 912 of file TransportClient.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::head(), header, OpenDDS::DCPS::SEND_CONTROL_ERROR, send_control_to(), send_i(), and send_transaction_lock_.
Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i().
00916 { 00917 ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard, 00918 send_transaction_lock_, SEND_CONTROL_ERROR); 00919 if (send_list.head()) { 00920 send_i(send_list, 0); 00921 } 00922 return send_control_to(header, msg, destination); 00923 }
void OpenDDS::DCPS::TransportClient::stop_associating | ( | const GUID_t * | repos, | |
CORBA::ULong | length | |||
) | [protected] |
Definition at line 713 of file TransportClient.cpp.
References lock_, and pending_.
00714 { 00715 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00716 00717 if (repos == 0 || length == 0) { 00718 return; 00719 } else { 00720 for (CORBA::ULong i = 0; i < length; ++i) { 00721 PendingMap::iterator iter = pending_.find(repos[i]); 00722 00723 if (iter != pending_.end()) { 00724 iter->second->removed_ = true; 00725 } 00726 } 00727 } 00728 }
void OpenDDS::DCPS::TransportClient::stop_associating | ( | ) | [protected] |
Definition at line 700 of file TransportClient.cpp.
References lock_, and pending_.
Referenced by OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), OpenDDS::DCPS::DataReaderImpl::prepare_to_delete(), OpenDDS::DCPS::ReplayerImpl::remove_all_associations(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), OpenDDS::DCPS::DataReaderImpl::remove_all_associations(), OpenDDS::DCPS::ReplayerImpl::remove_associations(), and ~TransportClient().
00701 { 00702 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00703 00704 PendingMap::iterator iter = pending_.begin(); 00705 00706 while (iter != pending_.end()) { 00707 iter->second->removed_ = true; 00708 ++iter; 00709 } 00710 }
bool OpenDDS::DCPS::TransportClient::swap_bytes | ( | ) | const [inline, protected] |
Definition at line 68 of file TransportClient.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), and OpenDDS::DCPS::DataWriterImpl::create_sample_data_message().
00068 { return swap_bytes_; }
virtual void OpenDDS::DCPS::TransportClient::transport_assoc_done | ( | int | , | |
const RepoId & | ||||
) | [inline, private, virtual] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::DataWriterImpl.
Definition at line 135 of file TransportClient.h.
Referenced by use_datalink_i().
void OpenDDS::DCPS::TransportClient::transport_detached | ( | TransportImpl * | which | ) | [private] |
Definition at line 190 of file TransportClient.cpp.
References data_link_index_, OpenDDS::DCPS::DCPS_debug_level, impls_, links_, lock_, OpenDDS::DCPS::DataLinkSet::map(), OPENDDS_STRING, OPENDDS_VECTOR(), pending_, repo_id_, and OpenDDS::DCPS::TransportImpl::stop_accepting_or_connecting().
00191 { 00192 00193 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00194 00195 // Remove any DataLinks created by the 'which' TransportImpl from our local 00196 // data structures (both links_ and data_link_index_). 00197 for (DataLinkSet::MapType::iterator iter = links_.map().begin(); 00198 iter != links_.map().end();) { 00199 TransportImpl_rch impl = iter->second->impl(); 00200 00201 if (impl.in() == which) { 00202 for (DataLinkIndex::iterator it2 = data_link_index_.begin(); 00203 it2 != data_link_index_.end();) { 00204 if (it2->second.in() == iter->second.in()) { 00205 data_link_index_.erase(it2++); 00206 00207 } else { 00208 ++it2; 00209 } 00210 } 00211 if (DCPS_debug_level > 4) { 00212 GuidConverter converter(repo_id_); 00213 ACE_DEBUG((LM_DEBUG, 00214 ACE_TEXT("(%P|%t) TransportClient::transport_detached: calling remove_listener %C on link[%@]\n"), 00215 OPENDDS_STRING(converter).c_str(), 00216 iter->second.in())); 00217 } 00218 iter->second->remove_listener(repo_id_); 00219 links_.map().erase(iter++); 00220 00221 } else { 00222 ++iter; 00223 } 00224 } 00225 00226 // Remove the 'which' TransportImpl from the impls_ list 00227 for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin(); 00228 it != impls_.end(); ++it) { 00229 if (it->in() == which) { 00230 impls_.erase(it); 00231 00232 for (PendingMap::iterator it2 = pending_.begin(); 00233 it2 != pending_.end(); ++it2) { 00234 which->stop_accepting_or_connecting(this, it2->first); 00235 } 00236 00237 return; 00238 } 00239 } 00240 }
void OpenDDS::DCPS::TransportClient::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) | [protected] |
Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 833 of file TransportClient.cpp.
Referenced by OpenDDS::DCPS::ReplayerImpl::unregister_for_reader(), and OpenDDS::DCPS::DataWriterImpl::unregister_for_reader().
00836 { 00837 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00838 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00839 pos != limit; 00840 ++pos) { 00841 (*pos)->unregister_for_reader(participant, writerid, readerid); 00842 } 00843 }
void OpenDDS::DCPS::TransportClient::unregister_for_writer | ( | const RepoId & | participant, | |
const RepoId & | readerid, | |||
const RepoId & | writerid | |||
) | [protected] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.
Definition at line 861 of file TransportClient.cpp.
Referenced by OpenDDS::DCPS::RecorderImpl::unregister_for_writer(), and OpenDDS::DCPS::DataReaderImpl::unregister_for_writer().
00864 { 00865 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00866 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00867 pos != limit; 00868 ++pos) { 00869 (*pos)->unregister_for_writer(participant, readerid, writerid); 00870 } 00871 }
void OpenDDS::DCPS::TransportClient::use_datalink | ( | const RepoId & | remote_id, | |
const DataLink_rch & | link | |||
) |
Definition at line 557 of file TransportClient.cpp.
References lock_, and use_datalink_i().
Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::handle_timeout().
00559 { 00560 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00561 00562 use_datalink_i(remote_id, link, guard); 00563 }
void OpenDDS::DCPS::TransportClient::use_datalink_i | ( | const RepoId & | remote_id, | |
const DataLink_rch & | link, | |||
Guard & | guard | |||
) | [private] |
Definition at line 566 of file TransportClient.cpp.
References OpenDDS::DCPS::TransportClient::PendingAssoc::active_, add_link(), ASSOC_ACTIVE, ASSOC_OK, OpenDDS::DCPS::TransportClient::PendingAssocTimer::cancel_timer(), OpenDDS::DCPS::TransportClient::PendingAssoc::data_, OpenDDS::DCPS::TransportClient::PendingAssocTimer::delete_pending_assoc(), OpenDDS::DCPS::TransportClient::PendingAssoc::impls_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, pending_, pending_assoc_timer_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::TransportClient::PendingAssoc::removed_, transport_assoc_done(), and VDBG_LVL.
Referenced by associate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), and use_datalink().
// use_datalink_i(remote_id_ref, link, guard) -- completes (or abandons) the
// pending association for one remote peer. 'guard' is the caller's guard on
// lock_ (see use_datalink()).
// (The signature table above names the first parameter "remote_id"; the
// listing below calls it "remote_id_ref" and copies it into a local
// "remote_id" so later calls do not depend on the caller's reference.)
//
// Flow:
//  - No entry for the peer in pending_, or the entry is already marked
//    removed_: log and return without side effects.
//  - 'link' is nil: on the active side initiate_connect() is attempted and,
//    if it returns true, the function returns with the entry still pending.
//    Otherwise the association is treated as failed (ok stays false).
//  - 'link' is non-nil: add_link(link, remote_id) is called and ok is set.
//  - Cleanup (reached for success and failure alike): on the passive side
//    every impl in pend->impls_ is told to stop_accepting_or_connecting();
//    the entry is erased from pending_ and marked removed_.
//  - guard.release() is called BEFORE the timer is cancelled, the pending
//    object is handed to delete_pending_assoc(), and
//    transport_assoc_done() is invoked, so those calls run without lock_.
//
// transport_assoc_done() receives ASSOC_ACTIVE when the pending entry was
// active, OR-ed with ASSOC_OK when a link was added.
00569 { 00570 //try to make a local copy of remote_id to use in calls 00571 //because the reference could be invalidated if the caller 00572 //reference location is deleted (i.e. in stop_accepting_or_connecting 00573 //if use_datalink_i was called from passive_connection) 00574 //Does changing this from a reference to a local affect anything going forward? 00575 RepoId remote_id(remote_id_ref); 00576 00577 GuidConverter peerId_conv(remote_id); 00578 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00579 "TransportClient(%@) using datalink[%@] from %C\n", 00580 this, 00581 link.in(), 00582 OPENDDS_STRING(peerId_conv).c_str()), 0); 00583 00584 PendingMap::iterator iter = pending_.find(remote_id); 00585 00586 if (iter == pending_.end()) { 00587 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00588 "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n", 00589 this, 00590 link.in(), 00591 OPENDDS_STRING(peerId_conv).c_str()), 0); 00592 return; 00593 } 00594 00595 PendingAssoc* pend = iter->second; 00596 const int active_flag = pend->active_ ? 
ASSOC_ACTIVE : 0; 00597 bool ok = false; 00598 00599 if (pend->removed_) { // no-op 00600 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00601 "TransportClient(%@) using datalink[%@] pending association to remote %C was removed\n", 00602 this, 00603 link.in(), 00604 OPENDDS_STRING(peerId_conv).c_str()), 0); 00605 return; 00606 } else if (link.is_nil()) { 00607 00608 if (pend->active_ && pend->initiate_connect(this, guard)) { 00609 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00610 "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n", 00611 this, 00612 link.in(), 00613 OPENDDS_STRING(peerId_conv).c_str()), 0); 00614 return; 00615 } 00616 00617 } else { // link is ready to use 00618 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00619 "TransportClient(%@) about to add_link[%@] to remote: %C\n", 00620 this, 00621 link.in(), 00622 OPENDDS_STRING(peerId_conv).c_str()), 0); 00623 00624 add_link(link, remote_id); 00625 ok = true; 00626 } 00627 00628 // either link is valid or assoc failed, clean up pending object 00629 // for passive side processing 00630 if (!pend->active_) { 00631 00632 for (size_t i = 0; i < pend->impls_.size(); ++i) { 00633 pend->impls_[i]->stop_accepting_or_connecting(this, pend->data_.remote_id_); 00634 } 00635 } 00636 00637 pending_.erase(iter); 00638 pend->removed_ = true; 00639 00640 guard.release(); 00641 00642 pending_assoc_timer_->cancel_timer(this, pend); 00643 pending_assoc_timer_->delete_pending_assoc(pend); 00644 00645 transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id); 00646 }
friend class ::DDS_TEST [friend] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::DataWriterImpl_T< MessageType >, OpenDDS::DCPS::RecorderImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 162 of file TransportClient.h.
friend class TransportImpl [friend] |
Definition at line 138 of file TransportClient.h.
bool OpenDDS::DCPS::TransportClient::cdr_encapsulation_ [private] |
DataLinkIndex OpenDDS::DCPS::TransportClient::data_link_index_ [private] |
Definition at line 282 of file TransportClient.h.
Referenced by add_link(), disassociate(), send_control_to(), send_response(), and transport_detached().
bool OpenDDS::DCPS::TransportClient::durable_ [private] |
Definition at line 300 of file TransportClient.h.
Referenced by associate(), and enable_transport_using_config().
ACE_UINT64 OpenDDS::DCPS::TransportClient::expected_transaction_id_ [private] |
ImplsType OpenDDS::DCPS::TransportClient::impls_ [private] |
Definition at line 273 of file TransportClient.h.
Referenced by associate(), enable_transport_using_config(), register_for_reader(), register_for_writer(), transport_detached(), unregister_for_reader(), unregister_for_writer(), and ~TransportClient().
Definition at line 275 of file TransportClient.h.
Referenced by add_link(), disassociate(), remove_all_msgs(), remove_sample(), send_control(), send_control_to(), send_final_acks(), send_i(), transport_detached(), and ~TransportClient().
DataLinkIndex OpenDDS::DCPS::TransportClient::links_waiting_for_on_deleted_callback_ [private] |
Definition at line 276 of file TransportClient.h.
Referenced by disassociate(), on_notification_of_connection_deletion(), and ~TransportClient().
ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::lock_ [private] |
Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 307 of file TransportClient.h.
Referenced by associate(), disassociate(), on_notification_of_connection_deletion(), register_for_reader(), register_for_writer(), send_control_to(), stop_associating(), transport_detached(), unregister_for_reader(), unregister_for_writer(), use_datalink(), and ~TransportClient().
ACE_UINT64 OpenDDS::DCPS::TransportClient::max_transaction_id_seen_ [private] |
ACE_Time_Value OpenDDS::DCPS::TransportClient::passive_connect_duration_ [private] |
PendingMap OpenDDS::DCPS::TransportClient::pending_ [private] |
Definition at line 274 of file TransportClient.h.
Referenced by associate(), disassociate(), initiate_connect_i(), stop_associating(), transport_detached(), use_datalink_i(), and ~TransportClient().
Definition at line 269 of file TransportClient.h.
Referenced by associate(), use_datalink_i(), and ~TransportClient().
bool OpenDDS::DCPS::TransportClient::reliable_ [private] |
Definition at line 300 of file TransportClient.h.
Referenced by associate(), and enable_transport_using_config().
Reimplemented in OpenDDS::RTPS::Sedp::Endpoint.
Definition at line 312 of file TransportClient.h.
Referenced by OpenDDS::DCPS::UdpTransport::accept_datalink(), OpenDDS::DCPS::TcpTransport::accept_datalink(), OpenDDS::DCPS::MulticastTransport::accept_datalink(), add_link(), associate(), OpenDDS::DCPS::TcpTransport::connect_datalink(), OpenDDS::DCPS::RtpsUdpTransport::connect_datalink(), disassociate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), initiate_connect_i(), on_notification_of_connection_deletion(), remove_all_msgs(), send_control(), send_control_to(), transport_detached(), and ~TransportClient().
These are the links being used during the call to send(). The set is kept as a member of the class to minimize allocations and deallocations of the data link set.
Definition at line 280 of file TransportClient.h.
ACE_Thread_Mutex OpenDDS::DCPS::TransportClient::send_transaction_lock_ [private] |
bool OpenDDS::DCPS::TransportClient::swap_bytes_ [private] |