#include <DataLink.h>
This class manages the reservations based on the associations between datareader and datawriter. It basically delegate the samples to send strategy for sending and deliver the samples received by receive strategy to the listener.
Notes about object ownership: 1) Own the send strategy object and receive strategy object. 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection is enabled.
Definition at line 66 of file DataLink.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::DataLink::GuardType [protected] |
Definition at line 392 of file DataLink.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLink::LockType [private] |
Definition at line 337 of file DataLink.h.
typedef std::pair<TransportClient_wrch, RepoId> OpenDDS::DCPS::DataLink::OnStartCallback |
Definition at line 249 of file DataLink.h.
Definition at line 248 of file DataLink.h.
Definition at line 73 of file DataLink.h.
00073 { 00074 DISCONNECTED, 00075 RECONNECTED, 00076 LOST 00077 };
OpenDDS::DCPS::DataLink::DataLink | ( | TransportImpl & | impl, | |
Priority | priority, | |||
bool | is_loopback, | |||
bool | is_active | |||
) |
Only called by our TransportImpl object.
A DataLink object is always created by a TransportImpl object. Thus, the TransportImpl object passed-in here is the object that created this DataLink. The ability to specify a priority for individual links is included for construction so its value can be available for activating any threads.
Definition at line 42 of file DataLink.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportImpl::config(), OpenDDS::DCPS::TransportInst::datalink_control_chunks_, OpenDDS::DCPS::TransportInst::datalink_release_delay_, datalink_release_delay_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, get_next_datalink_id(), id_, LM_DEBUG, LM_ERROR, mb_allocator_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), ACE_Time_Value::sec(), thr_per_con_send_task_, OpenDDS::DCPS::TransportInst::thread_per_connection_, ThreadPerConnectionSendTask, and ACE_Time_Value::usec().
00044 : stopped_(false), 00045 scheduled_to_stop_at_(ACE_Time_Value::zero), 00046 impl_(impl), 00047 transport_priority_(priority), 00048 scheduling_release_(false), 00049 is_loopback_(is_loopback), 00050 is_active_(is_active), 00051 started_(false), 00052 send_response_listener_("DataLink") 00053 { 00054 DBG_ENTRY_LVL("DataLink", "DataLink", 6); 00055 00056 datalink_release_delay_.sec(impl.config().datalink_release_delay_ / 1000); 00057 datalink_release_delay_.usec(impl.config().datalink_release_delay_ % 1000 * 1000); 00058 00059 id_ = DataLink::get_next_datalink_id(); 00060 00061 if (impl.config().thread_per_connection_) { 00062 this->thr_per_con_send_task_.reset(new ThreadPerConnectionSendTask(this)); 00063 00064 if (this->thr_per_con_send_task_->open() == -1) { 00065 ACE_ERROR((LM_ERROR, 00066 ACE_TEXT("(%P|%t) DataLink::DataLink: ") 00067 ACE_TEXT("failed to open ThreadPerConnectionSendTask\n"))); 00068 00069 } else if (DCPS_debug_level > 4) { 00070 ACE_DEBUG((LM_DEBUG, 00071 ACE_TEXT("(%P|%t) DataLink::DataLink - ") 00072 ACE_TEXT("started new thread to send data with.\n"))); 00073 } 00074 } 00075 00076 // Initialize transport control sample allocators: 00077 size_t control_chunks = impl.config().datalink_control_chunks_; 00078 00079 this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks)); 00080 this->db_allocator_.reset(new DataBlockAllocator(control_chunks)); 00081 }
OpenDDS::DCPS::DataLink::~DataLink | ( | ) | [virtual] |
Definition at line 83 of file DataLink.cpp.
References ACE_TEXT(), assoc_by_local_, DBG_ENTRY_LVL, LM_WARNING, and thr_per_con_send_task_.
00084 { 00085 DBG_ENTRY_LVL("DataLink", "~DataLink", 6); 00086 00087 if (!assoc_by_local_.empty()) { 00088 ACE_DEBUG((LM_WARNING, 00089 ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ") 00090 ACE_TEXT("link still in use by %d entities when deleted!\n"), 00091 this, assoc_by_local_.size())); 00092 } 00093 00094 if (this->thr_per_con_send_task_ != 0) { 00095 this->thr_per_con_send_task_->close(1); 00096 } 00097 }
bool OpenDDS::DCPS::DataLink::add_on_start_callback | ( | const TransportClient_wrch & | client, | |
const RepoId & | remote | |||
) |
Definition at line 107 of file DataLink.cpp.
References OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, started_, and strategy_lock_.
00108 { 00109 GuardType guard(strategy_lock_); 00110 00111 if (started_ && !send_strategy_.is_nil()) { 00112 return false; // link already started 00113 } 00114 on_start_callbacks_.push_back(std::make_pair(client, remote)); 00115 return true; 00116 }
bool OpenDDS::DCPS::DataLink::cancel_release | ( | ) |
Definition at line 449 of file DataLink.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, notify_reactor(), scheduled_to_stop_at_, scheduling_release_, set_scheduling_release(), stopped_, and ACE_Time_Value::zero.
00450 { 00451 DBG_ENTRY_LVL("DataLink", "cancel_release", 6); 00452 if (stopped_) { 00453 if (DCPS_debug_level > 0) { 00454 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this)); 00455 } 00456 return false; 00457 } 00458 if (scheduling_release_) { 00459 if (DCPS_debug_level > 0) { 00460 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this)); 00461 } 00462 this->set_scheduling_release(false); 00463 this->scheduled_to_stop_at_ = ACE_Time_Value::zero; 00464 notify_reactor(); 00465 } 00466 return true; 00467 }
void OpenDDS::DCPS::DataLink::clear_associations | ( | ) |
This is called by DataLinkCleanupTask thread to remove the associations based on the snapshot in release_resources().
Definition at line 901 of file DataLink.cpp.
References assoc_releasing_, recv_listener_for(), send_listener_for(), and OpenDDS::DCPS::set_to_seq().
00902 { 00903 for (AssocByLocal::iterator iter = assoc_releasing_.begin(); 00904 iter != assoc_releasing_.end(); ++iter) { 00905 TransportSendListener_rch tsl = send_listener_for(iter->first); 00906 if (tsl) { 00907 ReaderIdSeq sub_ids; 00908 set_to_seq(iter->second, sub_ids); 00909 tsl->remove_associations(sub_ids, false); 00910 continue; 00911 } 00912 TransportReceiveListener_rch trl = recv_listener_for(iter->first); 00913 if (trl) { 00914 WriterIdSeq pub_ids; 00915 set_to_seq(iter->second, pub_ids); 00916 trl->remove_associations(pub_ids, false); 00917 } 00918 } 00919 assoc_releasing_.clear(); 00920 }
ACE_INLINE const char * OpenDDS::DCPS::DataLink::connection_notice_as_str | ( | ConnectionNotice | notice | ) | [private] |
Helper function to output the enum as a string to help debugging.
Definition at line 289 of file DataLink.inl.
Referenced by notify().
00290 { 00291 static const char* NoticeStr[] = { "DISCONNECTED", 00292 "RECONNECTED", 00293 "LOST" 00294 }; 00295 00296 return NoticeStr [notice]; 00297 }
ACE_Message_Block * OpenDDS::DCPS::DataLink::create_control | ( | char | submessage_id, | |
DataSampleHeader & | header, | |||
Message_Block_Ptr | data | |||
) |
This allows a subclass to easily create a transport control sample to send via send_control.
Definition at line 476 of file DataLink.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_ERROR, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::DataSampleHeader::submessage_id_, OpenDDS::DCPS::TRANSPORT_CONTROL, and ACE_Time_Value::zero.
Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().
00479 { 00480 DBG_ENTRY_LVL("DataLink", "create_control", 6); 00481 00482 header.byte_order_ = ACE_CDR_BYTE_ORDER; 00483 header.message_id_ = TRANSPORT_CONTROL; 00484 header.submessage_id_ = submessage_id; 00485 header.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 00486 00487 ACE_Message_Block* message = 0; 00488 ACE_NEW_MALLOC_RETURN(message, 00489 static_cast<ACE_Message_Block*>( 00490 this->mb_allocator_->malloc(sizeof(ACE_Message_Block))), 00491 ACE_Message_Block(header.max_marshaled_size(), 00492 ACE_Message_Block::MB_DATA, 00493 data.release(), 00494 0, // data 00495 0, // allocator_strategy 00496 0, // locking_strategy 00497 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00498 ACE_Time_Value::zero, 00499 ACE_Time_Value::max_time, 00500 this->db_allocator_.get(), 00501 this->mb_allocator_.get()), 00502 0); 00503 00504 if (!(*message << header)) { 00505 ACE_ERROR((LM_ERROR, 00506 ACE_TEXT("(%P|%t) DataLink::create_control: ") 00507 ACE_TEXT("cannot put header in message\n"))); 00508 ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block); 00509 message = 0; 00510 } 00511 00512 return message; 00513 }
virtual TransportQueueElement* OpenDDS::DCPS::DataLink::customize_queue_element | ( | TransportQueueElement * | element | ) | [inline, private, virtual] |
Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 320 of file DataLink.h.
Referenced by send().
int OpenDDS::DCPS::DataLink::data_received | ( | ReceivedDataSample & | sample, | |
const RepoId & | readerId = GUID_UNKNOWN | |||
) |
This is called by our TransportReceiveStrategy object when it has received a complete data sample. This method will cause the appropriate TransportReceiveListener objects to be told that data_received(). If readerId is not GUID_UNKNOWN, only the TransportReceiveListener with that ID (if one exists) will receive the data.
This method will "deliver" the sample to all TransportReceiveListeners within this DataLink that are interested in the (remote) publisher id that sent the sample.
Definition at line 538 of file DataLink.cpp.
References data_received_i(), and OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED.
Referenced by OpenDDS::DCPS::ReliableSession::deliver_held_data(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), OpenDDS::DCPS::RtpsUdpDataLink::HeldDataDeliveryHandler::handle_exception(), and OpenDDS::DCPS::MulticastDataLink::sample_received().
00540 { 00541 data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED); 00542 return 0; 00543 }
void OpenDDS::DCPS::DataLink::data_received_i | ( | ReceivedDataSample & | sample, | |
const RepoId & | readerId, | |||
const RepoIdSet & | incl_excl, | |||
ReceiveListenerSet::ConstrainReceiveSet | constrain | |||
) | [private] |
Definition at line 552 of file DataLink.cpp.
References ACE_TEXT(), assoc_by_remote_, OpenDDS::DCPS::DataSampleHeader::content_filter_, OpenDDS::DCPS::DataSampleHeader::content_filter_entries_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, default_listener_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::ReceivedDataSample::header_, if(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_STRING, pub_sub_maps_lock_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED, OpenDDS::DCPS::to_string(), and OpenDDS::DCPS::Transport_debug_level.
Referenced by data_received(), and data_received_include().
00556 { 00557 DBG_ENTRY_LVL("DataLink", "data_received_i", 6); 00558 // Which remote publication sent this message? 00559 const RepoId& publication_id = sample.header_.publication_id_; 00560 00561 // Locate the set of TransportReceiveListeners associated with this 00562 // DataLink that are interested in hearing about any samples received 00563 // from the remote publisher_id. 00564 if (DCPS_debug_level > 9) { 00565 const GuidConverter converter(publication_id); 00566 const GuidConverter reader(readerId); 00567 ACE_DEBUG((LM_DEBUG, 00568 ACE_TEXT("(%P|%t) DataLink::data_received_i: ") 00569 ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"), 00570 OPENDDS_STRING(converter).c_str(), 00571 to_string(sample.header_).c_str(), 00572 OPENDDS_STRING(reader).c_str(), 00573 constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED")); 00574 } 00575 00576 if (Transport_debug_level > 9) { 00577 const GuidConverter converter(publication_id); 00578 ACE_DEBUG((LM_DEBUG, 00579 ACE_TEXT("(%P|%t) DataLink::data_received_i: ") 00580 ACE_TEXT("from publication %C received sample: %C.\n"), 00581 OPENDDS_STRING(converter).c_str(), 00582 to_string(sample.header_).c_str())); 00583 } 00584 00585 ReceiveListenerSet_rch listener_set; 00586 { 00587 GuardType guard(this->pub_sub_maps_lock_); 00588 AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id); 00589 if (iter != assoc_by_remote_.end()) 00590 listener_set = iter->second; 00591 00592 if (listener_set.is_nil()) { 00593 TransportReceiveListener_rch listener = this->default_listener_.lock(); 00594 if (listener) 00595 listener->data_received(sample); 00596 return; 00597 } 00598 } 00599 00600 if (listener_set.is_nil()) { 00601 // Nobody has any interest in this message. Drop it on the floor. 00602 if (Transport_debug_level > 4) { 00603 const GuidConverter converter(publication_id); 00604 ACE_DEBUG((LM_DEBUG, 00605 ACE_TEXT("(%P|%t) DataLink::data_received_i: ") 00606 ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"), 00607 OPENDDS_STRING(converter).c_str())); 00608 } 00609 00610 return; 00611 } 00612 00613 if (readerId != GUID_UNKNOWN) { 00614 listener_set->data_received(sample, readerId); 00615 return; 00616 } 00617 00618 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00619 00620 if (sample.header_.content_filter_ 00621 && sample.header_.content_filter_entries_.length()) { 00622 ReceiveListenerSet subset(*listener_set.in()); 00623 subset.remove_all(sample.header_.content_filter_entries_); 00624 subset.data_received(sample, incl_excl, constrain); 00625 00626 } else { 00627 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00628 00629 if (DCPS_debug_level > 9) { 00630 // Just get the set to do our dirty work by having it iterate over its 00631 // collection of TransportReceiveListeners, and invoke the data_received() 00632 // method on each one. 00633 OPENDDS_STRING included_ids; 00634 bool first = true; 00635 RepoIdSet::const_iterator iter = incl_excl.begin(); 00636 while(iter != incl_excl.end()) { 00637 included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter)); 00638 first = false; 00639 ++iter; 00640 } 00641 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n", 00642 constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str())); 00643 } 00644 listener_set->data_received(sample, incl_excl, constrain); 00645 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00646 } 00647 00648 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00649 }
void OpenDDS::DCPS::DataLink::data_received_include | ( | ReceivedDataSample & | sample, | |
const RepoIdSet & | incl | |||
) |
Varation of data_received() that allows for excluding a subset of readers by specifying which readers specifically should receive. Any reader ID that does not appear in the include set will be skipped.
Definition at line 546 of file DataLink.cpp.
References data_received_i(), OpenDDS::DCPS::GUID_UNKNOWN, and OpenDDS::DCPS::ReceiveListenerSet::SET_INCLUDED.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().
00547 { 00548 data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED); 00549 }
ACE_INLINE const ACE_Time_Value & OpenDDS::DCPS::DataLink::datalink_release_delay | ( | ) | const |
Definition at line 62 of file DataLink.inl.
References datalink_release_delay_.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00063 { 00064 return this->datalink_release_delay_; 00065 }
ACE_INLINE TransportReceiveListener_wrch OpenDDS::DCPS::DataLink::default_listener | ( | ) | const |
Definition at line 379 of file DataLink.inl.
References default_listener_, and pub_sub_maps_lock_.
00380 { 00381 GuardType guard(this->pub_sub_maps_lock_); 00382 return this->default_listener_; 00383 }
ACE_INLINE void OpenDDS::DCPS::DataLink::default_listener | ( | const TransportReceiveListener_wrch & | trl | ) |
Definition at line 371 of file DataLink.inl.
References default_listener_, and pub_sub_maps_lock_.
00372 { 00373 GuardType guard(this->pub_sub_maps_lock_); 00374 this->default_listener_ = trl; 00375 }
ACE_UINT64 OpenDDS::DCPS::DataLink::get_next_datalink_id | ( | ) | [static, protected] |
Used to provide unique Ids to all DataLink methods.
Definition at line 653 of file DataLink.cpp.
References ACE_TEXT(), id(), and LM_ERROR.
Referenced by DataLink().
00654 { 00655 static ACE_UINT64 next_id = 0; 00656 static LockType lock; 00657 00658 ACE_UINT64 id; 00659 { 00660 GuardType guard(lock); 00661 id = next_id++; 00662 00663 if (0 == next_id) { 00664 ACE_ERROR((LM_ERROR, 00665 ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ") 00666 ACE_TEXT("has rolled over and is reusing ids!\n"))); 00667 } 00668 } 00669 00670 return id; 00671 }
int OpenDDS::DCPS::DataLink::handle_close | ( | ACE_HANDLE | h, | |
ACE_Reactor_Mask | m | |||
) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 937 of file DataLink.cpp.
References handle_timeout(), ACE_Event_Handler::TIMER_MASK, and ACE_Time_Value::zero.
00938 { 00939 if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) { 00940 // Reactor is shutting down with this timer still pending. 00941 // Take the same cleanup actions as if the timeout had expired. 00942 handle_timeout(ACE_Time_Value::zero, 0); 00943 } 00944 00945 return 0; 00946 }
int OpenDDS::DCPS::DataLink::handle_exception | ( | ACE_HANDLE | ) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 154 of file DataLink.cpp.
References ACE_TEXT(), ACE_Reactor_Timer_Interface::cancel_timer(), OpenDDS::DCPS::DCPS_debug_level, ACE_OS::gettimeofday(), handle_timeout(), impl_, LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Reactor_Timer_Interface::schedule_timer(), scheduled_to_stop_at_, scheduling_release_, stop(), OpenDDS::DCPS::TransportImpl::timer(), and ACE_Time_Value::zero.
00155 { 00156 if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) { 00157 if (DCPS_debug_level > 0) { 00158 ACE_DEBUG((LM_DEBUG, 00159 ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n"))); 00160 } 00161 ACE_Reactor_Timer_Interface* reactor = impl_.timer(); 00162 if (reactor->cancel_timer(this) > 0) { 00163 if (DCPS_debug_level > 0) { 00164 ACE_DEBUG((LM_DEBUG, 00165 ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n"))); 00166 } 00167 } 00168 return 0; 00169 } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) { 00170 if (this->scheduling_release_) { 00171 if (DCPS_debug_level > 0) { 00172 ACE_DEBUG((LM_DEBUG, 00173 ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n"))); 00174 } 00175 this->handle_timeout(ACE_Time_Value::zero, 0); 00176 return 0; 00177 } 00178 if (DCPS_debug_level > 0) { 00179 ACE_DEBUG((LM_DEBUG, 00180 ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n"))); 00181 } 00182 this->stop(); 00183 return 0; 00184 } else /* SCHEDULE TO STOP IN THE FUTURE*/ { 00185 if (DCPS_debug_level > 0) { 00186 ACE_DEBUG((LM_DEBUG, 00187 ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n"))); 00188 } 00189 ACE_Reactor_Timer_Interface* reactor = impl_.timer(); 00190 ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday(); 00191 reactor->schedule_timer(this, 0, future_release_time); 00192 } 00193 return 0; 00194 }
bool OpenDDS::DCPS::DataLink::handle_send_request_ack | ( | TransportQueueElement * | element | ) | [private, virtual] |
Reimplemented in OpenDDS::DCPS::TcpDataLink.
Definition at line 1025 of file DataLink.cpp.
References OpenDDS::DCPS::TransportQueueElement::data_delivered().
Referenced by send().
int OpenDDS::DCPS::DataLink::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 923 of file DataLink.cpp.
References assoc_by_local_, assoc_by_remote_, impl_, LM_DEBUG, scheduled_to_stop_at_, stop(), OpenDDS::DCPS::TransportImpl::unbind_link(), VDBG_LVL, and ACE_Time_Value::zero.
Referenced by handle_close(), and handle_exception().
00924 { 00925 if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) { 00926 VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4); 00927 impl_.unbind_link(this); 00928 00929 if (assoc_by_remote_.empty() && assoc_by_local_.empty()) { 00930 this->stop(); 00931 } 00932 } 00933 return 0; 00934 }
ACE_INLINE DataLinkIdType OpenDDS::DCPS::DataLink::id | ( | void | ) | const |
Obtain a unique identifier for this DataLink object.
Definition at line 231 of file DataLink.inl.
References DBG_ENTRY_LVL, and id_.
Referenced by get_next_datalink_id().
00232 { 00233 DBG_ENTRY_LVL("DataLink","id",6); 00234 return id_; 00235 }
TransportImpl & OpenDDS::DCPS::DataLink::impl | ( | void | ) | const |
Reimplemented in OpenDDS::DCPS::ShmemDataLink.
Definition at line 100 of file DataLink.cpp.
References impl_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::UdpDataLink::control_received(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::TcpDataLink::pre_stop_i(), OpenDDS::DCPS::TcpDataLink::reconnect(), and OpenDDS::DCPS::MulticastDataLink::transport().
00101 { 00102 return impl_; 00103 }
void OpenDDS::DCPS::DataLink::invoke_on_start_callbacks | ( | bool | success | ) |
Definition at line 131 of file DataLink.cpp.
References ACE_Guard< ACE_LOCK >::release(), and strategy_lock_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received(), and start().
00132 { 00133 const DataLink_rch link(success ? this : 0, inc_count()); 00134 00135 while (true) { 00136 GuardType guard(strategy_lock_); 00137 00138 if (on_start_callbacks_.empty()) { 00139 break; 00140 } 00141 00142 OnStartCallback last_callback = on_start_callbacks_.back(); 00143 on_start_callbacks_.pop_back(); 00144 00145 guard.release(); 00146 TransportClient_rch client = last_callback.first.lock(); 00147 if (client) 00148 client->use_datalink(last_callback.second, link); 00149 } 00150 }
ACE_INLINE bool OpenDDS::DCPS::DataLink::is_active | ( | ) | const |
Definition at line 56 of file DataLink.inl.
References is_active_.
00057 { 00058 return this->is_active_; 00059 }
ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_active | ( | ) |
Definition at line 49 of file DataLink.inl.
References is_active_.
Referenced by OpenDDS::DCPS::MulticastDataLink::check_header(), OpenDDS::DCPS::TcpTransport::release_datalink(), OpenDDS::DCPS::MulticastDataLink::sample_received(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00050 { 00051 return this->is_active_; 00052 }
ACE_INLINE bool OpenDDS::DCPS::DataLink::is_loopback | ( | void | ) | const |
Definition at line 42 of file DataLink.inl.
References is_loopback_.
00043 { 00044 return this->is_loopback_; 00045 }
ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_loopback | ( | void | ) |
Definition at line 35 of file DataLink.inl.
References is_loopback_.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00036 { 00037 return this->is_loopback_; 00038 }
bool OpenDDS::DCPS::DataLink::is_target | ( | const RepoId & | remote_sub_id | ) |
This is called on publisher side to see if this link communicates with the provided sub.
Definition at line 855 of file DataLink.cpp.
References assoc_by_remote_, and pub_sub_maps_lock_.
00856 { 00857 GuardType guard(this->pub_sub_maps_lock_); 00858 return assoc_by_remote_.count(remote_sub_id); 00859 }
int OpenDDS::DCPS::DataLink::make_reservation | ( | const RepoId & | remote_publication_id, | |
const RepoId & | local_subcription_id, | |||
const TransportReceiveListener_wrch & | receive_listener | |||
) |
Only called by our TransportImpl object.
Return Codes: 0 means successful reservation made. -1 means failure.
Definition at line 301 of file DataLink.cpp.
References ACE_TEXT(), assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, recv_listeners_, send_strategy_, and strategy_lock_.
00304 { 00305 DBG_ENTRY_LVL("DataLink", "make_reservation", 6); 00306 00307 if (DCPS_debug_level > 9) { 00308 GuidConverter local(local_subscription_id), remote(remote_publication_id); 00309 ACE_DEBUG((LM_DEBUG, 00310 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ") 00311 ACE_TEXT("creating association local subscription %C ") 00312 ACE_TEXT("<--> with remote publication %C.\n"), 00313 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str())); 00314 } 00315 00316 { 00317 GuardType guard(strategy_lock_); 00318 00319 if (!send_strategy_.is_nil()) { 00320 send_strategy_->link_released(false); 00321 } 00322 } 00323 { 00324 GuardType guard(pub_sub_maps_lock_); 00325 00326 assoc_by_local_[local_subscription_id].insert(remote_publication_id); 00327 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id]; 00328 00329 if (rls.is_nil()) 00330 rls = make_rch<ReceiveListenerSet>(); 00331 rls->insert(local_subscription_id, receive_listener); 00332 00333 recv_listeners_.insert(std::make_pair(local_subscription_id, 00334 receive_listener)); 00335 } 00336 return 0; 00337 }
int OpenDDS::DCPS::DataLink::make_reservation | ( | const RepoId & | remote_subscription_id, | |
const RepoId & | local_publication_id, | |||
const TransportSendListener_wrch & | send_listener | |||
) |
Only called by our TransportImpl object.
Return Codes: 0 means successful reservation made. -1 means failure.
Definition at line 262 of file DataLink.cpp.
References ACE_TEXT(), assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, send_listeners_, send_strategy_, and strategy_lock_.
00265 { 00266 DBG_ENTRY_LVL("DataLink", "make_reservation", 6); 00267 00268 if (DCPS_debug_level > 9) { 00269 GuidConverter local(local_publication_id), remote(remote_subscription_id); 00270 ACE_DEBUG((LM_DEBUG, 00271 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ") 00272 ACE_TEXT("creating association local publication %C ") 00273 ACE_TEXT("<--> with remote subscription %C.\n"), 00274 OPENDDS_STRING(local).c_str(), 00275 OPENDDS_STRING(remote).c_str())); 00276 } 00277 00278 { 00279 GuardType guard(strategy_lock_); 00280 00281 if (!send_strategy_.is_nil()) { 00282 send_strategy_->link_released(false); 00283 } 00284 } 00285 { 00286 GuardType guard(pub_sub_maps_lock_); 00287 00288 assoc_by_local_[local_publication_id].insert(remote_subscription_id); 00289 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id]; 00290 00291 if (rls.is_nil()) 00292 rls = make_rch<ReceiveListenerSet>(); 00293 rls->insert(local_publication_id, TransportReceiveListener_rch()); 00294 00295 send_listeners_.insert(std::make_pair(local_publication_id,send_listener)); 00296 } 00297 return 0; 00298 }
void OpenDDS::DCPS::DataLink::notify | ( | ConnectionNotice | notice | ) |
Notify the datawriters and datareaders that the connection is disconnected, lost, or reconnected. The datareader/datawriter will notify the corresponding listener.
Definition at line 692 of file DataLink.cpp.
References ACE_TEXT(), assoc_by_local_, connection_notice_as_str(), DBG_ENTRY_LVL, DISCONNECTED, LM_DEBUG, LM_ERROR, LOST, OPENDDS_STRING, pub_sub_maps_lock_, RECONNECTED, recv_listeners_, send_listeners_, OpenDDS::DCPS::set_to_seq(), OpenDDS::DCPS::Transport_debug_level, and VDBG.
00693 { 00694 DBG_ENTRY_LVL("DataLink", "notify", 6); 00695 00696 VDBG((LM_DEBUG, 00697 ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"), 00698 this, 00699 connection_notice_as_str(notice))); 00700 00701 GuardType guard(this->pub_sub_maps_lock_); 00702 00703 // Notify the datawriters 00704 // the lost publications due to a connection problem. 00705 for (IdToSendListenerMap::iterator itr = send_listeners_.begin(); 00706 itr != send_listeners_.end(); ++itr) { 00707 00708 TransportSendListener_rch tsl = itr->second.lock(); 00709 00710 if (tsl) { 00711 if (Transport_debug_level > 0) { 00712 GuidConverter converter(itr->first); 00713 ACE_DEBUG((LM_DEBUG, 00714 ACE_TEXT("(%P|%t) DataLink::notify: ") 00715 ACE_TEXT("notify pub %C %C.\n"), 00716 OPENDDS_STRING(converter).c_str(), 00717 connection_notice_as_str(notice))); 00718 } 00719 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first); 00720 if (local_it == assoc_by_local_.end()) { 00721 if (Transport_debug_level) { 00722 GuidConverter converter(itr->first); 00723 ACE_DEBUG((LM_DEBUG, 00724 ACE_TEXT("(%P|%t) DataLink::notify: ") 00725 ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"), 00726 OPENDDS_STRING(converter).c_str(), 00727 connection_notice_as_str(notice))); 00728 } 00729 break; 00730 } 00731 const RepoIdSet& rids = local_it->second; 00732 00733 ReaderIdSeq subids; 00734 set_to_seq(rids, subids); 00735 00736 switch (notice) { 00737 case DISCONNECTED: 00738 tsl->notify_publication_disconnected(subids); 00739 break; 00740 00741 case RECONNECTED: 00742 tsl->notify_publication_reconnected(subids); 00743 break; 00744 00745 case LOST: 00746 tsl->notify_publication_lost(subids); 00747 break; 00748 00749 default: 00750 ACE_ERROR((LM_ERROR, 00751 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ") 00752 ACE_TEXT("unknown notice to TransportSendListener\n"))); 00753 break; 00754 } 00755 00756 } else { 00757 if (Transport_debug_level > 0) { 00758 GuidConverter converter(itr->first); 00759 ACE_DEBUG((LM_DEBUG, 00760 ACE_TEXT("(%P|%t) DataLink::notify: ") 00761 ACE_TEXT("not notify pub %C %C \n"), 00762 OPENDDS_STRING(converter).c_str(), 00763 connection_notice_as_str(notice))); 00764 } 00765 } 00766 00767 } 00768 00769 // Notify the datareaders registered with TransportImpl 00770 // the lost subscriptions due to a connection problem. 00771 for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin(); 00772 itr != recv_listeners_.end(); ++itr) { 00773 00774 TransportReceiveListener_rch trl = itr->second.lock(); 00775 00776 if (trl) { 00777 if (Transport_debug_level > 0) { 00778 GuidConverter converter(itr->first); 00779 ACE_DEBUG((LM_DEBUG, 00780 ACE_TEXT("(%P|%t) DataLink::notify: ") 00781 ACE_TEXT("notify sub %C %C.\n"), 00782 OPENDDS_STRING(converter).c_str(), 00783 connection_notice_as_str(notice))); 00784 } 00785 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first); 00786 if (local_it == assoc_by_local_.end()) { 00787 if (Transport_debug_level) { 00788 GuidConverter converter(itr->first); 00789 ACE_DEBUG((LM_DEBUG, 00790 ACE_TEXT("(%P|%t) DataLink::notify: ") 00791 ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"), 00792 OPENDDS_STRING(converter).c_str(), 00793 connection_notice_as_str(notice))); 00794 } 00795 break; 00796 } 00797 const RepoIdSet& rids = local_it->second; 00798 00799 WriterIdSeq pubids; 00800 set_to_seq(rids, pubids); 00801 00802 switch (notice) { 00803 case DISCONNECTED: 00804 trl->notify_subscription_disconnected(pubids); 00805 break; 00806 00807 case RECONNECTED: 00808 trl->notify_subscription_reconnected(pubids); 00809 break; 00810 00811 case LOST: 00812 trl->notify_subscription_lost(pubids); 00813 break; 00814 00815 default: 00816 ACE_ERROR((LM_ERROR, 00817 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ") 00818 ACE_TEXT("unknown notice to datareader.\n"))); 00819 break; 00820 } 00821 00822 } else { 00823 if (Transport_debug_level > 0) { 00824 GuidConverter converter(itr->first); 00825 ACE_DEBUG((LM_DEBUG, 00826 ACE_TEXT("(%P|%t) DataLink::notify: ") 00827 ACE_TEXT("not notify sub %C subscription lost.\n"), 00828 OPENDDS_STRING(converter).c_str())); 00829 } 00830 00831 } 00832 } 00833 }
void OpenDDS::DCPS::DataLink::notify_reactor | ( | void | ) | [private] |
Definition at line 215 of file DataLink.cpp.
References impl_, ACE_Reactor::notify(), ACE_Event_Handler::reactor(), and OpenDDS::DCPS::TransportImpl::reactor_task().
Referenced by cancel_release(), and schedule_stop().
00216 { 00217 TransportReactorTask_rch reactor(impl_.reactor_task()); 00218 reactor->get_reactor()->notify(this); 00219 }
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RepoIdSet | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReceiveListenerSet_rch | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
TransportReceiveListener_wrch | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Map subscription Id value to TransportReceieveListener.
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
TransportSendListener_wrch | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Map publication Id value to TransportSendListener.
OpenDDS::DCPS::DataLink::OPENDDS_VECTOR | ( | OnStartCallback | ) | [protected] |
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg(), OpenDDS::DCPS::RtpsUdpDataLink::check_heartbeats(), OpenDDS::DCPS::RtpsUdpDataLink::generate_nack_frags(), OpenDDS::DCPS::RtpsUdpDataLink::marshal_gaps(), OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i(), OpenDDS::DCPS::RtpsUdpDataLink::process_acked_by_all_i(), OpenDDS::DCPS::RtpsUdpDataLink::process_gap_i(), OpenDDS::DCPS::RtpsUdpDataLink::received(), OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i(), OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks(), OpenDDS::DCPS::RtpsUdpDataLink::send_directed_heartbeats(), OpenDDS::DCPS::RtpsUdpDataLink::send_directed_nack_replies(), OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats(), and OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies().
For a given local RepoId (publication or subscription), return the list of remote peer RepoIds (subscriptions or publications) that this link knows about due to make_reservation().
Definition at line 350 of file DataLink.cpp.
References assoc_by_local_, pub_sub_maps_lock_, and OpenDDS::DCPS::set_to_seq().
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), and OpenDDS::DCPS::RtpsUdpDataLink::get_locators().
00351 { 00352 GuardType guard(pub_sub_maps_lock_); 00353 00354 const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id); 00355 00356 if (iter == assoc_by_local_.end()) 00357 return 0; 00358 00359 GUIDSeq_var result = new GUIDSeq; 00360 set_to_seq(iter->second, static_cast<GUIDSeq&>(result)); 00361 return result._retn(); 00362 }
void OpenDDS::DCPS::DataLink::pre_stop_i | ( | ) | [virtual] |
Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, and OpenDDS::DCPS::TcpDataLink.
Definition at line 838 of file DataLink.cpp.
References thr_per_con_send_task_.
Referenced by stop().
00839 { 00840 if (this->thr_per_con_send_task_ != 0) { 00841 this->thr_per_con_send_task_->close(1); 00842 } 00843 }
void OpenDDS::DCPS::DataLink::prepare_release | ( | ) | [private] |
Save current sub and pub association maps for releasing and create empty maps for new associations.
Definition at line 887 of file DataLink.cpp.
References ACE_TEXT(), assoc_by_local_, assoc_releasing_, LM_ERROR, and pub_sub_maps_lock_.
Referenced by release_resources().
00888 { 00889 GuardType guard(this->pub_sub_maps_lock_); 00890 00891 if (!assoc_releasing_.empty()) { 00892 ACE_ERROR((LM_ERROR, 00893 ACE_TEXT("(%P|%t) DataLink::prepare_release: ") 00894 ACE_TEXT("already prepared for release.\n"))); 00895 return; 00896 } 00897 00898 assoc_releasing_ = assoc_by_local_; 00899 }
ACE_INLINE TransportReceiveListener_rch OpenDDS::DCPS::DataLink::recv_listener_for | ( | const RepoId & | sub_id | ) | const [private] |
Definition at line 357 of file DataLink.inl.
References recv_listeners_.
Referenced by clear_associations().
00358 { 00359 // sub_map_ (and recv_listeners_) are already locked when entering this 00360 // private method. 00361 IdToRecvListenerMap::const_iterator found = 00362 this->recv_listeners_.find(sub_id); 00363 if (found == this->recv_listeners_.end()) { 00364 return TransportReceiveListener_rch(); 00365 } 00366 return found->second.lock(); 00367 }
virtual void OpenDDS::DCPS::DataLink::release_remote_i | ( | const RepoId & | ) | [inline, private, virtual] |
Reimplemented in OpenDDS::DCPS::MulticastDataLink, and OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 326 of file DataLink.h.
Referenced by release_reservations().
void OpenDDS::DCPS::DataLink::release_reservations | ( | RepoId | remote_id, | |
RepoId | local_id, | |||
DataLinkSetMap & | released_locals | |||
) |
This will release reservations that were made by one of the make_reservation() methods. All we know is that the supplied RepoId is considered to be a remote id. It could be a remote subscriber or a remote publisher.
This gets invoked when a TransportClient::remove_associations() call has been made. Because this DataLink can be shared amongst different TransportClient objects, and different threads could be "managing" the different TransportClient objects, we need to make sure that this release_reservations() works in conjunction with a simultaneous call (in another thread) to one of this DataLink's make_reservation() methods.
Definition at line 372 of file DataLink.cpp.
References ACE_TEXT(), assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, impl_, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::release_datalink(), release_remote_i(), release_reservations_i(), and VDBG_LVL.
00374 { 00375 DBG_ENTRY_LVL("DataLink", "release_reservations", 6); 00376 00377 if (DCPS_debug_level > 9) { 00378 GuidConverter local(local_id); 00379 GuidConverter remote(remote_id); 00380 ACE_DEBUG((LM_DEBUG, 00381 ACE_TEXT("(%P|%t) DataLink::release_reservations() - ") 00382 ACE_TEXT("releasing association local: %C ") 00383 ACE_TEXT("<--> with remote %C.\n"), 00384 OPENDDS_STRING(local).c_str(), 00385 OPENDDS_STRING(remote).c_str())); 00386 } 00387 00388 //let the specific class release its reservations 00389 //done this way to prevent deadlock of holding pub_sub_maps_lock_ 00390 //then obtaining a specific class lock in release_reservations_i 00391 //which reverses lock ordering of the active send logic of needing 00392 //the specific class lock before obtaining the over arching DataLink 00393 //pub_sub_maps_lock_ 00394 this->release_reservations_i(remote_id, local_id); 00395 00396 bool release_remote_required = false; 00397 { 00398 GuardType guard(this->pub_sub_maps_lock_); 00399 00400 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id]; 00401 if (rls->size() == 1) { 00402 assoc_by_remote_.erase(remote_id); 00403 release_remote_required = true; 00404 } else { 00405 rls->remove(local_id); 00406 } 00407 RepoIdSet& ris = assoc_by_local_[local_id]; 00408 if (ris.size() == 1) { 00409 DataLinkSet_rch& links = released_locals[local_id]; 00410 if (links.is_nil()) 00411 links = make_rch<DataLinkSet>(); 00412 links->insert_link(rchandle_from(this)); 00413 assoc_by_local_.erase(local_id); 00414 } else { 00415 ris.erase(remote_id); 00416 } 00417 00418 if (assoc_by_local_.empty()) { 00419 VDBG_LVL((LM_DEBUG, 00420 ACE_TEXT("(%P|%t) DataLink::release_reservations: ") 00421 ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5); 00422 00423 impl_.release_datalink(this); 00424 } 00425 } 00426 if (release_remote_required) 00427 release_remote_i(remote_id); 00428 }
virtual void OpenDDS::DCPS::DataLink::release_reservations_i | ( | const RepoId & | , | |
const RepoId & | ||||
) | [inline, private, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 327 of file DataLink.h.
Referenced by release_reservations().
bool OpenDDS::DCPS::DataLink::release_resources | ( | ) |
Definition at line 846 of file DataLink.cpp.
References DBG_ENTRY_LVL, impl_, prepare_release(), and OpenDDS::DCPS::TransportImpl::release_link_resources().
00847 { 00848 DBG_ENTRY_LVL("DataLink", "release_resources", 6); 00849 00850 this->prepare_release(); 00851 return impl_.release_link_resources(this); 00852 }
ACE_INLINE void OpenDDS::DCPS::DataLink::remove_all_msgs | ( | RepoId | pub_id | ) |
Definition at line 211 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, and strategy_lock_.
00212 { 00213 DBG_ENTRY_LVL("DataLink","remove_all_msgs",6); 00214 00215 // This one is easy. Simply delegate to our TransportSendStrategy 00216 // data member. 00217 00218 TransportSendStrategy_rch strategy; 00219 { 00220 GuardType guard(this->strategy_lock_); 00221 00222 strategy = this->send_strategy_; 00223 } 00224 00225 if (!strategy.is_nil()) { 00226 strategy->remove_all_msgs(pub_id); 00227 } 00228 }
ACE_INLINE void OpenDDS::DCPS::DataLink::remove_listener | ( | const RepoId & | local_id | ) |
Either send or receive listener for this local_id should be removed from internal DataLink structures so it no longer receives events.
Definition at line 308 of file DataLink.inl.
References ACE_TEXT(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, recv_listeners_, send_listeners_, and OpenDDS::DCPS::Transport_debug_level.
00309 { 00310 GuardType guard(pub_sub_maps_lock_); 00311 { 00312 IdToSendListenerMap::iterator pos = send_listeners_.find(local_id); 00313 if (pos != send_listeners_.end()) { 00314 send_listeners_.erase(pos); 00315 if (Transport_debug_level > 5) { 00316 GuidConverter converter(local_id); 00317 ACE_DEBUG((LM_DEBUG, 00318 ACE_TEXT("(%P|%t) DataLink::remove_listener: ") 00319 ACE_TEXT("removed %C from send_listeners\n"), 00320 OPENDDS_STRING(converter).c_str())); 00321 } 00322 return; 00323 } 00324 } 00325 { 00326 IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id); 00327 if (pos != recv_listeners_.end()) { 00328 recv_listeners_.erase(pos); 00329 if (Transport_debug_level > 5) { 00330 GuidConverter converter(local_id); 00331 ACE_DEBUG((LM_DEBUG, 00332 ACE_TEXT("(%P|%t) DataLink::remove_listener: ") 00333 ACE_TEXT("removed %C from recv_listeners\n"), 00334 OPENDDS_STRING(converter).c_str())); 00335 } 00336 return; 00337 } 00338 } 00339 }
void OpenDDS::DCPS::DataLink::remove_on_start_callback | ( | const TransportClient_wrch & | client, | |
const RepoId & | remote | |||
) |
Definition at line 119 of file DataLink.cpp.
References OpenDDS::DCPS::remove(), and strategy_lock_.
00120 { 00121 GuardType guard(strategy_lock_); 00122 on_start_callbacks_.erase( 00123 std::remove(on_start_callbacks_.begin(), 00124 on_start_callbacks_.end(), 00125 std::make_pair(client, remote)), 00126 on_start_callbacks_.end()); 00127 }
ACE_INLINE RemoveResult OpenDDS::DCPS::DataLink::remove_sample | ( | const DataSampleElement * | sample, | |
void * | context | |||
) | [virtual] |
This method is essentially an "undo_send()" method. It's goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 183 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, send_strategy_, strategy_lock_, thr_per_con_send_task_, and VDBG.
00184 { 00185 DBG_ENTRY_LVL("DataLink", "remove_sample", 6); 00186 00187 if (this->thr_per_con_send_task_ != 0) { 00188 const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample); 00189 if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) { 00190 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00191 "Removed sample from ThreadPerConnection queue.\n")); 00192 return rr; 00193 } 00194 } 00195 00196 TransportSendStrategy_rch strategy; 00197 { 00198 GuardType guard(this->strategy_lock_); 00199 00200 strategy = this->send_strategy_; 00201 } 00202 00203 if (!strategy.is_nil()) { 00204 return strategy->remove_sample(sample, context); 00205 } 00206 00207 return REMOVE_NOT_FOUND; 00208 }
void OpenDDS::DCPS::DataLink::resume_send | ( | ) |
The resume_send is used in the case of reconnection on the subscriber's side.
Definition at line 255 of file DataLink.cpp.
References send_strategy_.
00256 { 00257 if (!this->send_strategy_->isDirectMode()) 00258 this->send_strategy_->resume_send(); 00259 }
void OpenDDS::DCPS::DataLink::schedule_delayed_release | ( | ) |
Definition at line 431 of file DataLink.cpp.
References datalink_release_delay_, DBG_ENTRY_LVL, ACE_OS::gettimeofday(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, schedule_stop(), send_strategy_, and VDBG.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink().
00432 { 00433 DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6); 00434 00435 VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this)); 00436 00437 // The samples have to be removed at this point, otherwise the samples 00438 // can not be delivered when new association is added and still use 00439 // this connection/datalink. 00440 if (!this->send_strategy_.is_nil()) { 00441 this->send_strategy_->clear(); 00442 } 00443 00444 ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_; 00445 this->schedule_stop(future_release_time); 00446 }
void OpenDDS::DCPS::DataLink::schedule_stop | ( | const ACE_Time_Value & | schedule_to_stop_at | ) |
Definition at line 200 of file DataLink.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, notify_reactor(), scheduled_to_stop_at_, stopped_, and ACE_Time_Value::zero.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and schedule_delayed_release().
00201 { 00202 if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) { 00203 this->scheduled_to_stop_at_ = schedule_to_stop_at; 00204 notify_reactor(); 00205 // reactor will invoke our DataLink::handle_exception() 00206 } else { 00207 if (DCPS_debug_level > 0) { 00208 ACE_DEBUG((LM_DEBUG, 00209 ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n"))); 00210 } 00211 } 00212 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send | ( | TransportQueueElement * | element | ) |
Definition at line 99 of file DataLink.inl.
References customize_queue_element(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, handle_send_request_ack(), OpenDDS::DCPS::TransportQueueElement::is_request_ack(), OpenDDS::DCPS::SEND, send_i(), and thr_per_con_send_task_.
Referenced by send_control().
00100 { 00101 DBG_ENTRY_LVL("DataLink","send",6); 00102 00103 if (element->is_request_ack() && 00104 this->handle_send_request_ack(element)) { 00105 return; 00106 } 00107 00108 element = this->customize_queue_element(element); 00109 if (!element) { 00110 return; 00111 } 00112 00113 if (this->thr_per_con_send_task_ != 0) { 00114 if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) { 00115 element->data_dropped(true); 00116 } 00117 00118 } else { 00119 this->send_i(element); 00120 00121 } 00122 }
SendControlStatus OpenDDS::DCPS::DataLink::send_control | ( | const DataSampleHeader & | header, | |
Message_Block_Ptr | data | |||
) |
This allows a subclass to send transport control samples over this DataLink. This is useful for sending transport-specific control messages between one or more endpoints under this DataLink's control.
Definition at line 516 of file DataLink.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, send(), OpenDDS::DCPS::SEND_CONTROL_OK, send_response_listener_, send_start(), send_stop(), and OpenDDS::DCPS::SendResponseListener::track_message().
Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().
00517 { 00518 DBG_ENTRY_LVL("DataLink", "send_control", 6); 00519 00520 TransportSendControlElement* const elem = new TransportSendControlElement(1, // initial_count 00521 GUID_UNKNOWN, &send_response_listener_, 00522 header, move(message)); 00523 00524 send_response_listener_.track_message(); 00525 00526 RepoId senderId(header.publication_id_); 00527 send_start(); 00528 send(elem); 00529 send_stop(senderId); 00530 00531 return SEND_CONTROL_OK; 00532 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_final_acks | ( | const RepoId & | readerid | ) | [virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 387 of file DataLink.inl.
ACE_INLINE void OpenDDS::DCPS::DataLink::send_i | ( | TransportQueueElement * | element, | |
bool | relink = true | |||
) | [protected, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 125 of file DataLink.inl.
References OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, send_strategy_, and strategy_lock_.
Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), send(), and OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message().
00126 { 00127 DBG_ENTRY_LVL("DataLink","send_i",6); 00128 // This one is easy. Simply delegate to our TransportSendStrategy 00129 // data member. 00130 00131 TransportSendStrategy_rch strategy; 00132 { 00133 GuardType guard(this->strategy_lock_); 00134 00135 strategy = this->send_strategy_; 00136 } 00137 00138 if (strategy) { 00139 strategy->send(element, relink); 00140 } else { 00141 element->data_dropped(true); 00142 } 00143 }
ACE_INLINE TransportSendListener_rch OpenDDS::DCPS::DataLink::send_listener_for | ( | const RepoId & | pub_id | ) | const [private] |
Definition at line 343 of file DataLink.inl.
References send_listeners_.
Referenced by clear_associations().
00344 { 00345 // pub_map_ (and send_listeners_) are already locked when entering this 00346 // private method. 00347 IdToSendListenerMap::const_iterator found = 00348 this->send_listeners_.find(pub_id); 00349 if (found == this->send_listeners_.end()) { 00350 return TransportSendListener_rch(); 00351 } 00352 return found->second.lock(); 00353 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_start | ( | ) |
Called by the TransportClient objects that reference this DataLink. Used by the TransportClient to send a sample, or to send a control message. These functions either give the request to the PerThreadConnectionSendTask when thread_per_connection configuration is true or just simply delegate to the send strategy.
Definition at line 68 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_START, send_start_i(), and thr_per_con_send_task_.
Referenced by send_control().
00069 { 00070 DBG_ENTRY_LVL("DataLink","send_start",6); 00071 00072 if (this->thr_per_con_send_task_ != 0) { 00073 this->thr_per_con_send_task_->add_request(SEND_START); 00074 00075 } else 00076 this->send_start_i(); 00077 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_start_i | ( | ) | [protected] |
The implementation of the functions that accomplish the sample or control message delivery. IThey just simply delegate to the send strategy.
Definition at line 80 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, and strategy_lock_.
Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_start().
00081 { 00082 DBG_ENTRY_LVL("DataLink","send_start_i",6); 00083 // This one is easy. Simply delegate to our TransportSendStrategy 00084 // data member. 00085 00086 TransportSendStrategy_rch strategy; 00087 { 00088 GuardType guard(this->strategy_lock_); 00089 00090 strategy = this->send_strategy_; 00091 } 00092 00093 if (!strategy.is_nil()) { 00094 strategy->send_start(); 00095 } 00096 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop | ( | RepoId | repoId | ) |
Definition at line 146 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_STOP, send_stop_i(), and thr_per_con_send_task_.
Referenced by send_control().
00147 { 00148 DBG_ENTRY_LVL("DataLink","send_stop",6); 00149 00150 if (this->thr_per_con_send_task_ != 0) { 00151 this->thr_per_con_send_task_->add_request(SEND_STOP); 00152 00153 } else 00154 this->send_stop_i(repoId); 00155 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop_i | ( | RepoId | repoId | ) | [protected] |
Definition at line 158 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, and strategy_lock_.
Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_stop().
00159 { 00160 DBG_ENTRY_LVL("DataLink","send_stop_i",6); 00161 // This one is easy. Simply delegate to our TransportSendStrategy 00162 // data member. 00163 00164 TransportSendStrategy_rch strategy; 00165 { 00166 GuardType guard(this->strategy_lock_); 00167 00168 strategy = this->send_strategy_; 00169 } 00170 00171 if (!strategy.is_nil()) { 00172 strategy->send_stop(repoId); 00173 } 00174 }
void OpenDDS::DCPS::DataLink::set_dscp_codepoint | ( | int | cp, | |
ACE_SOCK & | socket | |||
) |
The following IPV6 code was lifted in spirit from the RTCORBA implementation of setting the DiffServ codepoint.
Definition at line 949 of file DataLink.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, defined(), ACE_SOCK::get_local_addr(), ACE_Addr::get_type(), LM_DEBUG, LM_ERROR, and ACE_SOCK::set_option().
Referenced by OpenDDS::DCPS::UdpDataLink::open().
00950 { 00951 /** 00952 * The following IPV6 code was lifted in spirit from the RTCORBA 00953 * implementation of setting the DiffServ codepoint. 00954 */ 00955 int result = 0; 00956 00957 // Shift the code point up to bits, so that we only use the DS field 00958 int tos = cp << 2; 00959 00960 const char* which = "IPV4 TOS"; 00961 #if defined (ACE_HAS_IPV6) 00962 ACE_INET_Addr local_address; 00963 00964 if (socket.get_local_addr(local_address) == -1) { 00965 return; 00966 00967 } else if (local_address.get_type() == AF_INET6) 00968 #if !defined (IPV6_TCLASS) 00969 { 00970 if (DCPS_debug_level > 0) { 00971 ACE_ERROR((LM_ERROR, 00972 ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ") 00973 ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"), 00974 cp)); 00975 } 00976 00977 return; 00978 } 00979 00980 #else /* IPV6_TCLASS */ 00981 { 00982 which = "IPV6 TCLASS"; 00983 result = socket.set_option( 00984 IPPROTO_IPV6, 00985 IPV6_TCLASS, 00986 &tos, 00987 sizeof(tos)); 00988 00989 } else // This is a bit tricky and might be hard to follow... 00990 00991 #endif /* IPV6_TCLASS */ 00992 #endif /* ACE_HAS_IPV6 */ 00993 00994 #ifdef IP_TOS 00995 result = socket.set_option( 00996 IPPROTO_IP, 00997 IP_TOS, 00998 &tos, 00999 sizeof(tos)); 01000 01001 if ((result == -1) && (errno != ENOTSUP) 01002 #ifdef WSAEINVAL 01003 && (errno != WSAEINVAL) 01004 #endif 01005 ) { 01006 #endif // IP_TOS 01007 ACE_DEBUG((LM_DEBUG, 01008 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ") 01009 ACE_TEXT("failed to set the %C codepoint to %d: %m, ") 01010 ACE_TEXT("try running as superuser.\n"), 01011 which, 01012 cp)); 01013 #ifdef IP_TOS 01014 } else if (DCPS_debug_level > 4) { 01015 ACE_DEBUG((LM_DEBUG, 01016 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ") 01017 ACE_TEXT("set %C codepoint to %d.\n"), 01018 which, 01019 cp)); 01020 } 01021 #endif 01022 }
ACE_INLINE void OpenDDS::DCPS::DataLink::set_scheduling_release | ( | bool | scheduling_release | ) |
Definition at line 177 of file DataLink.inl.
References scheduling_release_.
Referenced by cancel_release(), OpenDDS::DCPS::TcpTransport::release_datalink(), and transport_shutdown().
00178 { 00179 this->scheduling_release_ = scheduling_release; 00180 }
ACE_INLINE int OpenDDS::DCPS::DataLink::start | ( | const TransportSendStrategy_rch & | send_strategy, | |
const TransportStrategy_rch & | receive_strategy | |||
) | [protected] |
This is how the subclass "announces" to this DataLink base class that this DataLink has now been "connected" and should start the supplied strategy objects. This start method is also going to keep a "copy" of the references to the strategy objects. Also note that it is acceptable to pass-in a NULL (0) TransportReceiveStrategy*, but it is assumed that the TransportSendStrategy* argument is not NULL.
If the start() method fails to start either strategy, then a -1 is returned. Otherwise, a 0 is returned. In the failure case, if one of the strategy objects was started successfully, then it will be stopped before the start() method returns -1.
Definition at line 238 of file DataLink.inl.
References DBG_ENTRY_LVL, invoke_on_start_callbacks(), OpenDDS::DCPS::RcHandle< T >::is_nil(), receive_strategy_, send_strategy_, started_, and strategy_lock_.
Referenced by OpenDDS::DCPS::TcpDataLink::connect(), OpenDDS::DCPS::MulticastDataLink::join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ShmemDataLink::open(), and OpenDDS::DCPS::RtpsUdpDataLink::open().
00240 { 00241 DBG_ENTRY_LVL("DataLink","start",6); 00242 00243 // We assume that the send_strategy is not NULL, but the receive_strategy 00244 // is allowed to be NULL. 00245 00246 // Attempt to start the strategies, and if there is a start() failure, 00247 // make sure to stop() any strategy that was already start()'ed. 00248 if (send_strategy->start() != 0) { 00249 // Failed to start the TransportSendStrategy. 00250 invoke_on_start_callbacks(false); 00251 return -1; 00252 } 00253 00254 if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) { 00255 // Failed to start the TransportReceiveStrategy. 00256 00257 // Remember to stop() the TransportSendStrategy since we did start it, 00258 // and now need to "undo" that action. 00259 send_strategy->stop(); 00260 invoke_on_start_callbacks(false); 00261 return -1; 00262 } 00263 00264 // We started both strategy objects. Save them to data members since 00265 // we will now take ownership of them. 00266 { 00267 GuardType guard(this->strategy_lock_); 00268 00269 this->send_strategy_ = send_strategy; 00270 this->receive_strategy_ = receive_strategy; 00271 } 00272 invoke_on_start_callbacks(true); 00273 { 00274 //catch any associations added during initial invoke_on_start_callbacks 00275 //only after first use_datalink has resolved does datalink's state truly 00276 //change to started, thus can't let pending associations proceed normally yet 00277 GuardType guard(this->strategy_lock_); 00278 this->started_ = true; 00279 } 00280 //Now state transitioned to started so no new on_start_callbacks will be added 00281 //so resolve any added during transition to started. 00282 invoke_on_start_callbacks(true); 00283 return 0; 00284 }
void OpenDDS::DCPS::DataLink::stop | ( | void | ) |
The stop method is used to stop the DataLink prior to shutdown.
Definition at line 222 of file DataLink.cpp.
References OpenDDS::DCPS::RcHandle< T >::is_nil(), pre_stop_i(), receive_strategy_, OpenDDS::DCPS::RcHandle< T >::reset(), scheduled_to_stop_at_, send_strategy_, stop_i(), stopped_, strategy_lock_, and ACE_Time_Value::zero.
Referenced by handle_exception(), handle_timeout(), OpenDDS::DCPS::UdpTransport::release_datalink(), OpenDDS::DCPS::ShmemTransport::release_datalink(), and transport_shutdown().
00223 { 00224 this->pre_stop_i(); 00225 00226 TransportSendStrategy_rch send_strategy; 00227 TransportStrategy_rch recv_strategy; 00228 00229 { 00230 GuardType guard(this->strategy_lock_); 00231 00232 if (this->stopped_) return; 00233 00234 send_strategy = this->send_strategy_; 00235 this->send_strategy_.reset(); 00236 00237 recv_strategy = this->receive_strategy_; 00238 this->receive_strategy_.reset(); 00239 } 00240 00241 if (!send_strategy.is_nil()) { 00242 send_strategy->stop(); 00243 } 00244 00245 if (!recv_strategy.is_nil()) { 00246 recv_strategy->stop(); 00247 } 00248 00249 this->stop_i(); 00250 this->stopped_ = true; 00251 this->scheduled_to_stop_at_ = ACE_Time_Value::zero; 00252 }
void OpenDDS::DCPS::DataLink::stop_i | ( | ) | [protected, virtual] |
This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.
Reimplemented in OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::ShmemDataLink, OpenDDS::DCPS::TcpDataLink, and OpenDDS::DCPS::UdpDataLink.
Definition at line 470 of file DataLink.cpp.
References DBG_ENTRY_LVL.
Referenced by stop().
00471 { 00472 DBG_ENTRY_LVL("DataLink", "stop_i", 6); 00473 }
GUIDSeq * OpenDDS::DCPS::DataLink::target_intersection | ( | const RepoId & | pub_id, | |
const GUIDSeq & | in, | |||
size_t & | n_subs | |||
) |
For a given publication "pub_id", store the total number of corresponding subscriptions in "n_subs" and given a set of subscriptions (the "in" sequence), return the subset of the input set "in" which are targets of this DataLink (see is_target()).
Definition at line 862 of file DataLink.cpp.
References assoc_by_local_, len, pub_sub_maps_lock_, and OpenDDS::DCPS::push_back().
00864 { 00865 GUIDSeq_var res; 00866 GuardType guard(this->pub_sub_maps_lock_); 00867 AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id); 00868 00869 if (iter != assoc_by_local_.end()) { 00870 n_subs = iter->second.size(); 00871 const CORBA::ULong len = in.length(); 00872 00873 for (CORBA::ULong i(0); i < len; ++i) { 00874 if (iter->second.count(in[i])) { 00875 if (res.ptr() == 0) { 00876 res = new GUIDSeq; 00877 } 00878 00879 push_back(res.inout(), in[i]); 00880 } 00881 } 00882 } 00883 00884 return res._retn(); 00885 }
ACE_INLINE void OpenDDS::DCPS::DataLink::terminate_send | ( | ) |
Definition at line 301 of file DataLink.inl.
References send_strategy_.
00302 { 00303 this->send_strategy_->terminate_send(false); 00304 }
ACE_INLINE Priority OpenDDS::DCPS::DataLink::transport_priority | ( | ) | const |
Definition at line 28 of file DataLink.inl.
References transport_priority_.
00029 { 00030 return this->transport_priority_; 00031 }
ACE_INLINE Priority & OpenDDS::DCPS::DataLink::transport_priority | ( | ) |
Accessors for the TRANSPORT_PRIORITY value associated with this link.
Definition at line 21 of file DataLink.inl.
References transport_priority_.
Referenced by OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ThreadPerConnectionSendTask::open(), OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00022 { 00023 return this->transport_priority_; 00024 }
void OpenDDS::DCPS::DataLink::transport_shutdown | ( | ) |
Our TransportImpl will inform us if it is being shutdown() by calling this method.
Definition at line 674 of file DataLink.cpp.
References ACE_Reactor_Timer_Interface::cancel_timer(), DBG_ENTRY_LVL, impl_, ACE_Event_Handler::reactor(), scheduled_to_stop_at_, set_scheduling_release(), stop(), OpenDDS::DCPS::TransportImpl::timer(), and ACE_Time_Value::zero.
00675 { 00676 DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6); 00677 00678 //this->cancel_release(); 00679 this->set_scheduling_release(false); 00680 this->scheduled_to_stop_at_ = ACE_Time_Value::zero; 00681 00682 ACE_Reactor_Timer_Interface* reactor = impl_.timer(); 00683 reactor->cancel_timer(this); 00684 00685 this->stop(); 00686 // this->send_listeners_.clear(); 00687 // this->recv_listeners_.clear(); 00688 // Drop our reference to the TransportImpl object 00689 }
friend class DataLinkCleanupTask [friend] |
Definition at line 69 of file DataLink.h.
OpenDDS_Dcps_Export std::ostream& operator<< | ( | std::ostream & | str, | |
const DataLink & | value | |||
) | [friend] |
Convenience function for diagnostic information.
friend class ThreadPerConnectionSendTask [friend] |
Definition at line 290 of file DataLink.h.
Referenced by DataLink().
AssocByLocal OpenDDS::DCPS::DataLink::assoc_by_local_ [private] |
Definition at line 369 of file DataLink.h.
Referenced by handle_timeout(), make_reservation(), notify(), OpenDDS::DCPS::operator<<(), peer_ids(), prepare_release(), release_reservations(), target_intersection(), and ~DataLink().
AssocByRemote OpenDDS::DCPS::DataLink::assoc_by_remote_ [private] |
Definition at line 366 of file DataLink.h.
Referenced by data_received_i(), handle_timeout(), is_target(), make_reservation(), and release_reservations().
AssocByLocal OpenDDS::DCPS::DataLink::assoc_releasing_ [private] |
Definition at line 383 of file DataLink.h.
Referenced by clear_associations(), and prepare_release().
Configurable delay in milliseconds that the datalink should be released after all associations are removed.
Definition at line 402 of file DataLink.h.
Referenced by DataLink(), datalink_release_delay(), and schedule_delayed_release().
Definition at line 407 of file DataLink.h.
Referenced by create_control(), and DataLink().
If default_listener_ is not null and this DataLink receives a sample from a publication GUID that's not in pub_map_, it will call data_received() on the default_listener_.
Definition at line 361 of file DataLink.h.
Referenced by data_received_i(), and default_listener().
ACE_UINT64 OpenDDS::DCPS::DataLink::id_ [private] |
The id for this DataLink.
Definition at line 375 of file DataLink.h.
Referenced by DataLink(), and id().
TransportImpl& OpenDDS::DCPS::DataLink::impl_ [private] |
A (smart) pointer to the TransportImpl that created this DataLink.
Definition at line 372 of file DataLink.h.
Referenced by handle_exception(), handle_timeout(), impl(), notify_reactor(), release_reservations(), release_resources(), and transport_shutdown().
bool OpenDDS::DCPS::DataLink::is_active_ [protected] |
Is pub or sub ?
Definition at line 412 of file DataLink.h.
Referenced by is_active(), and OpenDDS::DCPS::TcpDataLink::reuse_existing_connection().
bool OpenDDS::DCPS::DataLink::is_loopback_ [protected] |
Is remote attached to same transport ?
Definition at line 410 of file DataLink.h.
Referenced by is_loopback(), and OpenDDS::DCPS::UdpDataLink::open().
Allocators for data and message blocks used by transport control samples when send_control is called.
Definition at line 406 of file DataLink.h.
Referenced by create_control(), and DataLink().
LockType OpenDDS::DCPS::DataLink::pub_sub_maps_lock_ [mutable, private] |
Definition at line 363 of file DataLink.h.
Referenced by data_received_i(), default_listener(), is_target(), make_reservation(), notify(), peer_ids(), prepare_release(), release_reservations(), remove_listener(), and target_intersection().
The transport receive strategy object for this DataLink.
Definition at line 288 of file DataLink.h.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::open(), OpenDDS::DCPS::TcpDataLink::receive_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), OpenDDS::DCPS::TcpDataLink::reconnect(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink(), start(), and stop().
IdToRecvListenerMap OpenDDS::DCPS::DataLink::recv_listeners_ [private] |
Definition at line 356 of file DataLink.h.
Referenced by make_reservation(), notify(), recv_listener_for(), and remove_listener().
Definition at line 348 of file DataLink.h.
Referenced by cancel_release(), handle_exception(), handle_timeout(), schedule_stop(), stop(), and transport_shutdown().
bool OpenDDS::DCPS::DataLink::scheduling_release_ [private] |
Definition at line 388 of file DataLink.h.
Referenced by cancel_release(), handle_exception(), and set_scheduling_release().
IdToSendListenerMap OpenDDS::DCPS::DataLink::send_listeners_ [private] |
Definition at line 352 of file DataLink.h.
Referenced by make_reservation(), notify(), remove_listener(), and send_listener_for().
Listener for TransportSendControlElements created in send_control.
Definition at line 416 of file DataLink.h.
Referenced by send_control().
The transport send strategy object for this DataLink.
Reimplemented in OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::ShmemDataLink, and OpenDDS::DCPS::UdpDataLink.
Definition at line 395 of file DataLink.h.
Referenced by OpenDDS::DCPS::TcpDataLink::ack_received(), add_on_start_callback(), make_reservation(), OpenDDS::DCPS::RtpsUdpDataLink::open(), OpenDDS::DCPS::TcpDataLink::reconnect(), remove_all_msgs(), remove_sample(), resume_send(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink(), schedule_delayed_release(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), send_i(), send_start_i(), send_stop_i(), OpenDDS::DCPS::TcpDataLink::send_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::send_strategy(), start(), stop(), and terminate_send().
bool OpenDDS::DCPS::DataLink::started_ [protected] |
Definition at line 413 of file DataLink.h.
Referenced by add_on_start_callback(), and start().
bool OpenDDS::DCPS::DataLink::stopped_ [private] |
A boolean indicating if the DataLink has been stopped. This value is protected by the strategy_lock_.
Definition at line 347 of file DataLink.h.
Referenced by cancel_release(), schedule_stop(), and stop().
LockType OpenDDS::DCPS::DataLink::strategy_lock_ [protected] |
Definition at line 397 of file DataLink.h.
Referenced by add_on_start_callback(), invoke_on_start_callbacks(), make_reservation(), OpenDDS::DCPS::TcpDataLink::reconnect(), remove_all_msgs(), remove_on_start_callback(), remove_sample(), send_i(), send_start_i(), send_stop_i(), start(), and stop().
The task used to do the sending. This ThreadPerConnectionSendTask object is created when the thread_per_connection configuration is true. It only dedicate to this datalink.
Definition at line 380 of file DataLink.h.
Referenced by DataLink(), pre_stop_i(), remove_sample(), send(), send_start(), send_stop(), and ~DataLink().
TRANSPORT_PRIORITY value associated with the link.
Definition at line 386 of file DataLink.h.
Referenced by transport_priority().