#include <DataLink.h>
Inheritance diagram for OpenDDS::DCPS::DataLink:
Public Types | |
typedef std::pair< TransportClient *, RepoId > | OnStartCallback |
DISCONNECTED | |
RECONNECTED | |
LOST | |
enum | ConnectionNotice { DISCONNECTED, RECONNECTED, LOST } |
Public Member Functions | |
DataLink (TransportImpl *impl, Priority priority, bool is_loopback, bool is_active) | |
Only called by our TransportImpl object. | |
virtual | ~DataLink () |
int | handle_exception (ACE_HANDLE) |
void | schedule_stop (ACE_Time_Value &schedule_to_stop_at) |
void | stop () |
The stop method is used to stop the DataLink prior to shutdown. | |
void | resume_send () |
int | make_reservation (const RepoId &remote_subscription_id, const RepoId &local_publication_id, TransportSendListener *send_listener) |
int | make_reservation (const RepoId &remote_publication_id, const RepoId &local_subscription_id, TransportReceiveListener *receive_listener) |
void | release_reservations (RepoId remote_id, RepoId local_id, DataLinkSetMap &released_locals) |
void | schedule_delayed_release () |
const ACE_Time_Value & | datalink_release_delay () const |
void | remove_listener (const RepoId &local_id) |
void | send_start () |
void | send (TransportQueueElement *element) |
void | send_stop (RepoId repoId) |
RemoveResult | remove_sample (const DataSampleElement *sample) |
void | remove_all_msgs (RepoId pub_id) |
int | data_received (ReceivedDataSample &sample, const RepoId &readerId=GUID_UNKNOWN) |
void | data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl) |
DataLinkIdType | id () const |
Obtain a unique identifier for this DataLink object. | |
void | transport_shutdown () |
void | notify (ConnectionNotice notice) |
void | notify_connection_deleted () |
virtual bool | issues_on_deleted_callback () const |
virtual void | pre_stop_i () |
bool | release_resources () |
void | terminate_send () |
bool | is_target (const RepoId &remote_sub_id) |
void | clear_associations () |
int | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
int | handle_close (ACE_HANDLE h, ACE_Reactor_Mask m) |
void | set_dscp_codepoint (int cp, ACE_SOCK &socket) |
Priority & | transport_priority () |
Priority | transport_priority () const |
bool & | is_loopback () |
bool | is_loopback () const |
bool & | is_active () |
bool | is_active () const |
bool | cancel_release () |
ACE_Message_Block * | create_control (char submessage_id, DataSampleHeader &header, ACE_Message_Block *data) |
SendControlStatus | send_control (const DataSampleHeader &header, ACE_Message_Block *data) |
GUIDSeq * | target_intersection (const RepoId &pub_id, const GUIDSeq &in, size_t &n_subs) |
TransportImpl_rch | impl () const |
void | default_listener (TransportReceiveListener *trl) |
TransportReceiveListener * | default_listener () const |
bool | add_on_start_callback (TransportClient *client, const RepoId &remote) |
void | remove_on_start_callback (TransportClient *client, const RepoId &remote) |
void | invoke_on_start_callbacks (bool success) |
void | set_scheduling_release (bool scheduling_release) |
virtual void | send_final_acks (const RepoId &readerid) |
Protected Types | |
typedef ACE_Guard< LockType > | GuardType |
Protected Member Functions | |
int | start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy) |
virtual void | stop_i () |
void | send_start_i () |
virtual void | send_i (TransportQueueElement *element, bool relink=true) |
void | send_stop_i (RepoId repoId) |
GUIDSeq * | peer_ids (const RepoId &local_id) const |
OPENDDS_VECTOR (OnStartCallback) on_start_callbacks_ | |
Static Protected Member Functions | |
static ACE_UINT64 | get_next_datalink_id () |
Used to provide unique Ids to all DataLink objects. | |
Protected Attributes | |
TransportStrategy_rch | receive_strategy_ |
The transport receive strategy object for this DataLink. | |
TransportSendStrategy_rch | send_strategy_ |
The transport send strategy object for this DataLink. | |
LockType | strategy_lock_ |
ACE_Time_Value | datalink_release_delay_ |
TransportSendControlElementAllocator * | send_control_allocator_ |
MessageBlockAllocator * | mb_allocator_ |
DataBlockAllocator * | db_allocator_ |
bool | is_loopback_ |
Is the remote attached to the same transport? | |
bool | is_active_ |
Is pub or sub? | |
bool | started_ |
SendResponseListener | send_response_listener_ |
Listener for TransportSendControlElements created in send_control. | |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
Private Member Functions | |
const char * | connection_notice_as_str (ConnectionNotice notice) |
Helper function to output the enum as a string to help debugging. | |
TransportSendListener * | send_listener_for (const RepoId &pub_id) const |
TransportReceiveListener * | recv_listener_for (const RepoId &sub_id) const |
void | prepare_release () |
virtual TransportQueueElement * | customize_queue_element (TransportQueueElement *element) |
virtual void | release_remote_i (const RepoId &) |
virtual void | release_reservations_i (const RepoId &, const RepoId &) |
void | data_received_i (ReceivedDataSample &sample, const RepoId &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain) |
typedef | OPENDDS_MAP_CMP (RepoId, TransportSendListener *, GUID_tKeyLessThan) IdToSendListenerMap |
Map publication Id value to TransportSendListener. | |
typedef | OPENDDS_MAP_CMP (RepoId, TransportReceiveListener *, GUID_tKeyLessThan) IdToRecvListenerMap |
Map subscription Id value to TransportReceiveListener. | |
typedef | OPENDDS_MAP_CMP (RepoId, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote |
typedef | OPENDDS_MAP_CMP (RepoId, RepoIdSet, GUID_tKeyLessThan) AssocByLocal |
Private Attributes | |
bool | stopped_ |
ACE_Time_Value | scheduled_to_stop_at_ |
IdToSendListenerMap | send_listeners_ |
IdToRecvListenerMap | recv_listeners_ |
TransportReceiveListener * | default_listener_ |
LockType | pub_sub_maps_lock_ |
AssocByRemote | assoc_by_remote_ |
AssocByLocal | assoc_by_local_ |
LockType | released_assoc_by_local_lock_ |
AssocByLocal | released_assoc_by_local_ |
TransportImpl_rch | impl_ |
A (smart) pointer to the TransportImpl that created this DataLink. | |
ACE_UINT64 | id_ |
The id for this DataLink. | |
ThreadPerConnectionSendTask * | thr_per_con_send_task_ |
AssocByLocal | assoc_releasing_ |
Priority | transport_priority_ |
TRANSPORT_PRIORITY value associated with the link. | |
bool | scheduling_release_ |
Friends | |
class | DataLinkCleanupTask |
class | ThreadPerConnectionSendTask |
OpenDDS_Dcps_Export std::ostream & | operator<< (std::ostream &str, const DataLink &value) |
Convenience function for diagnostic information. |
Notes about object ownership: 1) Owns the send strategy object and the receive strategy object. 2) Owns the ThreadPerConnectionSendTask object, which is used when thread_per_connection is enabled.
Definition at line 62 of file DataLink.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::DataLink::GuardType [protected] |
Definition at line 389 of file DataLink.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLink::LockType [private] |
Definition at line 331 of file DataLink.h.
typedef std::pair<TransportClient*, RepoId> OpenDDS::DCPS::DataLink::OnStartCallback |
Definition at line 247 of file DataLink.h.
Definition at line 70 of file DataLink.h.
00070 { 00071 DISCONNECTED, 00072 RECONNECTED, 00073 LOST 00074 };
OpenDDS::DCPS::DataLink::DataLink | ( | TransportImpl * | impl, | |
Priority | priority, | |||
bool | is_loopback, | |||
bool | is_active | |||
) |
Only called by our TransportImpl object.
A DataLink object is always created by a TransportImpl object. Thus, the TransportImpl object passed in here is the object that created this DataLink. A priority can be specified for each individual link at construction time so that its value is available when activating any threads.
Definition at line 41 of file DataLink.cpp.
References datalink_release_delay_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, get_next_datalink_id(), id_, impl(), impl_, mb_allocator_, send_control_allocator_, thr_per_con_send_task_, and ThreadPerConnectionSendTask.
00043 : stopped_(false), 00044 scheduled_to_stop_at_(ACE_Time_Value::zero), 00045 default_listener_(0), 00046 thr_per_con_send_task_(0), 00047 transport_priority_(priority), 00048 scheduling_release_(false), 00049 send_control_allocator_(0), 00050 mb_allocator_(0), 00051 db_allocator_(0), 00052 is_loopback_(is_loopback), 00053 is_active_(is_active), 00054 started_(false), 00055 send_response_listener_("DataLink") 00056 { 00057 DBG_ENTRY_LVL("DataLink", "DataLink", 6); 00058 00059 impl->_add_ref(); 00060 this->impl_ = impl; 00061 00062 datalink_release_delay_.sec(this->impl_->config_->datalink_release_delay_ / 1000); 00063 datalink_release_delay_.usec(this->impl_->config_->datalink_release_delay_ % 1000 * 1000); 00064 00065 id_ = DataLink::get_next_datalink_id(); 00066 00067 if (this->impl_->config_->thread_per_connection_) { 00068 this->thr_per_con_send_task_ = new ThreadPerConnectionSendTask(this); 00069 00070 if (this->thr_per_con_send_task_->open() == -1) { 00071 ACE_ERROR((LM_ERROR, 00072 ACE_TEXT("(%P|%t) DataLink::DataLink: ") 00073 ACE_TEXT("failed to open ThreadPerConnectionSendTask\n"))); 00074 00075 } else if (DCPS_debug_level > 4) { 00076 ACE_DEBUG((LM_DEBUG, 00077 ACE_TEXT("(%P|%t) DataLink::DataLink - ") 00078 ACE_TEXT("started new thread to send data with.\n"))); 00079 } 00080 } 00081 00082 // Initialize transport control sample allocators: 00083 size_t control_chunks = this->impl_->config_->datalink_control_chunks_; 00084 00085 this->send_control_allocator_ = 00086 new TransportSendControlElementAllocator(control_chunks); 00087 00088 this->mb_allocator_ = new MessageBlockAllocator(control_chunks); 00089 this->db_allocator_ = new DataBlockAllocator(control_chunks); 00090 }
OpenDDS::DCPS::DataLink::~DataLink | ( | ) | [virtual] |
Definition at line 92 of file DataLink.cpp.
References assoc_by_local_, OpenDDS::DCPS::ThreadPerConnectionSendTask::close(), db_allocator_, DBG_ENTRY_LVL, mb_allocator_, send_control_allocator_, and thr_per_con_send_task_.
00093 { 00094 DBG_ENTRY_LVL("DataLink", "~DataLink", 6); 00095 00096 if (!assoc_by_local_.empty()) { 00097 ACE_DEBUG((LM_WARNING, 00098 ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ") 00099 ACE_TEXT("link still in use by %d entities when deleted!\n"), 00100 this, assoc_by_local_.size())); 00101 } 00102 00103 delete this->db_allocator_; 00104 delete this->mb_allocator_; 00105 00106 delete this->send_control_allocator_; 00107 00108 if (this->thr_per_con_send_task_ != 0) { 00109 this->thr_per_con_send_task_->close(1); 00110 delete this->thr_per_con_send_task_; 00111 } 00112 }
ACE_INLINE bool OpenDDS::DCPS::DataLink::add_on_start_callback | ( | TransportClient * | client, | |
const RepoId & | remote | |||
) |
Definition at line 285 of file DataLink.inl.
References OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, started_, and strategy_lock_.
00286 { 00287 GuardType guard(strategy_lock_); 00288 00289 if (started_ && !send_strategy_.is_nil()) { 00290 return false; // link already started 00291 } 00292 on_start_callbacks_.push_back(std::make_pair(client, remote)); 00293 00294 return true; 00295 }
bool OpenDDS::DCPS::DataLink::cancel_release | ( | ) |
Definition at line 441 of file DataLink.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, scheduled_to_stop_at_, scheduling_release_, set_scheduling_release(), and stopped_.
00442 { 00443 DBG_ENTRY_LVL("DataLink", "cancel_release", 6); 00444 if (stopped_) { 00445 if (DCPS_debug_level > 0) { 00446 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this)); 00447 } 00448 return false; 00449 } 00450 if (scheduling_release_) { 00451 if (DCPS_debug_level > 0) { 00452 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this)); 00453 } 00454 this->set_scheduling_release(false); 00455 this->scheduled_to_stop_at_ = ACE_Time_Value::zero; 00456 TransportReactorTask_rch reactor(this->impl_->reactor_task()); 00457 reactor->get_reactor()->notify(this); 00458 } 00459 return true; 00460 }
void OpenDDS::DCPS::DataLink::clear_associations | ( | ) |
This is called by the DataLinkCleanupTask thread to remove the associations based on the snapshot taken in release_resources().
Definition at line 915 of file DataLink.cpp.
References assoc_releasing_, recv_listener_for(), OpenDDS::DCPS::TransportReceiveListener::remove_associations(), send_listener_for(), and OpenDDS::DCPS::set_to_seq().
00916 { 00917 for (AssocByLocal::iterator iter = assoc_releasing_.begin(); 00918 iter != assoc_releasing_.end(); ++iter) { 00919 TransportSendListener* const tsl = send_listener_for(iter->first); 00920 if (tsl) { 00921 ReaderIdSeq sub_ids; 00922 set_to_seq(iter->second, sub_ids); 00923 tsl->remove_associations(sub_ids, false); 00924 continue; 00925 } 00926 TransportReceiveListener* const trl = recv_listener_for(iter->first); 00927 if (trl) { 00928 WriterIdSeq pub_ids; 00929 set_to_seq(iter->second, pub_ids); 00930 trl->remove_associations(pub_ids, false); 00931 } 00932 } 00933 assoc_releasing_.clear(); 00934 }
ACE_INLINE const char * OpenDDS::DCPS::DataLink::connection_notice_as_str | ( | ConnectionNotice | notice | ) | [private] |
Helper function to output the enum as a string to help debugging.
Definition at line 310 of file DataLink.inl.
Referenced by notify().
00311 { 00312 static const char* NoticeStr[] = { "DISCONNECTED", 00313 "RECONNECTED", 00314 "LOST" 00315 }; 00316 00317 return NoticeStr [notice]; 00318 }
ACE_Message_Block * OpenDDS::DCPS::DataLink::create_control | ( | char | submessage_id, | |
DataSampleHeader & | header, | |||
ACE_Message_Block * | data | |||
) |
This allows a subclass to easily create a transport control sample to send via send_control.
Definition at line 469 of file DataLink.cpp.
References db_allocator_, DBG_ENTRY_LVL, header, mb_allocator_, and OpenDDS::DCPS::TRANSPORT_CONTROL.
Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().
00472 { 00473 DBG_ENTRY_LVL("DataLink", "create_control", 6); 00474 00475 header.byte_order_ = ACE_CDR_BYTE_ORDER; 00476 header.message_id_ = TRANSPORT_CONTROL; 00477 header.submessage_id_ = submessage_id; 00478 header.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 00479 00480 ACE_Message_Block* message; 00481 ACE_NEW_MALLOC_RETURN(message, 00482 static_cast<ACE_Message_Block*>( 00483 this->mb_allocator_->malloc(sizeof(ACE_Message_Block))), 00484 ACE_Message_Block(header.max_marshaled_size(), 00485 ACE_Message_Block::MB_DATA, 00486 data, 00487 0, // data 00488 0, // allocator_strategy 00489 0, // locking_strategy 00490 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00491 ACE_Time_Value::zero, 00492 ACE_Time_Value::max_time, 00493 this->db_allocator_, 00494 this->mb_allocator_), 00495 0); 00496 00497 *message << header; 00498 00499 return message; 00500 }
virtual TransportQueueElement* OpenDDS::DCPS::DataLink::customize_queue_element | ( | TransportQueueElement * | element | ) | [inline, private, virtual] |
Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 316 of file DataLink.h.
Referenced by send().
int OpenDDS::DCPS::DataLink::data_received | ( | ReceivedDataSample & | sample, | |
const RepoId & | readerId = GUID_UNKNOWN | |||
) |
This is called by our TransportReceiveStrategy object when it has received a complete data sample. This method causes data_received() to be invoked on the appropriate TransportReceiveListener objects. If readerId is not GUID_UNKNOWN, only the TransportReceiveListener with that ID (if one exists) will receive the data.
Definition at line 527 of file DataLink.cpp.
References data_received_i(), and OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::deliver_held_data(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::MulticastDataLink::sample_received().
00529 { 00530 data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED); 00531 return 0; 00532 }
void OpenDDS::DCPS::DataLink::data_received_i | ( | ReceivedDataSample & | sample, | |
const RepoId & | readerId, | |||
const RepoIdSet & | incl_excl, | |||
ReceiveListenerSet::ConstrainReceiveSet | constrain | |||
) | [private] |
Definition at line 541 of file DataLink.cpp.
References assoc_by_remote_, OpenDDS::DCPS::DataSampleHeader::content_filter_, OpenDDS::DCPS::DataSampleHeader::content_filter_entries_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, default_listener_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED, OpenDDS::DCPS::to_string(), and OpenDDS::DCPS::Transport_debug_level.
Referenced by data_received(), and data_received_include().
00545 { 00546 DBG_ENTRY_LVL("DataLink", "data_received_i", 6); 00547 // Which remote publication sent this message? 00548 const RepoId& publication_id = sample.header_.publication_id_; 00549 00550 // Locate the set of TransportReceiveListeners associated with this 00551 // DataLink that are interested in hearing about any samples received 00552 // from the remote publisher_id. 00553 if (DCPS_debug_level > 9) { 00554 const GuidConverter converter(publication_id); 00555 const GuidConverter reader(readerId); 00556 ACE_DEBUG((LM_DEBUG, 00557 ACE_TEXT("(%P|%t) DataLink::data_received_i: ") 00558 ACE_TEXT("from publication %C received sample: %C to readerId %C (%s).\n"), 00559 OPENDDS_STRING(converter).c_str(), 00560 to_string(sample.header_).c_str(), 00561 OPENDDS_STRING(reader).c_str(), 00562 constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED")); 00563 } 00564 00565 if (Transport_debug_level > 9) { 00566 const GuidConverter converter(publication_id); 00567 ACE_DEBUG((LM_DEBUG, 00568 ACE_TEXT("(%P|%t) DataLink::data_received_i: ") 00569 ACE_TEXT("from publication %C received sample: %C.\n"), 00570 OPENDDS_STRING(converter).c_str(), 00571 to_string(sample.header_).c_str())); 00572 } 00573 00574 ReceiveListenerSet_rch listener_set; 00575 { 00576 GuardType guard(this->pub_sub_maps_lock_); 00577 AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id); 00578 if (iter != assoc_by_remote_.end()) 00579 listener_set = iter->second; 00580 00581 if (listener_set.is_nil() && this->default_listener_) { 00582 this->default_listener_->data_received(sample); 00583 return; 00584 } 00585 } 00586 00587 if (listener_set.is_nil()) { 00588 // Nobody has any interest in this message. Drop it on the floor. 
00589 if (Transport_debug_level > 4) { 00590 const GuidConverter converter(publication_id); 00591 ACE_DEBUG((LM_DEBUG, 00592 ACE_TEXT("(%P|%t) DataLink::data_received_i: ") 00593 ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"), 00594 OPENDDS_STRING(converter).c_str())); 00595 } 00596 00597 return; 00598 } 00599 00600 if (readerId != GUID_UNKNOWN) { 00601 listener_set->data_received(sample, readerId); 00602 return; 00603 } 00604 00605 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00606 00607 if (sample.header_.content_filter_ 00608 && sample.header_.content_filter_entries_.length()) { 00609 ReceiveListenerSet subset(*listener_set.in()); 00610 subset.remove_all(sample.header_.content_filter_entries_); 00611 subset.data_received(sample, incl_excl, constrain); 00612 00613 } else { 00614 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00615 00616 if (DCPS_debug_level > 9) { 00617 // Just get the set to do our dirty work by having it iterate over its 00618 // collection of TransportReceiveListeners, and invoke the data_received() 00619 // method on each one. 00620 OPENDDS_STRING included_ids; 00621 bool first = true; 00622 RepoIdSet::const_iterator iter = incl_excl.begin(); 00623 while(iter != incl_excl.end()) { 00624 included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter)); 00625 first = false; 00626 ++iter; 00627 } 00628 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %s ids:%C\n", 00629 constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str())); 00630 } 00631 listener_set->data_received(sample, incl_excl, constrain); 00632 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00633 } 00634 00635 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00636 }
void OpenDDS::DCPS::DataLink::data_received_include | ( | ReceivedDataSample & | sample, | |
const RepoIdSet & | incl | |||
) |
Variation of data_received() that excludes a subset of readers by specifying exactly which readers should receive the sample. Any reader ID that does not appear in the include set will be skipped.
Definition at line 535 of file DataLink.cpp.
References data_received_i(), OpenDDS::DCPS::GUID_UNKNOWN, and OpenDDS::DCPS::ReceiveListenerSet::SET_INCLUDED.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample().
00536 { 00537 data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED); 00538 }
ACE_INLINE const ACE_Time_Value & OpenDDS::DCPS::DataLink::datalink_release_delay | ( | ) | const |
Definition at line 68 of file DataLink.inl.
References datalink_release_delay_.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00069 { 00070 return this->datalink_release_delay_; 00071 }
ACE_INLINE TransportReceiveListener * OpenDDS::DCPS::DataLink::default_listener | ( | ) | const |
Definition at line 398 of file DataLink.inl.
References default_listener_.
00399 { 00400 GuardType guard(this->pub_sub_maps_lock_); 00401 return this->default_listener_; 00402 }
ACE_INLINE void OpenDDS::DCPS::DataLink::default_listener | ( | TransportReceiveListener * | trl | ) |
Definition at line 390 of file DataLink.inl.
References default_listener_.
00391 { 00392 GuardType guard(this->pub_sub_maps_lock_); 00393 this->default_listener_ = trl; 00394 }
ACE_UINT64 OpenDDS::DCPS::DataLink::get_next_datalink_id | ( | ) | [static, protected] |
Used to provide unique Ids to all DataLink objects.
Definition at line 640 of file DataLink.cpp.
References id().
Referenced by DataLink().
00641 { 00642 static ACE_UINT64 next_id = 0; 00643 static LockType lock; 00644 00645 ACE_UINT64 id; 00646 { 00647 GuardType guard(lock); 00648 id = next_id++; 00649 00650 if (0 == next_id) { 00651 ACE_ERROR((LM_ERROR, 00652 ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ") 00653 ACE_TEXT("has rolled over and is reusing ids!\n"))); 00654 } 00655 } 00656 00657 return id; 00658 }
int OpenDDS::DCPS::DataLink::handle_close | ( | ACE_HANDLE | h, | |
ACE_Reactor_Mask | m | |||
) |
Definition at line 952 of file DataLink.cpp.
References handle_timeout().
00953 { 00954 if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) { 00955 // Reactor is shutting down with this timer still pending. 00956 // Take the same cleanup actions as if the timeout had expired. 00957 handle_timeout(ACE_Time_Value::zero, 0); 00958 } 00959 00960 return 0; 00961 }
int OpenDDS::DCPS::DataLink::handle_exception | ( | ACE_HANDLE | ) |
Definition at line 142 of file DataLink.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), OpenDDS::DCPS::DCPS_debug_level, handle_timeout(), impl_, scheduled_to_stop_at_, and stop().
00143 { 00144 if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) { 00145 if (DCPS_debug_level > 0) { 00146 ACE_DEBUG((LM_DEBUG, 00147 ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n"))); 00148 } 00149 if (this->impl_ != 0) { 00150 ACE_Reactor_Timer_Interface* reactor = this->impl_->timer(); 00151 if (reactor->cancel_timer(this) > 0) { 00152 if (DCPS_debug_level > 0) { 00153 ACE_DEBUG((LM_DEBUG, 00154 ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n"))); 00155 } 00156 } 00157 } else { 00158 if (DCPS_debug_level > 0) { 00159 ACE_DEBUG((LM_DEBUG, 00160 ACE_TEXT("(%P|%t) DataLink::handle_exception() - impl_ == 0\n"))); 00161 } 00162 } 00163 this->_remove_ref(); 00164 return 0; 00165 } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) { 00166 if (this->scheduling_release_) { 00167 if (DCPS_debug_level > 0) { 00168 ACE_DEBUG((LM_DEBUG, 00169 ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n"))); 00170 } 00171 this->handle_timeout(ACE_Time_Value::zero, 0); 00172 return 0; 00173 } 00174 if (DCPS_debug_level > 0) { 00175 ACE_DEBUG((LM_DEBUG, 00176 ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n"))); 00177 } 00178 this->stop(); 00179 this->_remove_ref(); 00180 return 0; 00181 } else /* SCHEDULE TO STOP IN THE FUTURE*/ { 00182 if (DCPS_debug_level > 0) { 00183 ACE_DEBUG((LM_DEBUG, 00184 ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n"))); 00185 } 00186 this->_add_ref(); 00187 ACE_Reactor_Timer_Interface* reactor = this->impl_->timer(); 00188 ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday(); 00189 reactor->schedule_timer(this, 0, future_release_time); 00190 } 00191 return 0; 00192 }
int OpenDDS::DCPS::DataLink::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) |
Definition at line 937 of file DataLink.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), assoc_by_local_, assoc_by_remote_, impl_, and VDBG_LVL.
Referenced by handle_close(), and handle_exception().
00938 { 00939 if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) { 00940 VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4); 00941 this->impl_->unbind_link(this); 00942 00943 if (assoc_by_remote_.empty() && assoc_by_local_.empty()) { 00944 this->stop(); 00945 } 00946 } 00947 this->_remove_ref(); 00948 return 0; 00949 }
ACE_INLINE DataLinkIdType OpenDDS::DCPS::DataLink::id | ( | ) | const |
Obtain a unique identifier for this DataLink object.
Definition at line 228 of file DataLink.inl.
References DBG_ENTRY_LVL, and id_.
Referenced by get_next_datalink_id().
00229 { 00230 DBG_ENTRY_LVL("DataLink","id",6); 00231 return id_; 00232 }
TransportImpl_rch OpenDDS::DCPS::DataLink::impl | ( | ) | const |
Definition at line 115 of file DataLink.cpp.
References impl_.
Referenced by OpenDDS::DCPS::UdpDataLink::control_received(), DataLink(), OpenDDS::DCPS::ShmemDataLink::local_address(), OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::UdpDataLink::open(), and OpenDDS::DCPS::ShmemDataLink::signal_semaphore().
00116 { 00117 return impl_; 00118 }
void OpenDDS::DCPS::DataLink::invoke_on_start_callbacks | ( | bool | success | ) |
Definition at line 121 of file DataLink.cpp.
References strategy_lock_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received(), and start().
00122 { 00123 const DataLink_rch link(success ? this : 0, false); 00124 00125 while (true) { 00126 GuardType guard(strategy_lock_); 00127 00128 if (on_start_callbacks_.empty()) { 00129 break; 00130 } 00131 00132 OnStartCallback last_callback = on_start_callbacks_.back(); 00133 on_start_callbacks_.pop_back(); 00134 00135 guard.release(); 00136 last_callback.first->use_datalink(last_callback.second, link); 00137 } 00138 }
ACE_INLINE bool OpenDDS::DCPS::DataLink::is_active | ( | ) | const |
Definition at line 62 of file DataLink.inl.
References is_active_.
00063 { 00064 return this->is_active_; 00065 }
ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_active | ( | ) |
Definition at line 55 of file DataLink.inl.
References is_active_.
Referenced by OpenDDS::DCPS::MulticastDataLink::check_header(), OpenDDS::DCPS::TcpTransport::release_datalink(), OpenDDS::DCPS::MulticastDataLink::sample_received(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00056 { 00057 return this->is_active_; 00058 }
ACE_INLINE bool OpenDDS::DCPS::DataLink::is_loopback | ( | ) | const |
Definition at line 48 of file DataLink.inl.
References is_loopback_.
00049 { 00050 return this->is_loopback_; 00051 }
ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_loopback | ( | ) |
Definition at line 41 of file DataLink.inl.
References is_loopback_.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00042 { 00043 return this->is_loopback_; 00044 }
bool OpenDDS::DCPS::DataLink::is_target | ( | const RepoId & | remote_sub_id | ) |
This is called on the publisher side to see if this link communicates with the provided subscription.
Definition at line 869 of file DataLink.cpp.
References assoc_by_remote_.
00870 { 00871 GuardType guard(this->pub_sub_maps_lock_); 00872 return assoc_by_remote_.count(remote_sub_id); 00873 }
ACE_INLINE bool OpenDDS::DCPS::DataLink::issues_on_deleted_callback | ( | ) | const [virtual] |
int OpenDDS::DCPS::DataLink::make_reservation | ( | const RepoId & | remote_publication_id, | |
const RepoId & | local_subscription_id, | |
TransportReceiveListener * | receive_listener | |||
) |
Only called by our TransportImpl object.
Return Codes: 0 means successful reservation made. -1 means failure.
Definition at line 296 of file DataLink.cpp.
References assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, pub_sub_maps_lock_, recv_listeners_, send_strategy_, and strategy_lock_.
00299 { 00300 DBG_ENTRY_LVL("DataLink", "make_reservation", 6); 00301 00302 if (DCPS_debug_level > 9) { 00303 GuidConverter local(local_subscription_id), remote(remote_publication_id); 00304 ACE_DEBUG((LM_DEBUG, 00305 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ") 00306 ACE_TEXT("creating association local subscription %C ") 00307 ACE_TEXT("<--> with remote publication %C.\n"), 00308 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str())); 00309 } 00310 00311 { 00312 GuardType guard(strategy_lock_); 00313 00314 if (!send_strategy_.is_nil()) { 00315 send_strategy_->link_released(false); 00316 } 00317 } 00318 { 00319 GuardType guard(pub_sub_maps_lock_); 00320 00321 assoc_by_local_[local_subscription_id].insert(remote_publication_id); 00322 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id]; 00323 00324 if (rls.is_nil()) 00325 rls = new ReceiveListenerSet; 00326 rls->insert(local_subscription_id, receive_listener); 00327 00328 recv_listeners_[local_subscription_id] = receive_listener; 00329 } 00330 return 0; 00331 }
int OpenDDS::DCPS::DataLink::make_reservation | ( | const RepoId & | remote_subscription_id, | |
const RepoId & | local_publication_id, | |||
TransportSendListener * | send_listener | |||
) |
Only called by our TransportImpl object.
Return Codes: 0 means successful reservation made. -1 means failure.
Definition at line 257 of file DataLink.cpp.
References assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, pub_sub_maps_lock_, send_listeners_, send_strategy_, and strategy_lock_.
00260 { 00261 DBG_ENTRY_LVL("DataLink", "make_reservation", 6); 00262 00263 if (DCPS_debug_level > 9) { 00264 GuidConverter local(local_publication_id), remote(remote_subscription_id); 00265 ACE_DEBUG((LM_DEBUG, 00266 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ") 00267 ACE_TEXT("creating association local publication %C ") 00268 ACE_TEXT("<--> with remote subscription %C.\n"), 00269 OPENDDS_STRING(local).c_str(), 00270 OPENDDS_STRING(remote).c_str())); 00271 } 00272 00273 { 00274 GuardType guard(strategy_lock_); 00275 00276 if (!send_strategy_.is_nil()) { 00277 send_strategy_->link_released(false); 00278 } 00279 } 00280 { 00281 GuardType guard(pub_sub_maps_lock_); 00282 00283 assoc_by_local_[local_publication_id].insert(remote_subscription_id); 00284 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id]; 00285 00286 if (rls.is_nil()) 00287 rls = new ReceiveListenerSet; 00288 rls->insert(local_publication_id, 0); 00289 00290 send_listeners_[local_publication_id] = send_listener; 00291 } 00292 return 0; 00293 }
void OpenDDS::DCPS::DataLink::notify | ( | ConnectionNotice | notice | ) |
Notify the datawriters and datareaders that the connection is disconnected, lost, or reconnected. The datareader/datawriter will notify the corresponding listener.
Definition at line 682 of file DataLink.cpp.
References assoc_by_local_, connection_notice_as_str(), DBG_ENTRY_LVL, DISCONNECTED, LOST, OPENDDS_STRING, RECONNECTED, recv_listeners_, send_listeners_, OpenDDS::DCPS::set_to_seq(), OpenDDS::DCPS::Transport_debug_level, and VDBG.
00683 { 00684 DBG_ENTRY_LVL("DataLink", "notify", 6); 00685 00686 VDBG((LM_DEBUG, 00687 ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"), 00688 this, 00689 connection_notice_as_str(notice))); 00690 00691 GuardType guard(this->pub_sub_maps_lock_); 00692 00693 // Notify the datawriters 00694 // the lost publications due to a connection problem. 00695 for (IdToSendListenerMap::iterator itr = send_listeners_.begin(); 00696 itr != send_listeners_.end(); ++itr) { 00697 00698 TransportSendListener* tsl = itr->second; 00699 00700 if (tsl != 0) { 00701 if (Transport_debug_level > 0) { 00702 GuidConverter converter(itr->first); 00703 ACE_DEBUG((LM_DEBUG, 00704 ACE_TEXT("(%P|%t) DataLink::notify: ") 00705 ACE_TEXT("notify pub %C %C.\n"), 00706 OPENDDS_STRING(converter).c_str(), 00707 connection_notice_as_str(notice))); 00708 } 00709 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first); 00710 if (local_it == assoc_by_local_.end()) { 00711 if (Transport_debug_level) { 00712 GuidConverter converter(itr->first); 00713 ACE_DEBUG((LM_DEBUG, 00714 ACE_TEXT("(%P|%t) DataLink::notify: ") 00715 ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"), 00716 OPENDDS_STRING(converter).c_str(), 00717 connection_notice_as_str(notice))); 00718 } 00719 break; 00720 } 00721 const RepoIdSet& rids = local_it->second; 00722 00723 ReaderIdSeq subids; 00724 set_to_seq(rids, subids); 00725 00726 switch (notice) { 00727 case DISCONNECTED: 00728 tsl->notify_publication_disconnected(subids); 00729 break; 00730 00731 case RECONNECTED: 00732 tsl->notify_publication_reconnected(subids); 00733 break; 00734 00735 case LOST: 00736 tsl->notify_publication_lost(subids); 00737 break; 00738 00739 default: 00740 ACE_ERROR((LM_ERROR, 00741 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ") 00742 ACE_TEXT("unknown notice to TransportSendListener\n"))); 00743 break; 00744 } 00745 00746 } else { 00747 if (Transport_debug_level > 0) { 00748 GuidConverter converter(itr->first); 00749 
ACE_DEBUG((LM_DEBUG, 00750 ACE_TEXT("(%P|%t) DataLink::notify: ") 00751 ACE_TEXT("not notify pub %C %C \n"), 00752 OPENDDS_STRING(converter).c_str(), 00753 connection_notice_as_str(notice))); 00754 } 00755 } 00756 00757 } 00758 00759 // Notify the datareaders registered with TransportImpl 00760 // the lost subscriptions due to a connection problem. 00761 for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin(); 00762 itr != recv_listeners_.end(); ++itr) { 00763 00764 TransportReceiveListener* trl = itr->second; 00765 00766 if (trl != 0) { 00767 if (Transport_debug_level > 0) { 00768 GuidConverter converter(itr->first); 00769 ACE_DEBUG((LM_DEBUG, 00770 ACE_TEXT("(%P|%t) DataLink::notify: ") 00771 ACE_TEXT("notify sub %C %C.\n"), 00772 OPENDDS_STRING(converter).c_str(), 00773 connection_notice_as_str(notice))); 00774 } 00775 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first); 00776 if (local_it == assoc_by_local_.end()) { 00777 if (Transport_debug_level) { 00778 GuidConverter converter(itr->first); 00779 ACE_DEBUG((LM_DEBUG, 00780 ACE_TEXT("(%P|%t) DataLink::notify: ") 00781 ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"), 00782 OPENDDS_STRING(converter).c_str(), 00783 connection_notice_as_str(notice))); 00784 } 00785 break; 00786 } 00787 const RepoIdSet& rids = local_it->second; 00788 00789 WriterIdSeq pubids; 00790 set_to_seq(rids, pubids); 00791 00792 switch (notice) { 00793 case DISCONNECTED: 00794 trl->notify_subscription_disconnected(pubids); 00795 break; 00796 00797 case RECONNECTED: 00798 trl->notify_subscription_reconnected(pubids); 00799 break; 00800 00801 case LOST: 00802 trl->notify_subscription_lost(pubids); 00803 break; 00804 00805 default: 00806 ACE_ERROR((LM_ERROR, 00807 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ") 00808 ACE_TEXT("unknown notice to datareader.\n"))); 00809 break; 00810 } 00811 00812 } else { 00813 if (Transport_debug_level > 0) { 00814 GuidConverter converter(itr->first); 00815 
ACE_DEBUG((LM_DEBUG, 00816 ACE_TEXT("(%P|%t) DataLink::notify: ") 00817 ACE_TEXT("not notify sub %C subscription lost.\n"), 00818 OPENDDS_STRING(converter).c_str())); 00819 } 00820 00821 } 00822 } 00823 }
void OpenDDS::DCPS::DataLink::notify_connection_deleted | ( | ) |
Definition at line 825 of file DataLink.cpp.
References OpenDDS::DCPS::TransportReceiveListener::notify_connection_deleted(), OpenDDS::DCPS::TransportSendListener::notify_connection_deleted(), recv_listener_for(), released_assoc_by_local_, and send_listener_for().
00826 { 00827 GuardType guard(this->released_assoc_by_local_lock_); 00828 for (AssocByLocal::iterator iter = released_assoc_by_local_.begin(); 00829 iter != released_assoc_by_local_.end(); ++iter) { 00830 TransportSendListener* const tsl = send_listener_for(iter->first); 00831 if (tsl) { 00832 for (RepoIdSet::iterator ris_it = iter->second.begin(); 00833 ris_it != iter->second.end(); ++ris_it) { 00834 tsl->notify_connection_deleted(*ris_it); 00835 } 00836 iter->second.clear(); 00837 continue; 00838 } 00839 TransportReceiveListener* const trl = recv_listener_for(iter->first); 00840 if (trl) { 00841 for (RepoIdSet::iterator ris_it = iter->second.begin(); 00842 ris_it != iter->second.end(); ++ris_it) { 00843 trl->notify_connection_deleted(*ris_it); 00844 } 00845 iter->second.clear(); 00846 } 00847 } 00848 }
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RepoIdSet | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReceiveListenerSet_rch | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
TransportReceiveListener * | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Map subscription Id value to TransportReceiveListener.
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
TransportSendListener * | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Map publication Id value to TransportSendListener.
OpenDDS::DCPS::DataLink::OPENDDS_VECTOR | ( | OnStartCallback | ) | [protected] |
For a given local RepoId (publication or subscription), return the list of remote peer RepoIds (subscriptions or publications) that this link knows about due to make_reservation().
Definition at line 344 of file DataLink.cpp.
References assoc_by_local_, pub_sub_maps_lock_, and OpenDDS::DCPS::set_to_seq().
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::get_locators(), and OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos().
00345 { 00346 GuardType guard(pub_sub_maps_lock_); 00347 00348 const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id); 00349 00350 if (iter == assoc_by_local_.end()) 00351 return 0; 00352 00353 GUIDSeq_var result = new GUIDSeq; 00354 set_to_seq(iter->second, static_cast<GUIDSeq&>(result)); 00355 return result._retn(); 00356 }
void OpenDDS::DCPS::DataLink::pre_stop_i | ( | ) | [virtual] |
Called before releasing the datalink or before shutdown to let the concrete DataLink do anything necessary.
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, and OpenDDS::DCPS::TcpDataLink.
Definition at line 851 of file DataLink.cpp.
References OpenDDS::DCPS::ThreadPerConnectionSendTask::close(), and thr_per_con_send_task_.
Referenced by OpenDDS::DCPS::TcpDataLink::pre_stop_i(), OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i(), and stop().
00852 { 00853 if (this->thr_per_con_send_task_ != 0) { 00854 this->thr_per_con_send_task_->close(1); 00855 } 00856 }
void OpenDDS::DCPS::DataLink::prepare_release | ( | ) | [private] |
Save current sub and pub association maps for releasing and create empty maps for new associations.
Definition at line 901 of file DataLink.cpp.
References assoc_by_local_, and assoc_releasing_.
Referenced by release_resources().
00902 { 00903 GuardType guard(this->pub_sub_maps_lock_); 00904 00905 if (!assoc_releasing_.empty()) { 00906 ACE_ERROR((LM_ERROR, 00907 ACE_TEXT("(%P|%t) DataLink::prepare_release: ") 00908 ACE_TEXT("already prepared for release.\n"))); 00909 return; 00910 } 00911 00912 assoc_releasing_ = assoc_by_local_; 00913 }
ACE_INLINE TransportReceiveListener * OpenDDS::DCPS::DataLink::recv_listener_for | ( | const RepoId & | sub_id | ) | const [private] |
Definition at line 376 of file DataLink.inl.
References recv_listeners_.
Referenced by clear_associations(), and notify_connection_deleted().
00377 { 00378 // sub_map_ (and recv_listeners_) are already locked when entering this 00379 // private method. 00380 IdToRecvListenerMap::const_iterator found = 00381 this->recv_listeners_.find(sub_id); 00382 if (found == this->recv_listeners_.end()) { 00383 return 0; 00384 } 00385 return found->second; 00386 }
virtual void OpenDDS::DCPS::DataLink::release_remote_i | ( | const RepoId & | ) | [inline, private, virtual] |
Reimplemented in OpenDDS::DCPS::MulticastDataLink, and OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 322 of file DataLink.h.
Referenced by release_reservations().
void OpenDDS::DCPS::DataLink::release_reservations | ( | RepoId | remote_id, | |
RepoId | local_id, | |||
DataLinkSetMap & | released_locals | |||
) |
This will release reservations that were made by one of the make_reservation() methods. All we know is that the supplied RepoId is considered to be a remote id. It could be a remote subscriber or a remote publisher.
Definition at line 366 of file DataLink.cpp.
References assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, impl_, OPENDDS_STRING, release_remote_i(), release_reservations_i(), released_assoc_by_local_, and VDBG_LVL.
00368 { 00369 DBG_ENTRY_LVL("DataLink", "release_reservations", 6); 00370 00371 if (DCPS_debug_level > 9) { 00372 GuidConverter local(local_id); 00373 GuidConverter remote(remote_id); 00374 ACE_DEBUG((LM_DEBUG, 00375 ACE_TEXT("(%P|%t) DataLink::release_reservations() - ") 00376 ACE_TEXT("releasing association local: %C ") 00377 ACE_TEXT("<--> with remote %C.\n"), 00378 OPENDDS_STRING(local).c_str(), 00379 OPENDDS_STRING(remote).c_str())); 00380 } 00381 00382 //let the specific class release its reservations 00383 //done this way to prevent deadlock of holding pub_sub_maps_lock_ 00384 //then obtaining a specific class lock in release_reservations_i 00385 //which reverses lock ordering of the active send logic of needing 00386 //the specific class lock before obtaining the over arching DataLink 00387 //pub_sub_maps_lock_ 00388 this->release_reservations_i(remote_id, local_id); 00389 00390 GuardType guard(this->pub_sub_maps_lock_); 00391 00392 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id]; 00393 if (rls->size() == 1) { 00394 assoc_by_remote_.erase(remote_id); 00395 release_remote_i(remote_id); 00396 } else { 00397 rls->remove(local_id); 00398 } 00399 RepoIdSet& ris = assoc_by_local_[local_id]; 00400 if (ris.size() == 1) { 00401 DataLinkSet_rch& links = released_locals[local_id]; 00402 if (links.is_nil()) 00403 links = new DataLinkSet; 00404 links->insert_link(this); 00405 { 00406 GuardType guard(this->released_assoc_by_local_lock_); 00407 released_assoc_by_local_[local_id].insert(remote_id); 00408 } 00409 assoc_by_local_.erase(local_id); 00410 } else { 00411 ris.erase(remote_id); 00412 } 00413 00414 if (assoc_by_local_.empty()) { 00415 VDBG_LVL((LM_DEBUG, 00416 ACE_TEXT("(%P|%t) DataLink::release_reservations: ") 00417 ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5); 00418 this->impl_->release_datalink(this); 00419 } 00420 }
virtual void OpenDDS::DCPS::DataLink::release_reservations_i | ( | const RepoId & | , | |
const RepoId & | ||||
) | [inline, private, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 323 of file DataLink.h.
Referenced by release_reservations().
bool OpenDDS::DCPS::DataLink::release_resources | ( | ) |
Definition at line 859 of file DataLink.cpp.
References DBG_ENTRY_LVL, impl_, and prepare_release().
00860 { 00861 DBG_ENTRY_LVL("DataLink", "release_resources", 6); 00862 00863 this->prepare_release(); 00864 00865 return impl_->release_link_resources(this); 00866 }
ACE_INLINE void OpenDDS::DCPS::DataLink::remove_all_msgs | ( | RepoId | pub_id | ) |
Definition at line 208 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.
00209 { 00210 DBG_ENTRY_LVL("DataLink","remove_all_msgs",6); 00211 00212 // This one is easy. Simply delegate to our TransportSendStrategy 00213 // data member. 00214 00215 TransportSendStrategy_rch strategy; 00216 { 00217 GuardType guard(this->strategy_lock_); 00218 00219 strategy = this->send_strategy_; 00220 } 00221 00222 if (!strategy.is_nil()) { 00223 strategy->remove_all_msgs(pub_id); 00224 } 00225 }
ACE_INLINE void OpenDDS::DCPS::DataLink::remove_listener | ( | const RepoId & | local_id | ) |
Either send or receive listener for this local_id should be removed from internal DataLink structures so it no longer receives events.
Definition at line 329 of file DataLink.inl.
References OPENDDS_STRING, recv_listeners_, send_listeners_, and OpenDDS::DCPS::Transport_debug_level.
00330 { 00331 GuardType guard(this->pub_sub_maps_lock_); 00332 { 00333 IdToSendListenerMap::iterator pos = this->send_listeners_.find(local_id); 00334 if (pos != this->send_listeners_.end()) { 00335 this->send_listeners_.erase(pos); 00336 if (Transport_debug_level > 5) { 00337 GuidConverter converter(local_id); 00338 ACE_DEBUG((LM_DEBUG, 00339 ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from send_listeners\n"), 00340 OPENDDS_STRING(converter).c_str())); 00341 } 00342 return; 00343 } 00344 } 00345 { 00346 IdToRecvListenerMap::iterator pos = this->recv_listeners_.find(local_id); 00347 if (pos != this->recv_listeners_.end()) { 00348 this->recv_listeners_.erase(pos); 00349 if (Transport_debug_level > 5) { 00350 GuidConverter converter(local_id); 00351 ACE_DEBUG((LM_DEBUG, 00352 ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from recv_listeners\n"), 00353 OPENDDS_STRING(converter).c_str())); 00354 } 00355 return; 00356 } 00357 } 00358 }
ACE_INLINE void OpenDDS::DCPS::DataLink::remove_on_start_callback | ( | TransportClient * | client, | |
const RepoId & | remote | |||
) |
Definition at line 299 of file DataLink.inl.
References OpenDDS::DCPS::remove(), and strategy_lock_.
00300 { 00301 GuardType guard(strategy_lock_); 00302 on_start_callbacks_.erase(std::remove(on_start_callbacks_.begin(), 00303 on_start_callbacks_.end(), 00304 std::make_pair(client, remote)), 00305 on_start_callbacks_.end()); 00306 }
ACE_INLINE RemoveResult OpenDDS::DCPS::DataLink::remove_sample | ( | const DataSampleElement * | sample | ) |
This method is essentially an "undo_send()" method. Its goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).
Definition at line 180 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::ThreadPerConnectionSendTask::remove_sample(), send_strategy_, thr_per_con_send_task_, and VDBG.
00181 { 00182 DBG_ENTRY_LVL("DataLink", "remove_sample", 6); 00183 00184 if (this->thr_per_con_send_task_ != 0) { 00185 const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample); 00186 if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) { 00187 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00188 "Removed sample from ThreadPerConnection queue.\n")); 00189 return rr; 00190 } 00191 } 00192 00193 TransportSendStrategy_rch strategy; 00194 { 00195 GuardType guard(this->strategy_lock_); 00196 00197 strategy = this->send_strategy_; 00198 } 00199 00200 if (!strategy.is_nil()) { 00201 return strategy->remove_sample(sample); 00202 } 00203 00204 return REMOVE_NOT_FOUND; 00205 }
void OpenDDS::DCPS::DataLink::resume_send | ( | ) |
The resume_send is used in the case of reconnection on the subscriber's side.
Definition at line 250 of file DataLink.cpp.
00251 { 00252 if (!this->send_strategy_->isDirectMode()) 00253 this->send_strategy_->resume_send(); 00254 }
void OpenDDS::DCPS::DataLink::schedule_delayed_release | ( | ) |
Definition at line 423 of file DataLink.cpp.
References datalink_release_delay_, DBG_ENTRY_LVL, schedule_stop(), and VDBG.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink().
00424 { 00425 DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6); 00426 00427 VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this)); 00428 00429 // The samples have to be removed at this point, otherwise the samples 00430 // can not be delivered when new association is added and still use 00431 // this connection/datalink. 00432 if (!this->send_strategy_.is_nil()) { 00433 this->send_strategy_->clear(); 00434 } 00435 00436 ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_; 00437 this->schedule_stop(future_release_time); 00438 }
void OpenDDS::DCPS::DataLink::schedule_stop | ( | ACE_Time_Value & | schedule_to_stop_at | ) |
Definition at line 198 of file DataLink.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::DCPS_debug_level, and scheduled_to_stop_at_.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and schedule_delayed_release().
00199 { 00200 if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) { 00201 //Add ref before handing to the reactor 00202 //ref removed in handle_exception or in handle_timeout based on stopping now or delayed 00203 this->_add_ref(); 00204 this->scheduled_to_stop_at_ = schedule_to_stop_at; 00205 TransportReactorTask_rch reactor(this->impl_->reactor_task()); 00206 reactor->get_reactor()->notify(this); 00207 // reactor will invoke our DataLink::handle_exception() 00208 } else { 00209 if (DCPS_debug_level > 0) { 00210 ACE_DEBUG((LM_DEBUG, 00211 ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n"))); 00212 } 00213 } 00214 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send | ( | TransportQueueElement * | element | ) |
Definition at line 105 of file DataLink.inl.
References OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request(), customize_queue_element(), DBG_ENTRY_LVL, OpenDDS::DCPS::SEND, send_i(), and thr_per_con_send_task_.
Referenced by send_control().
00106 { 00107 DBG_ENTRY_LVL("DataLink","send",6); 00108 00109 element = this->customize_queue_element(element); 00110 if (!element) { 00111 return; 00112 } 00113 00114 if (this->thr_per_con_send_task_ != 0) { 00115 this->thr_per_con_send_task_->add_request(SEND, element); 00116 00117 } else { 00118 this->send_i(element); 00119 00120 } 00121 }
SendControlStatus OpenDDS::DCPS::DataLink::send_control | ( | const DataSampleHeader & | header, | |
ACE_Message_Block * | data | |||
) |
This allows a subclass to send transport control samples over this DataLink. This is useful for sending transport-specific control messages between one or more endpoints under this DataLink's control.
Definition at line 503 of file DataLink.cpp.
References OpenDDS::DCPS::TransportSendControlElement::alloc(), DBG_ENTRY_LVL, OpenDDS::DCPS::GUID_UNKNOWN, header, send(), send_control_allocator_, OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::SEND_CONTROL_OK, send_response_listener_, send_start(), send_stop(), and OpenDDS::DCPS::SendResponseListener::track_message().
Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().
00504 { 00505 DBG_ENTRY_LVL("DataLink", "send_control", 6); 00506 00507 TransportSendControlElement* const elem = 00508 TransportSendControlElement::alloc(1, // initial_count 00509 GUID_UNKNOWN, &send_response_listener_, 00510 header, message, send_control_allocator_); 00511 if (!elem) return SEND_CONTROL_ERROR; 00512 00513 send_response_listener_.track_message(); 00514 00515 RepoId senderId(header.publication_id_); 00516 send_start(); 00517 send(elem); 00518 send_stop(senderId); 00519 00520 return SEND_CONTROL_OK; 00521 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_final_acks | ( | const RepoId & | readerid | ) | [virtual] |
ACE_INLINE void OpenDDS::DCPS::DataLink::send_i | ( | TransportQueueElement * | element, | |
bool | relink = true | |||
) | [protected, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.
Definition at line 124 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.
Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), send(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), and OpenDDS::DCPS::RtpsUdpDataLink::send_i().
00125 { 00126 DBG_ENTRY_LVL("DataLink","send_i",6); 00127 // This one is easy. Simply delegate to our TransportSendStrategy 00128 // data member. 00129 00130 TransportSendStrategy_rch strategy; 00131 { 00132 GuardType guard(this->strategy_lock_); 00133 00134 strategy = this->send_strategy_; 00135 } 00136 00137 if (!strategy.is_nil()) { 00138 strategy->send(element, relink); 00139 } 00140 }
ACE_INLINE TransportSendListener * OpenDDS::DCPS::DataLink::send_listener_for | ( | const RepoId & | pub_id | ) | const [private] |
Definition at line 362 of file DataLink.inl.
References send_listeners_.
Referenced by clear_associations(), and notify_connection_deleted().
00363 { 00364 // pub_map_ (and send_listeners_) are already locked when entering this 00365 // private method. 00366 IdToSendListenerMap::const_iterator found = 00367 this->send_listeners_.find(pub_id); 00368 if (found == this->send_listeners_.end()) { 00369 return 0; 00370 } 00371 return found->second; 00372 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_start | ( | ) |
Called by the TransportClient objects that reference this DataLink. Used by the TransportClient to send a sample, or to send a control message. These functions either give the request to the ThreadPerConnectionSendTask when the thread_per_connection configuration is true or simply delegate to the send strategy.
Definition at line 74 of file DataLink.inl.
References OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request(), DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_START, send_start_i(), and thr_per_con_send_task_.
Referenced by send_control().
00075 { 00076 DBG_ENTRY_LVL("DataLink","send_start",6); 00077 00078 if (this->thr_per_con_send_task_ != 0) { 00079 this->thr_per_con_send_task_->add_request(SEND_START); 00080 00081 } else 00082 this->send_start_i(); 00083 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_start_i | ( | ) | [protected] |
The implementation of the functions that accomplish the sample or control message delivery. They simply delegate to the send strategy.
Definition at line 86 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.
Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_start().
00087 { 00088 DBG_ENTRY_LVL("DataLink","send_start_i",6); 00089 // This one is easy. Simply delegate to our TransportSendStrategy 00090 // data member. 00091 00092 TransportSendStrategy_rch strategy; 00093 { 00094 GuardType guard(this->strategy_lock_); 00095 00096 strategy = this->send_strategy_; 00097 } 00098 00099 if (!strategy.is_nil()) { 00100 strategy->send_start(); 00101 } 00102 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop | ( | RepoId | repoId | ) |
Definition at line 143 of file DataLink.inl.
References OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request(), DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_STOP, send_stop_i(), and thr_per_con_send_task_.
Referenced by send_control().
00144 { 00145 DBG_ENTRY_LVL("DataLink","send_stop",6); 00146 00147 if (this->thr_per_con_send_task_ != 0) { 00148 this->thr_per_con_send_task_->add_request(SEND_STOP); 00149 00150 } else 00151 this->send_stop_i(repoId); 00152 }
ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop_i | ( | RepoId | repoId | ) | [protected] |
Definition at line 155 of file DataLink.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.
Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_stop().
00156 { 00157 DBG_ENTRY_LVL("DataLink","send_stop_i",6); 00158 // This one is easy. Simply delegate to our TransportSendStrategy 00159 // data member. 00160 00161 TransportSendStrategy_rch strategy = 0; 00162 { 00163 GuardType guard(this->strategy_lock_); 00164 00165 strategy = this->send_strategy_; 00166 } 00167 00168 if (!strategy.is_nil()) { 00169 strategy->send_stop(repoId); 00170 } 00171 }
void OpenDDS::DCPS::DataLink::set_dscp_codepoint | ( | int | cp, | |
ACE_SOCK & | socket | |||
) |
Definition at line 964 of file DataLink.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
Referenced by OpenDDS::DCPS::UdpDataLink::open().
00965 { 00966 /** 00967 * The following IPV6 code was lifted in spirit from the RTCORBA 00968 * implementation of setting the DiffServ codepoint. 00969 */ 00970 int result = 0; 00971 00972 // Shift the code point up to bits, so that we only use the DS field 00973 int tos = cp << 2; 00974 00975 const char* which = "IPV4 TOS"; 00976 #if defined (ACE_HAS_IPV6) 00977 ACE_INET_Addr local_address; 00978 00979 if (socket.get_local_addr(local_address) == -1) { 00980 return; 00981 00982 } else if (local_address.get_type() == AF_INET6) 00983 #if !defined (IPV6_TCLASS) 00984 { 00985 if (DCPS_debug_level > 0) { 00986 ACE_ERROR((LM_ERROR, 00987 ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ") 00988 ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"), 00989 cp)); 00990 } 00991 00992 return; 00993 } 00994 00995 #else /* IPV6_TCLASS */ 00996 { 00997 which = "IPV6 TCLASS"; 00998 result = socket.set_option( 00999 IPPROTO_IPV6, 01000 IPV6_TCLASS, 01001 &tos, 01002 sizeof(tos)); 01003 01004 } else // This is a bit tricky and might be hard to follow... 01005 01006 #endif /* IPV6_TCLASS */ 01007 #endif /* ACE_HAS_IPV6 */ 01008 01009 #ifdef IP_TOS 01010 result = socket.set_option( 01011 IPPROTO_IP, 01012 IP_TOS, 01013 &tos, 01014 sizeof(tos)); 01015 01016 if ((result == -1) && (errno != ENOTSUP) 01017 #ifdef WSAEINVAL 01018 && (errno != WSAEINVAL) 01019 #endif 01020 ) { 01021 #endif // IP_TOS 01022 ACE_DEBUG((LM_DEBUG, 01023 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ") 01024 ACE_TEXT("failed to set the %C codepoint to %d: %m, ") 01025 ACE_TEXT("try running as superuser.\n"), 01026 which, 01027 cp)); 01028 #ifdef IP_TOS 01029 } else if (DCPS_debug_level > 4) { 01030 ACE_DEBUG((LM_DEBUG, 01031 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ") 01032 ACE_TEXT("set %C codepoint to %d.\n"), 01033 which, 01034 cp)); 01035 } 01036 #endif 01037 }
ACE_INLINE void OpenDDS::DCPS::DataLink::set_scheduling_release | ( | bool | scheduling_release | ) |
Definition at line 174 of file DataLink.inl.
References scheduling_release_.
Referenced by cancel_release(), OpenDDS::DCPS::TcpTransport::release_datalink(), and transport_shutdown().
00175 { 00176 this->scheduling_release_ = scheduling_release; 00177 }
ACE_INLINE int OpenDDS::DCPS::DataLink::start | ( | const TransportSendStrategy_rch & | send_strategy, | |
const TransportStrategy_rch & | receive_strategy | |||
) | [protected] |
This is how the subclass "announces" to this DataLink base class that this DataLink has now been "connected" and should start the supplied strategy objects. This start method is also going to keep a "copy" of the references to the strategy objects. Also note that it is acceptable to pass-in a NULL (0) TransportReceiveStrategy*, but it is assumed that the TransportSendStrategy* argument is not NULL.
If the start() method fails to start either strategy, then a -1 is returned. Otherwise, a 0 is returned. In the failure case, if one of the strategy objects was started successfully, then it will be stopped before the start() method returns -1.
Definition at line 235 of file DataLink.inl.
References DBG_ENTRY_LVL, invoke_on_start_callbacks(), OpenDDS::DCPS::RcHandle< T >::is_nil(), receive_strategy_, send_strategy_, and started_.
Referenced by OpenDDS::DCPS::MulticastDataLink::join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ShmemDataLink::open(), and OpenDDS::DCPS::RtpsUdpDataLink::open().
00237 { 00238 DBG_ENTRY_LVL("DataLink","start",6); 00239 00240 // We assume that the send_strategy is not NULL, but the receive_strategy 00241 // is allowed to be NULL. 00242 00243 // Attempt to start the strategies, and if there is a start() failure, 00244 // make sure to stop() any strategy that was already start()'ed. 00245 if (send_strategy->start() != 0) { 00246 // Failed to start the TransportSendStrategy. 00247 invoke_on_start_callbacks(false); 00248 return -1; 00249 } 00250 00251 if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) { 00252 // Failed to start the TransportReceiveStrategy. 00253 00254 // Remember to stop() the TransportSendStrategy since we did start it, 00255 // and now need to "undo" that action. 00256 send_strategy->stop(); 00257 invoke_on_start_callbacks(false); 00258 return -1; 00259 } 00260 00261 // We started both strategy objects. Save them to data members since 00262 // we will now take ownership of them. 00263 { 00264 GuardType guard(this->strategy_lock_); 00265 00266 this->send_strategy_ = send_strategy; 00267 this->receive_strategy_ = receive_strategy; 00268 } 00269 invoke_on_start_callbacks(true); 00270 { 00271 //catch any associations added during initial invoke_on_start_callbacks 00272 //only after first use_datalink has resolved does datalink's state truly 00273 //change to started, thus can't let pending associations proceed normally yet 00274 GuardType guard(this->strategy_lock_); 00275 this->started_ = true; 00276 } 00277 //Now state transitioned to started so no new on_start_callbacks will be added 00278 //so resolve any added during transition to started. 00279 invoke_on_start_callbacks(true); 00280 return 0; 00281 }
void OpenDDS::DCPS::DataLink::stop | ( | ) |
The stop method is used to stop the DataLink prior to shutdown.
Definition at line 217 of file DataLink.cpp.
References OpenDDS::DCPS::RcHandle< T >::is_nil(), pre_stop_i(), receive_strategy_, scheduled_to_stop_at_, send_strategy_, stop_i(), and stopped_.
Referenced by handle_exception(), OpenDDS::DCPS::UdpTransport::release_datalink(), OpenDDS::DCPS::ShmemTransport::release_datalink(), and transport_shutdown().
00218 { 00219 this->pre_stop_i(); 00220 00221 TransportSendStrategy_rch send_strategy; 00222 TransportStrategy_rch recv_strategy; 00223 00224 { 00225 GuardType guard(this->strategy_lock_); 00226 00227 if (this->stopped_) return; 00228 00229 send_strategy = this->send_strategy_; 00230 this->send_strategy_ = 0; 00231 00232 recv_strategy = this->receive_strategy_; 00233 this->receive_strategy_ = 0; 00234 } 00235 00236 if (!send_strategy.is_nil()) { 00237 send_strategy->stop(); 00238 } 00239 00240 if (!recv_strategy.is_nil()) { 00241 recv_strategy->stop(); 00242 } 00243 00244 this->stop_i(); 00245 this->stopped_ = true; 00246 this->scheduled_to_stop_at_ = ACE_Time_Value::zero; 00247 }
void OpenDDS::DCPS::DataLink::stop_i | ( | ) | [protected, virtual] |
This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.
Reimplemented in OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::ShmemDataLink, OpenDDS::DCPS::TcpDataLink, and OpenDDS::DCPS::UdpDataLink.
Definition at line 463 of file DataLink.cpp.
References DBG_ENTRY_LVL.
Referenced by stop().
00464 { 00465 DBG_ENTRY_LVL("DataLink", "stop_i", 6); 00466 }
GUIDSeq * OpenDDS::DCPS::DataLink::target_intersection | ( | const RepoId & | pub_id, | |
const GUIDSeq & | in, | |||
size_t & | n_subs | |||
) |
For a given publication "pub_id", store the total number of corresponding subscriptions in "n_subs" and given a set of subscriptions (the "in" sequence), return the subset of the input set "in" which are targets of this DataLink (see is_target()).
Definition at line 876 of file DataLink.cpp.
References assoc_by_local_, and OpenDDS::DCPS::push_back().
00878 { 00879 GUIDSeq_var res; 00880 GuardType guard(this->pub_sub_maps_lock_); 00881 AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id); 00882 00883 if (iter != assoc_by_local_.end()) { 00884 n_subs = iter->second.size(); 00885 const CORBA::ULong len = in.length(); 00886 00887 for (CORBA::ULong i(0); i < len; ++i) { 00888 if (iter->second.count(in[i])) { 00889 if (res.ptr() == 0) { 00890 res = new GUIDSeq; 00891 } 00892 00893 push_back(res.inout(), in[i]); 00894 } 00895 } 00896 } 00897 00898 return res._retn(); 00899 }
ACE_INLINE void OpenDDS::DCPS::DataLink::terminate_send | ( | ) |
Definition at line 322 of file DataLink.inl.
References send_strategy_.
00323 { 00324 this->send_strategy_->terminate_send(false); 00325 }
ACE_INLINE Priority OpenDDS::DCPS::DataLink::transport_priority | ( | ) | const |
Definition at line 34 of file DataLink.inl.
References transport_priority_.
00035 { 00036 return this->transport_priority_; 00037 }
ACE_INLINE Priority & OpenDDS::DCPS::DataLink::transport_priority | ( | ) |
Accessors for the TRANSPORT_PRIORITY value associated with this link.
Definition at line 27 of file DataLink.inl.
References transport_priority_.
Referenced by OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00028 { 00029 return this->transport_priority_; 00030 }
void OpenDDS::DCPS::DataLink::transport_shutdown | ( | ) |
Our TransportImpl will inform us if it is being shutdown() by calling this method.
Definition at line 661 of file DataLink.cpp.
References DBG_ENTRY_LVL, impl_, scheduled_to_stop_at_, set_scheduling_release(), and stop().
00662 { 00663 DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6); 00664 00665 if (!this->send_strategy_.is_nil()) { 00666 this->send_strategy_->transport_shutdown(); 00667 } 00668 00669 //this->cancel_release(); 00670 this->set_scheduling_release(false); 00671 this->scheduled_to_stop_at_ = ACE_Time_Value::zero; 00672 ACE_Reactor_Timer_Interface* reactor = this->impl_->timer(); 00673 reactor->cancel_timer(this); 00674 00675 this->stop(); 00676 00677 // Drop our reference to the TransportImpl object 00678 this->impl_ = 0; 00679 }
friend class DataLinkCleanupTask [friend] |
Definition at line 66 of file DataLink.h.
OpenDDS_Dcps_Export std::ostream& operator<< | ( | std::ostream & | str, | |
const DataLink & | value | |||
) | [friend] |
Convenience function for diagnostic information.
Definition at line 1041 of file DataLink.cpp.
01042 { 01043 str << " There are " << value.assoc_by_local_.size() 01044 << " local entities currently using this link comprising following associations:" 01045 << std::endl; 01046 01047 for (DataLink::AssocByLocal::const_iterator 01048 localId = value.assoc_by_local_.begin(); 01049 localId != value.assoc_by_local_.end(); 01050 ++localId) { 01051 for (RepoIdSet::const_iterator 01052 remoteId = localId->second.begin(); 01053 remoteId != localId->second.end(); 01054 ++remoteId) { 01055 str << GuidConverter(localId->first) << " --> " 01056 << GuidConverter(*remoteId) << " " << std::endl; 01057 } 01058 } 01059 return str; 01060 }
friend class ThreadPerConnectionSendTask [friend] |
AssocByLocal OpenDDS::DCPS::DataLink::assoc_by_local_ [private] |
Definition at line 363 of file DataLink.h.
Referenced by handle_timeout(), make_reservation(), notify(), OpenDDS::DCPS::operator<<(), peer_ids(), prepare_release(), release_reservations(), target_intersection(), and ~DataLink().
AssocByRemote OpenDDS::DCPS::DataLink::assoc_by_remote_ [private] |
Definition at line 360 of file DataLink.h.
Referenced by data_received_i(), handle_timeout(), is_target(), make_reservation(), and release_reservations().
AssocByLocal OpenDDS::DCPS::DataLink::assoc_releasing_ [private] |
Definition at line 380 of file DataLink.h.
Referenced by clear_associations(), and prepare_release().
ACE_Time_Value OpenDDS::DCPS::DataLink::datalink_release_delay_ [protected] |
Configurable delay, in milliseconds, before the datalink is released once all of its associations have been removed.
Definition at line 399 of file DataLink.h.
Referenced by DataLink(), datalink_release_delay(), and schedule_delayed_release().
DataBlockAllocator* OpenDDS::DCPS::DataLink::db_allocator_ [protected] |
Definition at line 408 of file DataLink.h.
Referenced by create_control(), DataLink(), and ~DataLink().
If default_listener_ is not null and this DataLink receives a sample from a publication GUID that's not in pub_map_, it will call data_received() on the default_listener_.
Definition at line 355 of file DataLink.h.
Referenced by data_received_i(), and default_listener().
ACE_UINT64 OpenDDS::DCPS::DataLink::id_ [private] |
The id for this DataLink.
Definition at line 372 of file DataLink.h.
Referenced by DataLink(), and id().
A (smart) pointer to the TransportImpl that created this DataLink.
Definition at line 369 of file DataLink.h.
Referenced by DataLink(), handle_exception(), handle_timeout(), impl(), release_reservations(), release_resources(), and transport_shutdown().
bool OpenDDS::DCPS::DataLink::is_active_ [protected] |
bool OpenDDS::DCPS::DataLink::is_loopback_ [protected] |
Is the remote attached to the same transport?
Definition at line 411 of file DataLink.h.
Referenced by is_loopback(), and OpenDDS::DCPS::UdpDataLink::open().
Allocators for data and message blocks used by transport control samples when send_control is called.
Definition at line 407 of file DataLink.h.
Referenced by create_control(), DataLink(), and ~DataLink().
LockType OpenDDS::DCPS::DataLink::pub_sub_maps_lock_ [mutable, private] |
The transport receive strategy object for this DataLink.
Definition at line 286 of file DataLink.h.
Referenced by OpenDDS::DCPS::TcpDataLink::pre_stop_i(), OpenDDS::DCPS::TcpDataLink::reconnect(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), start(), and stop().
IdToRecvListenerMap OpenDDS::DCPS::DataLink::recv_listeners_ [private] |
Definition at line 350 of file DataLink.h.
Referenced by make_reservation(), notify(), recv_listener_for(), and remove_listener().
AssocByLocal OpenDDS::DCPS::DataLink::released_assoc_by_local_ [private] |
Definition at line 366 of file DataLink.h.
Referenced by notify_connection_deleted(), and release_reservations().
LockType OpenDDS::DCPS::DataLink::released_assoc_by_local_lock_ [mutable, private] |
Definition at line 365 of file DataLink.h.
ACE_Time_Value OpenDDS::DCPS::DataLink::scheduled_to_stop_at_ [private] |
Definition at line 342 of file DataLink.h.
Referenced by cancel_release(), handle_exception(), schedule_stop(), stop(), and transport_shutdown().
bool OpenDDS::DCPS::DataLink::scheduling_release_ [private] |
Definition at line 385 of file DataLink.h.
Referenced by cancel_release(), and set_scheduling_release().
Allocator for TransportSendControlElements created when send_control is called.
Definition at line 403 of file DataLink.h.
Referenced by DataLink(), send_control(), and ~DataLink().
IdToSendListenerMap OpenDDS::DCPS::DataLink::send_listeners_ [private] |
Definition at line 346 of file DataLink.h.
Referenced by make_reservation(), notify(), remove_listener(), and send_listener_for().
Listener for TransportSendControlElements created in send_control.
Definition at line 417 of file DataLink.h.
Referenced by send_control().
The transport send strategy object for this DataLink.
Reimplemented in OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::ShmemDataLink, and OpenDDS::DCPS::UdpDataLink.
Definition at line 392 of file DataLink.h.
Referenced by add_on_start_callback(), make_reservation(), OpenDDS::DCPS::TcpDataLink::reconnect(), remove_all_msgs(), remove_sample(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), send_i(), send_start_i(), send_stop_i(), start(), stop(), and terminate_send().
bool OpenDDS::DCPS::DataLink::started_ [protected] |
bool OpenDDS::DCPS::DataLink::stopped_ [private] |
A boolean indicating if the DataLink has been stopped. This value is protected by the strategy_lock_.
Definition at line 341 of file DataLink.h.
Referenced by cancel_release(), and stop().
LockType OpenDDS::DCPS::DataLink::strategy_lock_ [protected] |
Definition at line 394 of file DataLink.h.
Referenced by add_on_start_callback(), invoke_on_start_callbacks(), make_reservation(), and remove_on_start_callback().
The task used to do the sending. This ThreadPerConnectionSendTask object is created when the thread_per_connection configuration is true. It is dedicated solely to this datalink.
Definition at line 377 of file DataLink.h.
Referenced by DataLink(), pre_stop_i(), remove_sample(), send(), send_start(), send_stop(), and ~DataLink().
TRANSPORT_PRIORITY value associated with the link.
Definition at line 383 of file DataLink.h.
Referenced by transport_priority().