Mix-in class for DDS entities which directly use the transport layer. More...
#include <TransportClient.h>
Mix-in class for DDS entities which directly use the transport layer.
DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient class manages the TransportImpl objects that represent the available communication mechanisms and the DataLink objects that represent the currently active communication channels to peers.
Definition at line 52 of file TransportClient.h.
typedef ACE_Guard<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Guard [private] |
Definition at line 146 of file TransportClient.h.
typedef RcHandle<PendingAssoc> OpenDDS::DCPS::TransportClient::PendingAssoc_rch [private] |
Definition at line 188 of file TransportClient.h.
typedef ACE_Reverse_Lock<ACE_Thread_Mutex> OpenDDS::DCPS::TransportClient::Reverse_Lock_t [private] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl.
Definition at line 302 of file TransportClient.h.
anonymous enum |
Definition at line 60 of file TransportClient.h.
00060 { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
OpenDDS::DCPS::TransportClient::TransportClient | ( | ) |
Definition at line 33 of file TransportClient.cpp.
00034 : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner())) 00035 , expected_transaction_id_(1) 00036 , max_transaction_id_seen_(0) 00037 , max_transaction_tail_(0) 00038 , swap_bytes_(false) 00039 , cdr_encapsulation_(false) 00040 , reliable_(false) 00041 , durable_(false) 00042 , reverse_lock_(lock_) 00043 , repo_id_(GUID_UNKNOWN) 00044 { 00045 }
OpenDDS::DCPS::TransportClient::~TransportClient | ( | ) | [virtual] |
Definition at line 47 of file TransportClient.cpp.
References ACE_TEXT(), impls_, LM_DEBUG, lock_, OPENDDS_STRING, pending_, pending_assoc_timer_, repo_id_, stop_associating(), and OpenDDS::DCPS::Transport_debug_level.
00048 { 00049 if (Transport_debug_level > 5) { 00050 GuidConverter converter(repo_id_); 00051 ACE_DEBUG((LM_DEBUG, 00052 ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"), 00053 OPENDDS_STRING(converter).c_str())); 00054 } 00055 00056 stop_associating(); 00057 00058 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00059 00060 for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) { 00061 for (size_t i = 0; i < impls_.size(); ++i) { 00062 impls_[i]->stop_accepting_or_connecting(*this, it->second->data_.remote_id_); 00063 } 00064 00065 pending_assoc_timer_->cancel_timer(this, it->second); 00066 } 00067 00068 pending_assoc_timer_->wait(); 00069 00070 }
void OpenDDS::DCPS::TransportClient::add_link | ( | const DataLink_rch & | link, | |
const RepoId & | peer | |||
) | [virtual] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl.
Definition at line 524 of file TransportClient.cpp.
References data_link_index_, get_receive_listener(), get_send_listener(), OpenDDS::DCPS::DataLinkSet::insert_link(), links_, and repo_id_.
Referenced by use_datalink_i().
00525 { 00526 links_.insert_link(link); 00527 data_link_index_[peer] = link; 00528 00529 TransportReceiveListener_rch trl = get_receive_listener(); 00530 00531 if (trl) { 00532 link->make_reservation(peer, repo_id_, trl); 00533 00534 } else { 00535 link->make_reservation(peer, repo_id_, get_send_listener()); 00536 } 00537 }
bool OpenDDS::DCPS::TransportClient::associate | ( | const AssociationData & | peer, | |
bool | active | |||
) |
Definition at line 166 of file TransportClient.cpp.
References ACE_TEXT(), OpenDDS::DCPS::back_inserter(), OpenDDS::DCPS::DCPS_debug_level, durable_, get_priority_value(), get_repo_id(), impls_, OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::AcceptConnectResult::link_, LM_DEBUG, LM_ERROR, lock_, OPENDDS_STRING, pending_, pending_assoc_timer_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, OpenDDS::DCPS::rchandle_from(), reliable_, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, repo_id_, reverse_lock_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, use_datalink_i(), and VDBG_LVL.
Referenced by OpenDDS::DCPS::ReplayerImpl::add_association(), OpenDDS::DCPS::RecorderImpl::add_association(), OpenDDS::DCPS::DataWriterImpl::add_association(), OpenDDS::DCPS::DataReaderImpl::add_association(), OpenDDS::RTPS::Sedp::Reader::assoc(), and OpenDDS::RTPS::Sedp::Writer::assoc().
00167 { 00168 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false); 00169 00170 repo_id_ = get_repo_id(); 00171 00172 if (impls_.empty()) { 00173 if (DCPS_debug_level) { 00174 GuidConverter writer_converter(repo_id_); 00175 GuidConverter reader_converter(data.remote_id_); 00176 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ") 00177 ACE_TEXT("local %C remote %C no available impls\n"), 00178 OPENDDS_STRING(writer_converter).c_str(), 00179 OPENDDS_STRING(reader_converter).c_str())); 00180 } 00181 return false; 00182 } 00183 00184 bool all_impls_shut_down = true; 00185 for (size_t i = 0; i < impls_.size(); ++i) { 00186 if (!impls_.at(i)->is_shut_down()) { 00187 all_impls_shut_down = false; 00188 break; 00189 } 00190 } 00191 00192 if (all_impls_shut_down) { 00193 if (DCPS_debug_level) { 00194 GuidConverter writer_converter(repo_id_); 00195 GuidConverter reader_converter(data.remote_id_); 00196 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ") 00197 ACE_TEXT("local %C remote %C all available impls previously shutdown\n"), 00198 OPENDDS_STRING(writer_converter).c_str(), 00199 OPENDDS_STRING(reader_converter).c_str())); 00200 } 00201 return false; 00202 } 00203 00204 PendingMap::iterator iter = pending_.find(data.remote_id_); 00205 00206 if (iter == pending_.end()) { 00207 RepoId remote_copy(data.remote_id_); 00208 iter = pending_.insert(std::make_pair(remote_copy, make_rch<PendingAssoc>())).first; 00209 00210 GuidConverter tc_assoc(repo_id_); 00211 GuidConverter remote_new(data.remote_id_); 00212 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc " 00213 "between %C and remote %C\n", 00214 OPENDDS_STRING(tc_assoc).c_str(), 00215 OPENDDS_STRING(remote_new).c_str()), 0); 00216 } else { 00217 00218 ACE_ERROR((LM_ERROR, 00219 ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ") 00220 ACE_TEXT("already associating with remote.\n"))); 00221 00222 return false; 00223 00224 } 00225 00226 PendingAssoc_rch pend = iter->second; 00227 pend->active_ = active; 00228 pend->impls_.clear(); 00229 pend->blob_index_ = 0; 00230 pend->data_ = data; 00231 pend->attribs_.local_id_ = repo_id_; 00232 pend->attribs_.priority_ = get_priority_value(data); 00233 pend->attribs_.local_reliable_ = reliable_; 00234 pend->attribs_.local_durable_ = durable_; 00235 00236 if (active) { 00237 pend->impls_.reserve(impls_.size()); 00238 std::reverse_copy(impls_.begin(), impls_.end(), 00239 std::back_inserter(pend->impls_)); 00240 00241 return pend->initiate_connect(this, guard); 00242 00243 } else { // passive 00244 00245 // call accept_datalink for each impl / blob pair of the same type 00246 for (size_t i = 0; i < impls_.size(); ++i) { 00247 pend->impls_.push_back(impls_[i]); 00248 const OPENDDS_STRING type = impls_[i]->transport_type(); 00249 00250 for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) { 00251 if (data.remote_data_[j].transport_type.in() == type) { 00252 const TransportImpl::RemoteTransport remote = { 00253 data.remote_id_, data.remote_data_[j].data, 00254 data.publication_transport_priority_, 00255 data.remote_reliable_, data.remote_durable_}; 00256 00257 TransportImpl::AcceptConnectResult res; 00258 { 00259 // This thread acquired lock_ at the beginning of this method. Calling accept_datalink might require getting the lock for the transport's reactor. 00260 // If the current thread is not an event handler for the transport's reactor, e.g., the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock. 00261 // Event handlers in the transport reactor may call passive_connection which calls use_datalink which acquires lock_. The locking order in this case is transport reactor lock -> lock_. 00262 // To avoid deadlock, we must reverse the lock. 00263 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false); 00264 res = impls_[i]->accept_datalink(remote, pend->attribs_, rchandle_from(this)); 00265 } 00266 00267 //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink 00268 PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_); 00269 00270 if (iter_after_accept == pending_.end()) { 00271 //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an 00272 //active side connection and completed, thus pend was removed from pending_. Can return true. 00273 return true; 00274 } 00275 00276 if (res.success_) { 00277 if (res.link_.is_nil()) { 00278 // In this case, it may be waiting for the TCP connection to be established. Just wait without trying other transports. 00279 pending_assoc_timer_->schedule_timer(this, iter->second); 00280 } 00281 else { 00282 use_datalink_i(data.remote_id_, res.link_, guard); 00283 } 00284 return true; 00285 } 00286 } 00287 } 00288 00289 //pend->impls_.push_back(impls_[i]); 00290 } 00291 00292 pending_assoc_timer_->schedule_timer(this, iter->second); 00293 } 00294 00295 return true; 00296 }
bool OpenDDS::DCPS::TransportClient::cdr_encapsulation | ( | ) | const [inline] |
Definition at line 72 of file TransportClient.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), and OpenDDS::DCPS::DataWriterImpl::create_sample_data_message().
00072 { return cdr_encapsulation_; }
virtual bool OpenDDS::DCPS::TransportClient::check_transport_qos | ( | const TransportInst & | inst | ) | [private, pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by enable_transport_using_config().
const TransportLocatorSeq& OpenDDS::DCPS::TransportClient::connection_info | ( | ) | const [inline] |
Definition at line 73 of file TransportClient.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().
00073 { return conn_info_; }
void OpenDDS::DCPS::TransportClient::disassociate | ( | const RepoId & | peerId | ) |
Definition at line 567 of file TransportClient.cpp.
References ACE_TEXT(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), links_, LM_DEBUG, lock_, OPENDDS_STRING, pending_, OpenDDS::DCPS::DataLinkSet::remove_link(), repo_id_, and VDBG_LVL.
Referenced by OpenDDS::DCPS::ReplayerImpl::remove_associations(), OpenDDS::DCPS::RecorderImpl::remove_associations_i(), and OpenDDS::DCPS::DataReaderImpl::remove_associations_i().
00568 { 00569 GuidConverter peerId_conv(peerId); 00570 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate " 00571 "TransportClient(%@) disassociating from %C\n", 00572 this, 00573 OPENDDS_STRING(peerId_conv).c_str()), 5); 00574 00575 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00576 00577 if (pending_.erase(peerId)) { 00578 return; 00579 } 00580 00581 const DataLinkIndex::iterator found = data_link_index_.find(peerId); 00582 00583 if (found == data_link_index_.end()) { 00584 if (DCPS_debug_level > 4) { 00585 const GuidConverter converter(peerId); 00586 ACE_DEBUG((LM_DEBUG, 00587 ACE_TEXT("(%P|%t) TransportClient::disassociate: ") 00588 ACE_TEXT("no link for remote peer %C\n"), 00589 OPENDDS_STRING(converter).c_str())); 00590 } 00591 00592 return; 00593 } 00594 00595 const DataLink_rch link = found->second; 00596 00597 //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock 00598 //otherwise it could be removed in transport_detached() 00599 data_link_index_.erase(found); 00600 DataLinkSetMap released; 00601 00602 if (DCPS_debug_level > 4) { 00603 ACE_DEBUG((LM_DEBUG, 00604 ACE_TEXT("(%P|%t) TransportClient::disassociate: ") 00605 ACE_TEXT("about to release_reservations for link[%@] \n"), 00606 link.in())); 00607 } 00608 00609 link->release_reservations(peerId, repo_id_, released); 00610 00611 if (!released.empty()) { 00612 00613 if (DCPS_debug_level > 4) { 00614 ACE_DEBUG((LM_DEBUG, 00615 ACE_TEXT("(%P|%t) TransportClient::disassociate: ") 00616 ACE_TEXT("about to remove_link[%@] from links_\n"), 00617 link.in())); 00618 } 00619 links_.remove_link(link); 00620 00621 if (DCPS_debug_level > 4) { 00622 GuidConverter converter(repo_id_); 00623 ACE_DEBUG((LM_DEBUG, 00624 ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"), 00625 OPENDDS_STRING(converter).c_str(), 00626 link.in())); 00627 } 00628 // Datalink is no longer used for any remote peer by this TransportClient 00629 link->remove_listener(repo_id_); 00630 00631 } 00632 }
virtual DDS::DomainId_t OpenDDS::DCPS::TransportClient::domain_id | ( | ) | const [private, pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by enable_transport().
void OpenDDS::DCPS::TransportClient::enable_transport | ( | bool | reliable, | |
bool | durable | |||
) |
Definition at line 73 of file TransportClient.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportRegistry::DEFAULT_CONFIG_NAME, OpenDDS::DCPS::TransportRegistry::domain_default_config(), domain_id(), enable_transport_using_config(), OpenDDS::DCPS::TransportRegistry::fix_empty_default(), OpenDDS::DCPS::TransportRegistry::global_config(), OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_ERROR, and OpenDDS::DCPS::rchandle_from().
Referenced by OpenDDS::DCPS::ReplayerImpl::enable(), OpenDDS::DCPS::RecorderImpl::enable(), OpenDDS::DCPS::DataWriterImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::enable().
00074 { 00075 // Search for a TransportConfig to use: 00076 TransportConfig_rch tc; 00077 00078 // 1. If this object is an Entity, check if a TransportConfig has been 00079 // bound either directly to this entity or to a parent entity. 00080 for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this)); 00081 ent && tc.is_nil(); ent = ent->parent()) { 00082 tc = ent->transport_config(); 00083 } 00084 00085 if (tc.is_nil()) { 00086 TransportRegistry* const reg = TransportRegistry::instance(); 00087 // 2. Check for a TransportConfig that is the default for this Domain. 00088 tc = reg->domain_default_config(domain_id()); 00089 00090 if (tc.is_nil()) { 00091 // 3. Use the global_config if one has been set. 00092 tc = reg->global_config(); 00093 00094 if (!tc.is_nil() && tc->instances_.empty() 00095 && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) { 00096 // 4. Set the "fallback option" if the global_config is empty. 00097 // (only applies if the user hasn't changed the global config) 00098 tc = reg->fix_empty_default(); 00099 } 00100 } 00101 } 00102 00103 if (tc.is_nil()) { 00104 ACE_ERROR((LM_ERROR, 00105 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ") 00106 ACE_TEXT("No TransportConfig found.\n"))); 00107 throw Transport::NotConfigured(); 00108 } 00109 00110 enable_transport_using_config(reliable, durable, tc); 00111 }
void OpenDDS::DCPS::TransportClient::enable_transport_using_config | ( | bool | reliable, | |
bool | durable, | |||
const TransportConfig_rch & | tc | |||
) |
Definition at line 114 of file TransportClient.cpp.
References ACE_TEXT(), cdr_encapsulation_, check_transport_qos(), conn_info_, OpenDDS::DCPS::TransportImpl::connection_info(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION, durable_, impls_, len, LM_ERROR, LM_WARNING, passive_connect_duration_, reliable_, ACE_Time_Value::set(), and swap_bytes_.
Referenced by enable_transport().
00116 { 00117 swap_bytes_ = tc->swap_bytes_; 00118 cdr_encapsulation_ = false; 00119 reliable_ = reliable; 00120 durable_ = durable; 00121 unsigned long duration = tc->passive_connect_duration_; 00122 if (duration == 0) { 00123 duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION; 00124 if (DCPS_debug_level) { 00125 ACE_DEBUG((LM_WARNING, 00126 ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ") 00127 ACE_TEXT("passive_connect_duration_ configured as 0, changing to ") 00128 ACE_TEXT("default value\n"))); 00129 } 00130 } 00131 passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000); 00132 00133 const size_t n = tc->instances_.size(); 00134 00135 for (size_t i = 0; i < n; ++i) { 00136 TransportInst_rch inst = tc->instances_[i]; 00137 00138 if (check_transport_qos(*inst)) { 00139 TransportImpl* impl = inst->impl(); 00140 00141 if (impl) { 00142 impls_.push_back(impl); 00143 const CORBA::ULong len = conn_info_.length(); 00144 conn_info_.length(len + 1); 00145 impl->connection_info(conn_info_[len]); 00146 00147 #if defined(OPENDDS_SECURITY) 00148 impl->local_crypto_handle(get_crypto_handle()); 00149 #endif 00150 00151 cdr_encapsulation_ |= inst->requires_cdr(); 00152 } 00153 } 00154 } 00155 00156 if (impls_.empty()) { 00157 ACE_ERROR((LM_ERROR, 00158 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ") 00159 ACE_TEXT("No TransportImpl could be created.\n"))); 00160 throw Transport::NotConfigured(); 00161 } 00162 }
virtual Priority OpenDDS::DCPS::TransportClient::get_priority_value | ( | const AssociationData & | data | ) | const [private, pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by associate().
TransportReceiveListener_rch OpenDDS::DCPS::TransportClient::get_receive_listener | ( | ) | [private] |
Definition at line 902 of file TransportClient.cpp.
References OpenDDS::DCPS::rchandle_from().
Referenced by add_link().
00903 { 00904 return rchandle_from(dynamic_cast<TransportReceiveListener*>(this)); 00905 }
virtual const RepoId& OpenDDS::DCPS::TransportClient::get_repo_id | ( | ) | const [private, pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::RecorderImpl, OpenDDS::DCPS::ReplayerImpl, and OpenDDS::RTPS::Sedp::Endpoint.
Referenced by associate(), and send_final_acks().
TransportSendListener_rch OpenDDS::DCPS::TransportClient::get_send_listener | ( | ) | [private] |
Definition at line 896 of file TransportClient.cpp.
References OpenDDS::DCPS::rchandle_from().
Referenced by add_link(), send_control(), and send_control_to().
00897 { 00898 return rchandle_from(dynamic_cast<TransportSendListener*>(this)); 00899 }
bool OpenDDS::DCPS::TransportClient::initiate_connect_i | ( | TransportImpl::AcceptConnectResult & | result, | |
TransportImpl * | impl, | |||
const TransportImpl::RemoteTransport & | remote, | |||
const TransportImpl::ConnectionAttribs & | attribs_, | |||
Guard & | guard | |||
) | [private] |
Definition at line 310 of file TransportClient.cpp.
References ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), OpenDDS::DCPS::TransportImpl::connect_datalink(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, ACE_Guard< ACE_LOCK >::locked(), OPENDDS_STRING, OpenDDS::DCPS::rchandle_from(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, repo_id_, OpenDDS::DCPS::TransportImpl::AcceptConnectResult::success_, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect().
00315 { 00316 if (!guard.locked()) { 00317 //don't own the lock_ so can't release it...shouldn't happen 00318 GuidConverter local(repo_id_); 00319 GuidConverter remote_conv(remote.repo_id_); 00320 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00321 "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n", 00322 OPENDDS_STRING(local).c_str(), 00323 OPENDDS_STRING(remote_conv).c_str()), 0); 00324 return false; 00325 } 00326 00327 { 00328 //can't call connect while holding lock due to possible reactor deadlock 00329 guard.release(); 00330 GuidConverter local(repo_id_); 00331 GuidConverter remote_conv(remote.repo_id_); 00332 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00333 "attempt to connect_datalink between local %C and remote %C\n", 00334 OPENDDS_STRING(local).c_str(), 00335 OPENDDS_STRING(remote_conv).c_str()), 0); 00336 result = impl->connect_datalink(remote, attribs_, rchandle_from(this)); 00337 guard.acquire(); 00338 if (!result.success_) { 00339 if (DCPS_debug_level) { 00340 GuidConverter writer_converter(repo_id_); 00341 GuidConverter reader_converter(remote.repo_id_); 00342 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ") 00343 ACE_TEXT("connect_datalink between local %C remote %C not successful\n"), 00344 OPENDDS_STRING(writer_converter).c_str(), 00345 OPENDDS_STRING(reader_converter).c_str())); 00346 } 00347 return false; 00348 } 00349 } 00350 00351 GuidConverter local(repo_id_); 00352 GuidConverter remote_conv(remote.repo_id_); 00353 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - " 00354 "connection between local %C and remote %C initiation successful\n", 00355 OPENDDS_STRING(local).c_str(), 00356 OPENDDS_STRING(remote_conv).c_str()), 0); 00357 return true; 00358 }
typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP | ( | RepoId | , | |
PendingAssoc_rch | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::TransportClient::OPENDDS_MAP_CMP | ( | RepoId | , | |
DataLink_rch | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR | ( | TransportImpl * | ) | [private] |
Referenced by OpenDDS::DCPS::RecorderImpl::signal_liveliness().
void OpenDDS::DCPS::TransportClient::register_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid, | |||
const TransportLocatorSeq & | locators, | |||
OpenDDS::DCPS::DiscoveryListener * | listener | |||
) |
Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 635 of file TransportClient.cpp.
00640 { 00641 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00642 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00643 pos != limit; 00644 ++pos) { 00645 (*pos)->register_for_reader(participant, writerid, readerid, locators, listener); 00646 } 00647 }
void OpenDDS::DCPS::TransportClient::register_for_writer | ( | const RepoId & | participant, | |
const RepoId & | readerid, | |||
const RepoId & | writerid, | |||
const TransportLocatorSeq & | locators, | |||
DiscoveryListener * | listener | |||
) |
Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.
Definition at line 663 of file TransportClient.cpp.
00668 { 00669 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00670 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00671 pos != limit; 00672 ++pos) { 00673 (*pos)->register_for_writer(participant, readerid, writerid, locators, listener); 00674 } 00675 }
bool OpenDDS::DCPS::TransportClient::remove_all_msgs | ( | ) |
Definition at line 940 of file TransportClient.cpp.
References links_, OpenDDS::DCPS::DataLinkSet::remove_all_msgs(), and repo_id_.
Referenced by OpenDDS::DCPS::WriteDataContainer::unregister_all().
bool OpenDDS::DCPS::TransportClient::remove_sample | ( | const DataSampleElement * | sample | ) |
Definition at line 934 of file TransportClient.cpp.
References links_, and OpenDDS::DCPS::DataLinkSet::remove_sample().
Referenced by OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample().
00935 { 00936 return links_.remove_sample(sample); 00937 }
void OpenDDS::DCPS::TransportClient::send | ( | SendStateDataSampleList | send_list, | |
ACE_UINT64 | transaction_id = 0 | |||
) |
Definition at line 717 of file TransportClient.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::head(), send_i(), and send_transaction_lock_.
Referenced by OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::DCPS::DataWriterImpl::send_suspended_data(), OpenDDS::DCPS::ReplayerImpl::write(), and OpenDDS::DCPS::DataWriterImpl::write().
00718 { 00719 if (send_list.head() == 0) { 00720 return; 00721 } 00722 ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_); 00723 send_i(send_list, transaction_id); 00724 }
SendControlStatus OpenDDS::DCPS::TransportClient::send_control | ( | const DataSampleHeader & | header, | |
Message_Block_Ptr | msg | |||
) |
Reimplemented in OpenDDS::DCPS::DataWriterImpl.
Definition at line 908 of file TransportClient.cpp.
References get_send_listener(), links_, OpenDDS::DCPS::move(), repo_id_, and OpenDDS::DCPS::DataLinkSet::send_control().
Referenced by OpenDDS::RTPS::Sedp::Writer::write_control_msg().
00910 { 00911 return links_.send_control(repo_id_, get_send_listener(), header, move(msg)); 00912 }
SendControlStatus OpenDDS::DCPS::TransportClient::send_control_to | ( | const DataSampleHeader & | header, | |
Message_Block_Ptr | msg, | |||
const RepoId & | destination | |||
) |
Definition at line 915 of file TransportClient.cpp.
References data_link_index_, get_send_listener(), OpenDDS::DCPS::DataLinkSet::insert_link(), lock_, OpenDDS::DCPS::move(), repo_id_, OpenDDS::DCPS::DataLinkSet::send_control(), and OpenDDS::DCPS::SEND_CONTROL_ERROR.
Referenced by send_w_control().
00918 { 00919 DataLinkSet singular; 00920 { 00921 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR); 00922 DataLinkIndex::iterator found = data_link_index_.find(destination); 00923 00924 if (found == data_link_index_.end()) { 00925 return SEND_CONTROL_ERROR; 00926 } 00927 00928 singular.insert_link(found->second); 00929 } 00930 return singular.send_control(repo_id_, get_send_listener(), header, move(msg)); 00931 }
void OpenDDS::DCPS::TransportClient::send_final_acks | ( | ) |
Definition at line 561 of file TransportClient.cpp.
References get_repo_id(), links_, and OpenDDS::DCPS::DataLinkSet::send_final_acks().
Referenced by OpenDDS::DCPS::DataReaderImpl::prepare_to_delete().
00562 { 00563 links_.send_final_acks (get_repo_id()); 00564 }
void OpenDDS::DCPS::TransportClient::send_i | ( | SendStateDataSampleList | send_list, | |
ACE_UINT64 | transaction_id | |||
) | [private] |
Definition at line 741 of file TransportClient.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportSendListener::data_delivered(), OpenDDS::DCPS::DCPS_debug_level, expected_transaction_id_, OpenDDS::DCPS::DataSampleElement::filter_out_, OpenDDS::DCPS::DataSampleElement::filter_per_link_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_next_send_sample(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_ids(), OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, LM_DEBUG, max_transaction_id_seen_, max_transaction_tail_, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, repo_id_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataLinkSet::select_links(), OpenDDS::DCPS::DataLinkSet::send_start(), OpenDDS::DCPS::DataLinkSet::send_stop(), OpenDDS::DCPS::SendStateDataSampleList::tail(), OpenDDS::DCPS::DataSampleElement::transaction_id(), VDBG, and VDBG_LVL.
Referenced by send(), and send_w_control().
00742 { 00743 if (transaction_id != 0 && transaction_id != expected_transaction_id_) { 00744 if (transaction_id > max_transaction_id_seen_) { 00745 max_transaction_id_seen_ = transaction_id; 00746 max_transaction_tail_ = send_list.tail(); 00747 } 00748 return; 00749 } else /* transaction_id == expected_transaction_id */ { 00750 00751 DataSampleElement* cur = send_list.head(); 00752 if (max_transaction_tail_ == 0) { 00753 //Means no future transaction beat this transaction into send 00754 if (transaction_id != 0) 00755 max_transaction_id_seen_ = expected_transaction_id_; 00756 // Only send this current transaction 00757 max_transaction_tail_ = send_list.tail(); 00758 } 00759 DataLinkSet send_links; 00760 00761 while (true) { 00762 // VERY IMPORTANT NOTE: 00763 // 00764 // We have to be very careful in how we deal with the current 00765 // DataSampleElement. The issue is that once we have invoked 00766 // data_delivered() on the send_listener_ object, or we have invoked 00767 // send() on the pub_links, we can no longer access the current 00768 // DataSampleElement!Thus, we need to get the next 00769 // DataSampleElement (pointer) from the current element now, 00770 // while it is safe. 00771 DataSampleElement* next_elem; 00772 if (cur != max_transaction_tail_) { 00773 next_elem = cur->get_next_send_sample(); 00774 } else { 00775 next_elem = max_transaction_tail_; 00776 } 00777 DataLinkSet_rch pub_links = 00778 (cur->get_num_subs() > 0) 00779 ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs())) 00780 : DataLinkSet_rch(&links_, inc_count()); 00781 00782 if (pub_links.is_nil() || pub_links->empty()) { 00783 // NOTE: This is the "local publisher id is not currently 00784 // associated with any remote subscriber ids" case. 00785 00786 if (DCPS_debug_level > 4) { 00787 GuidConverter converter(cur->get_pub_id()); 00788 ACE_DEBUG((LM_DEBUG, 00789 ACE_TEXT("(%P|%t) TransportClient::send_i: ") 00790 ACE_TEXT("no links for publication %C, ") 00791 ACE_TEXT("not sending element %@ for transaction: %d.\n"), 00792 OPENDDS_STRING(converter).c_str(), 00793 cur, 00794 cur->transaction_id())); 00795 } 00796 00797 // We tell the send_listener_ that all of the remote subscriber ids 00798 // that wanted the data (all zero of them) have indeed received 00799 // the data. 00800 cur->get_send_listener()->data_delivered(cur); 00801 00802 } else { 00803 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n" 00804 , cur), 5); 00805 00806 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00807 00808 // Content-Filtering adjustment to the pub_links: 00809 // - If the sample should be filtered out of all subscriptions on a given 00810 // DataLink, then exclude that link from the subset that we'll send to. 00811 // - If the sample should be filtered out of some (or none) of the subs, 00812 // then record that information in the DataSampleElement so that the 00813 // header's content_filter_entries_ can be marshaled before it's sent. 00814 if (cur->filter_out_.ptr()) { 00815 DataLinkSet_rch subset; 00816 DataLinkSet::GuardType guard(pub_links->lock()); 00817 typedef DataLinkSet::MapType MapType; 00818 MapType& map = pub_links->map(); 00819 00820 for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) { 00821 size_t n_subs; 00822 GUIDSeq_var ti = 00823 itr->second->target_intersection(cur->get_pub_id(), 00824 cur->filter_out_.in(), n_subs); 00825 00826 if (ti.ptr() == 0 || ti->length() != n_subs) { 00827 if (!subset.in()) { 00828 subset = make_rch<DataLinkSet>(); 00829 } 00830 00831 subset->insert_link(itr->second); 00832 cur->filter_per_link_[itr->first] = ti._retn(); 00833 00834 } else { 00835 VDBG((LM_DEBUG, 00836 "(%P|%t) DBG: DataLink completely filtered-out %@.\n", 00837 itr->second.in())); 00838 } 00839 } 00840 00841 if (!subset.in()) { 00842 VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n")); 00843 // similar to the "if (pub_links.is_nil())" case above, no links 00844 cur->get_send_listener()->data_delivered(cur); 00845 if (cur != max_transaction_tail_) { 00846 // Move on to the next DataSampleElement to send. 00847 cur = next_elem; 00848 continue; 00849 } else { 00850 break; 00851 } 00852 } 00853 00854 pub_links = subset; 00855 } 00856 00857 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00858 00859 // This will do several things, including adding to the membership 00860 // of the send_links set. Any DataLinks added to the send_links 00861 // set will be also told about the send_start() event. Those 00862 // DataLinks (in the pub_links set) that are already in the 00863 // send_links set will not be told about the send_start() event 00864 // since they heard about it when they were inserted into the 00865 // send_links set. 00866 send_links.send_start(pub_links.in()); 00867 if (cur->get_header().message_id_ != SAMPLE_DATA) { 00868 pub_links->send_control(cur); 00869 } else { 00870 pub_links->send(cur); 00871 } 00872 } 00873 if (cur != max_transaction_tail_) { 00874 // Move on to the next DataSampleElement to send. 00875 cur = next_elem; 00876 } else { 00877 break; 00878 } 00879 } 00880 00881 // This will inform each DataLink in the set about the stop_send() event. 00882 // It will then clear the send_links_ set. 00883 // 00884 // The reason that the send_links_ set is cleared is because we continually 00885 // reuse the same send_links_ object over and over for each call to this 00886 // send method. 00887 RepoId pub_id(repo_id_); 00888 send_links.send_stop(pub_id); 00889 if (transaction_id != 0) 00890 expected_transaction_id_ = max_transaction_id_seen_ + 1; 00891 max_transaction_tail_ = 0; 00892 } 00893 }
bool OpenDDS::DCPS::TransportClient::send_response | ( | const RepoId & | peer, | |
const DataSampleHeader & | header, | |||
Message_Block_Ptr | payload | |||
) |
Definition at line 691 of file TransportClient.cpp.
References ACE_TEXT(), data_link_index_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataLinkSet::insert_link(), LM_DEBUG, OpenDDS::DCPS::move(), OPENDDS_STRING, and OpenDDS::DCPS::DataLinkSet::send_response().
00694 { 00695 DataLinkIndex::iterator found = data_link_index_.find(peer); 00696 00697 if (found == data_link_index_.end()) { 00698 if (DCPS_debug_level > 4) { 00699 GuidConverter converter(peer); 00700 ACE_DEBUG((LM_DEBUG, 00701 ACE_TEXT("(%P|%t) TransportClient::send_response: ") 00702 ACE_TEXT("no link for publication %C, ") 00703 ACE_TEXT("not sending response.\n"), 00704 OPENDDS_STRING(converter).c_str())); 00705 } 00706 00707 return false; 00708 } 00709 00710 DataLinkSet singular; 00711 singular.insert_link(found->second); 00712 singular.send_response(peer, header, move(payload)); 00713 return true; 00714 }
SendControlStatus OpenDDS::DCPS::TransportClient::send_w_control | ( | SendStateDataSampleList | send_list, | |
const DataSampleHeader & | header, | |||
Message_Block_Ptr | msg, | |||
const RepoId & | destination | |||
) |
Definition at line 727 of file TransportClient.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::head(), OpenDDS::DCPS::move(), OpenDDS::DCPS::SEND_CONTROL_ERROR, send_control_to(), send_i(), and send_transaction_lock_.
Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i().
00731 { 00732 ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard, 00733 send_transaction_lock_, SEND_CONTROL_ERROR); 00734 if (send_list.head()) { 00735 send_i(send_list, 0); 00736 } 00737 return send_control_to(header, move(msg), destination); 00738 }
void OpenDDS::DCPS::TransportClient::stop_associating | ( | const GUID_t * | repos, | |
CORBA::ULong | length | |||
) |
Definition at line 547 of file TransportClient.cpp.
References lock_, and pending_.
00548 { 00549 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00550 00551 if (repos == 0 || length == 0) { 00552 return; 00553 } else { 00554 for (CORBA::ULong i = 0; i < length; ++i) { 00555 pending_.erase(repos[i]); 00556 } 00557 } 00558 }
void OpenDDS::DCPS::TransportClient::stop_associating | ( | ) |
Definition at line 540 of file TransportClient.cpp.
References lock_, and pending_.
Referenced by OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), OpenDDS::DCPS::DataReaderImpl::prepare_to_delete(), OpenDDS::DCPS::ReplayerImpl::remove_all_associations(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), OpenDDS::DCPS::DataReaderImpl::remove_all_associations(), OpenDDS::DCPS::ReplayerImpl::remove_associations(), OpenDDS::DCPS::DataReaderImpl::remove_associations(), and ~TransportClient().
00541 { 00542 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00543 pending_.clear(); 00544 }
bool OpenDDS::DCPS::TransportClient::swap_bytes | ( | ) | const [inline] |
Definition at line 71 of file TransportClient.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), and OpenDDS::DCPS::DataWriterImpl::end_coherent_changes().
00071 { return swap_bytes_; }
virtual void OpenDDS::DCPS::TransportClient::transport_assoc_done | ( | int | , | |
const RepoId & | ||||
) | [inline, private, virtual] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::DataWriterImpl.
Definition at line 136 of file TransportClient.h.
Referenced by use_datalink_i().
void OpenDDS::DCPS::TransportClient::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) |
Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 650 of file TransportClient.cpp.
00653 { 00654 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00655 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00656 pos != limit; 00657 ++pos) { 00658 (*pos)->unregister_for_reader(participant, writerid, readerid); 00659 } 00660 }
void OpenDDS::DCPS::TransportClient::unregister_for_writer | ( | const RepoId & | participant, | |
const RepoId & | readerid, | |||
const RepoId & | writerid | |||
) |
Reimplemented in OpenDDS::DCPS::DataReaderImpl, and OpenDDS::DCPS::RecorderImpl.
Definition at line 678 of file TransportClient.cpp.
00681 { 00682 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00683 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end(); 00684 pos != limit; 00685 ++pos) { 00686 (*pos)->unregister_for_writer(participant, readerid, writerid); 00687 } 00688 }
void OpenDDS::DCPS::TransportClient::use_datalink | ( | const RepoId & | remote_id, | |
const DataLink_rch & | link | |||
) |
Definition at line 441 of file TransportClient.cpp.
References lock_, and use_datalink_i().
Referenced by OpenDDS::DCPS::TransportClient::PendingAssoc::handle_timeout().
00443 { 00444 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00445 00446 use_datalink_i(remote_id, link, guard); 00447 }
void OpenDDS::DCPS::TransportClient::use_datalink_i | ( | const RepoId & | remote_id, | |
const DataLink_rch & | link, | |||
Guard & | guard | |||
) | [private] |
Definition at line 450 of file TransportClient.cpp.
References add_link(), ASSOC_ACTIVE, ASSOC_OK, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pending_, pending_assoc_timer_, ACE_Guard< ACE_LOCK >::release(), transport_assoc_done(), and VDBG_LVL.
Referenced by associate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), and use_datalink().
00453 { 00454 // Try to make a local copy of remote_id to use in calls 00455 // because the reference could be invalidated if the caller 00456 // reference location is deleted (i.e. in stop_accepting_or_connecting 00457 // if use_datalink_i was called from passive_connection) 00458 // Does changing this from a reference to a local affect anything going forward? 00459 RepoId remote_id(remote_id_ref); 00460 00461 GuidConverter peerId_conv(remote_id); 00462 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00463 "TransportClient(%@) using datalink[%@] from %C\n", 00464 this, 00465 link.in(), 00466 OPENDDS_STRING(peerId_conv).c_str()), 0); 00467 00468 PendingMap::iterator iter = pending_.find(remote_id); 00469 00470 if (iter == pending_.end()) { 00471 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00472 "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n", 00473 this, 00474 link.in(), 00475 OPENDDS_STRING(peerId_conv).c_str()), 0); 00476 return; 00477 } 00478 00479 PendingAssoc_rch pend = iter->second; 00480 const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0; 00481 bool ok = false; 00482 00483 if (link.is_nil()) { 00484 00485 if (pend->active_ && pend->initiate_connect(this, guard)) { 00486 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00487 "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n", 00488 this, 00489 link.in(), 00490 OPENDDS_STRING(peerId_conv).c_str()), 0); 00491 return; 00492 } 00493 00494 } else { // link is ready to use 00495 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i " 00496 "TransportClient(%@) about to add_link[%@] to remote: %C\n", 00497 this, 00498 link.in(), 00499 OPENDDS_STRING(peerId_conv).c_str()), 0); 00500 00501 add_link(link, remote_id); 00502 ok = true; 00503 } 00504 00505 // either link is valid or assoc failed, clean up pending object 00506 // for passive side processing 00507 if (!pend->active_) { 00508 00509 for (size_t i = 0; i < pend->impls_.size(); ++i) { 00510 pend->impls_[i]->stop_accepting_or_connecting(*this, pend->data_.remote_id_); 00511 } 00512 } 00513 00514 pending_.erase(iter); 00515 00516 guard.release(); 00517 00518 pending_assoc_timer_->cancel_timer(this, pend); 00519 00520 transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id); 00521 }
friend class ::DDS_TEST [friend] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl, OpenDDS::DCPS::DataWriterImpl, OpenDDS::DCPS::DataWriterImpl_T< MessageType >, OpenDDS::DCPS::RecorderImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 166 of file TransportClient.h.
bool OpenDDS::DCPS::TransportClient::cdr_encapsulation_ [private] |
Definition at line 293 of file TransportClient.h.
Referenced by enable_transport_using_config().
Definition at line 297 of file TransportClient.h.
Referenced by enable_transport_using_config().
DataLinkIndex OpenDDS::DCPS::TransportClient::data_link_index_ [private] |
Definition at line 275 of file TransportClient.h.
Referenced by add_link(), disassociate(), send_control_to(), and send_response().
bool OpenDDS::DCPS::TransportClient::durable_ [private] |
Definition at line 293 of file TransportClient.h.
Referenced by associate(), and enable_transport_using_config().
Definition at line 281 of file TransportClient.h.
Referenced by send_i().
ImplsType OpenDDS::DCPS::TransportClient::impls_ [private] |
Definition at line 267 of file TransportClient.h.
Referenced by associate(), enable_transport_using_config(), register_for_reader(), register_for_writer(), unregister_for_reader(), unregister_for_writer(), and ~TransportClient().
Definition at line 269 of file TransportClient.h.
Referenced by add_link(), disassociate(), remove_all_msgs(), remove_sample(), send_control(), send_final_acks(), and send_i().
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
Reimplemented in OpenDDS::DCPS::DataWriterImpl, and OpenDDS::DCPS::ReplayerImpl.
Definition at line 300 of file TransportClient.h.
Referenced by associate(), disassociate(), register_for_reader(), register_for_writer(), send_control_to(), stop_associating(), unregister_for_reader(), unregister_for_writer(), use_datalink(), and ~TransportClient().
Definition at line 282 of file TransportClient.h.
Referenced by send_i().
Definition at line 289 of file TransportClient.h.
Referenced by send_i().
Definition at line 295 of file TransportClient.h.
Referenced by enable_transport_using_config().
PendingMap OpenDDS::DCPS::TransportClient::pending_ [private] |
Definition at line 268 of file TransportClient.h.
Referenced by associate(), disassociate(), stop_associating(), use_datalink_i(), and ~TransportClient().
Definition at line 263 of file TransportClient.h.
Referenced by associate(), use_datalink_i(), and ~TransportClient().
bool OpenDDS::DCPS::TransportClient::reliable_ [private] |
Definition at line 293 of file TransportClient.h.
Referenced by associate(), and enable_transport_using_config().
Reimplemented in OpenDDS::RTPS::Sedp::Endpoint.
Definition at line 305 of file TransportClient.h.
Referenced by add_link(), associate(), disassociate(), OpenDDS::DCPS::TransportClient::PendingAssoc::initiate_connect(), initiate_connect_i(), remove_all_msgs(), send_control(), send_control_to(), send_i(), and ~TransportClient().
Definition at line 303 of file TransportClient.h.
Referenced by associate().
These are the links being used during the call to send(). This is made a member of the class to minimize allocation/deallocations of the data link set.
Definition at line 273 of file TransportClient.h.
Definition at line 280 of file TransportClient.h.
Referenced by send(), and send_w_control().
bool OpenDDS::DCPS::TransportClient::swap_bytes_ [private] |
Definition at line 293 of file TransportClient.h.
Referenced by enable_transport_using_config().