OpenDDS::DCPS::DataLink Class Reference

#include <DataLink.h>

Inheritance diagram for OpenDDS::DCPS::DataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataLink:
Collaboration graph
[legend]

List of all members.

Public Types

enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
typedef WeakRcHandle< TransportClient > TransportClient_wrch
typedef std::pair< TransportClient_wrch, RepoId > OnStartCallback

Public Member Functions

 DataLink (TransportImpl &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object.
virtual ~DataLink ()
int handle_exception (ACE_HANDLE)
void schedule_stop (const ACE_Time_Value &schedule_to_stop_at)
void stop ()
 The stop method is used to stop the DataLink prior to shutdown.
void resume_send ()
int make_reservation (const RepoId &remote_subscription_id, const RepoId &local_publication_id, const TransportSendListener_wrch &send_listener)
int make_reservation (const RepoId &remote_publication_id, const RepoId &local_subcription_id, const TransportReceiveListener_wrch &receive_listener)
void release_reservations (RepoId remote_id, RepoId local_id, DataLinkSetMap &released_locals)
void schedule_delayed_release ()
const ACE_Time_Value & datalink_release_delay () const
void remove_listener (const RepoId &local_id)
void send_start ()
void send (TransportQueueElement *element)
void send_stop (RepoId repoId)
virtual RemoveResult remove_sample (const DataSampleElement *sample, void *context)
void remove_all_msgs (RepoId pub_id)
int data_received (ReceivedDataSample &sample, const RepoId &readerId=GUID_UNKNOWN)
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object.
void transport_shutdown ()
void notify (ConnectionNotice notice)
virtual void pre_stop_i ()
bool release_resources ()
void terminate_send ()
bool is_target (const RepoId &remote_sub_id)
void clear_associations ()
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
Priority & transport_priority ()
Priority transport_priority () const
bool & is_loopback ()
bool is_loopback () const
bool & is_active ()
bool is_active () const
bool cancel_release ()
ACE_Message_Block * create_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
GUIDSeq * target_intersection (const RepoId &pub_id, const GUIDSeq &in, size_t &n_subs)
TransportImpl & impl () const
void default_listener (const TransportReceiveListener_wrch &trl)
TransportReceiveListener_wrch default_listener () const
bool add_on_start_callback (const TransportClient_wrch &client, const RepoId &remote)
void remove_on_start_callback (const TransportClient_wrch &client, const RepoId &remote)
void invoke_on_start_callbacks (bool success)
void set_scheduling_release (bool scheduling_release)
virtual void send_final_acks (const RepoId &readerid)

Protected Types

typedef ACE_Guard< LockType > GuardType

Protected Member Functions

int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy)
virtual void stop_i ()
void send_start_i ()
virtual void send_i (TransportQueueElement *element, bool relink=true)
void send_stop_i (RepoId repoId)
GUIDSeq * peer_ids (const RepoId &local_id) const
 OPENDDS_VECTOR (OnStartCallback) on_start_callbacks_

Static Protected Member Functions

static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink objects.

Protected Attributes

TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink.
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink.
LockType strategy_lock_
ACE_Time_Value datalink_release_delay_
unique_ptr< MessageBlockAllocator > mb_allocator_
unique_ptr< DataBlockAllocator > db_allocator_
bool is_loopback_
 Is remote attached to same transport ?
bool is_active_
 Is pub or sub ?
bool started_
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control.

Private Types

typedef ACE_SYNCH_MUTEX LockType

Private Member Functions

const char * connection_notice_as_str (ConnectionNotice notice)
 Helper function to output the enum as a string to help debugging.
TransportSendListener_rch send_listener_for (const RepoId &pub_id) const
TransportReceiveListener_rch recv_listener_for (const RepoId &sub_id) const
void prepare_release ()
virtual bool handle_send_request_ack (TransportQueueElement *element)
virtual TransportQueueElement * customize_queue_element (TransportQueueElement *element)
virtual void release_remote_i (const RepoId &)
virtual void release_reservations_i (const RepoId &, const RepoId &)
void data_received_i (ReceivedDataSample &sample, const RepoId &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain)
void notify_reactor ()
typedef OPENDDS_MAP_CMP (RepoId, TransportSendListener_wrch, GUID_tKeyLessThan) IdToSendListenerMap
 Map publication Id value to TransportSendListener.
typedef OPENDDS_MAP_CMP (RepoId, TransportReceiveListener_wrch, GUID_tKeyLessThan) IdToRecvListenerMap
 Map subscription Id value to TransportReceiveListener.
typedef OPENDDS_MAP_CMP (RepoId, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote
typedef OPENDDS_MAP_CMP (RepoId, RepoIdSet, GUID_tKeyLessThan) AssocByLocal

Private Attributes

bool stopped_
ACE_Time_Value scheduled_to_stop_at_
IdToSendListenerMap send_listeners_
IdToRecvListenerMap recv_listeners_
TransportReceiveListener_wrch default_listener_
LockType pub_sub_maps_lock_
AssocByRemote assoc_by_remote_
AssocByLocal assoc_by_local_
TransportImpl & impl_
 A reference to the TransportImpl that created this DataLink.
ACE_UINT64 id_
 The id for this DataLink.
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
AssocByLocal assoc_releasing_
Priority transport_priority_
 TRANSPORT_PRIORITY value associated with the link.
bool scheduling_release_

Friends

class DataLinkCleanupTask
class ThreadPerConnectionSendTask
OpenDDS_Dcps_Export std::ostream & operator<< (std::ostream &str, const DataLink &value)
 Convenience function for diagnostic information.

Detailed Description

This class manages the reservations based on the associations between data readers and data writers. It delegates outgoing samples to the send strategy for sending, and delivers the samples received by the receive strategy to the listener.

Notes about object ownership: 1) Owns the send strategy object and the receive strategy object. 2) Owns the ThreadPerConnectionSendTask object, which is used when thread_per_connection is enabled.

Definition at line 66 of file DataLink.h.


Member Typedef Documentation

typedef ACE_Guard<LockType> OpenDDS::DCPS::DataLink::GuardType [protected]

Definition at line 392 of file DataLink.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLink::LockType [private]

Definition at line 337 of file DataLink.h.

typedef std::pair<TransportClient_wrch, RepoId> OpenDDS::DCPS::DataLink::OnStartCallback

Definition at line 249 of file DataLink.h.

typedef WeakRcHandle<TransportClient> OpenDDS::DCPS::DataLink::TransportClient_wrch

Definition at line 248 of file DataLink.h.


Member Enumeration Documentation

enum OpenDDS::DCPS::DataLink::ConnectionNotice

Enumerator:
DISCONNECTED 
RECONNECTED 
LOST 

Definition at line 73 of file DataLink.h.

00073                         {
00074     DISCONNECTED,
00075     RECONNECTED,
00076     LOST
00077   };


Constructor & Destructor Documentation

OpenDDS::DCPS::DataLink::DataLink ( TransportImpl &  impl,
Priority  priority,
bool  is_loopback,
bool  is_active 
)

Only called by our TransportImpl object.

A DataLink object is always created by a TransportImpl object. Thus, the TransportImpl object passed-in here is the object that created this DataLink. The ability to specify a priority for individual links is included for construction so its value can be available for activating any threads.

Definition at line 42 of file DataLink.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportImpl::config(), OpenDDS::DCPS::TransportInst::datalink_control_chunks_, OpenDDS::DCPS::TransportInst::datalink_release_delay_, datalink_release_delay_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, get_next_datalink_id(), id_, LM_DEBUG, LM_ERROR, mb_allocator_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), ACE_Time_Value::sec(), thr_per_con_send_task_, OpenDDS::DCPS::TransportInst::thread_per_connection_, ThreadPerConnectionSendTask, and ACE_Time_Value::usec().

00044   : stopped_(false),
00045     scheduled_to_stop_at_(ACE_Time_Value::zero),
00046     impl_(impl),
00047     transport_priority_(priority),
00048     scheduling_release_(false),
00049     is_loopback_(is_loopback),
00050     is_active_(is_active),
00051     started_(false),
00052     send_response_listener_("DataLink")
00053 {
00054   DBG_ENTRY_LVL("DataLink", "DataLink", 6);
00055 
00056   datalink_release_delay_.sec(impl.config().datalink_release_delay_ / 1000);
00057   datalink_release_delay_.usec(impl.config().datalink_release_delay_ % 1000 * 1000);
00058 
00059   id_ = DataLink::get_next_datalink_id();
00060 
00061   if (impl.config().thread_per_connection_) {
00062     this->thr_per_con_send_task_.reset(new ThreadPerConnectionSendTask(this));
00063 
00064     if (this->thr_per_con_send_task_->open() == -1) {
00065       ACE_ERROR((LM_ERROR,
00066                  ACE_TEXT("(%P|%t) DataLink::DataLink: ")
00067                  ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
00068 
00069     } else if (DCPS_debug_level > 4) {
00070       ACE_DEBUG((LM_DEBUG,
00071                  ACE_TEXT("(%P|%t) DataLink::DataLink - ")
00072                  ACE_TEXT("started new thread to send data with.\n")));
00073     }
00074   }
00075 
00076   // Initialize transport control sample allocators:
00077   size_t control_chunks = impl.config().datalink_control_chunks_;
00078 
00079   this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks));
00080   this->db_allocator_.reset(new DataBlockAllocator(control_chunks));
00081 }

Here is the call graph for this function:

OpenDDS::DCPS::DataLink::~DataLink (  )  [virtual]

Definition at line 83 of file DataLink.cpp.

References ACE_TEXT(), assoc_by_local_, DBG_ENTRY_LVL, LM_WARNING, and thr_per_con_send_task_.

00084 {
00085   DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
00086 
00087   if (!assoc_by_local_.empty()) {
00088     ACE_DEBUG((LM_WARNING,
00089                ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
00090                ACE_TEXT("link still in use by %d entities when deleted!\n"),
00091                this, assoc_by_local_.size()));
00092   }
00093 
00094   if (this->thr_per_con_send_task_ != 0) {
00095     this->thr_per_con_send_task_->close(1);
00096   }
00097 }

Here is the call graph for this function:


Member Function Documentation

bool OpenDDS::DCPS::DataLink::add_on_start_callback ( const TransportClient_wrch &  client,
const RepoId &  remote 
)

Definition at line 107 of file DataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, started_, and strategy_lock_.

00108 {
00109   GuardType guard(strategy_lock_);
00110 
00111   if (started_ && !send_strategy_.is_nil()) {
00112     return false; // link already started
00113   }
00114   on_start_callbacks_.push_back(std::make_pair(client, remote));
00115   return true;
00116 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataLink::cancel_release (  ) 

Definition at line 449 of file DataLink.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, notify_reactor(), scheduled_to_stop_at_, scheduling_release_, set_scheduling_release(), stopped_, and ACE_Time_Value::zero.

00450 {
00451   DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
00452   if (stopped_) {
00453     if (DCPS_debug_level > 0) {
00454       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
00455     }
00456     return false;
00457   }
00458   if (scheduling_release_) {
00459     if (DCPS_debug_level > 0) {
00460       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
00461     }
00462     this->set_scheduling_release(false);
00463     this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00464     notify_reactor();
00465   }
00466   return true;
00467 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataLink::clear_associations (  ) 

This is called by DataLinkCleanupTask thread to remove the associations based on the snapshot in release_resources().

Definition at line 901 of file DataLink.cpp.

References assoc_releasing_, recv_listener_for(), send_listener_for(), and OpenDDS::DCPS::set_to_seq().

00902 {
00903   for (AssocByLocal::iterator iter = assoc_releasing_.begin();
00904        iter != assoc_releasing_.end(); ++iter) {
00905     TransportSendListener_rch tsl = send_listener_for(iter->first);
00906     if (tsl) {
00907       ReaderIdSeq sub_ids;
00908       set_to_seq(iter->second, sub_ids);
00909       tsl->remove_associations(sub_ids, false);
00910       continue;
00911     }
00912     TransportReceiveListener_rch trl = recv_listener_for(iter->first);
00913     if (trl) {
00914       WriterIdSeq pub_ids;
00915       set_to_seq(iter->second, pub_ids);
00916       trl->remove_associations(pub_ids, false);
00917     }
00918   }
00919   assoc_releasing_.clear();
00920 }

Here is the call graph for this function:

ACE_INLINE const char * OpenDDS::DCPS::DataLink::connection_notice_as_str ( ConnectionNotice  notice  )  [private]

Helper function to output the enum as a string to help debugging.

Definition at line 289 of file DataLink.inl.

Referenced by notify().

00290 {
00291   static const char* NoticeStr[] = { "DISCONNECTED",
00292                                      "RECONNECTED",
00293                                      "LOST"
00294                                    };
00295 
00296   return NoticeStr [notice];
00297 }

Here is the caller graph for this function:

ACE_Message_Block * OpenDDS::DCPS::DataLink::create_control ( char  submessage_id,
DataSampleHeader &  header,
Message_Block_Ptr  data 
)

This allows a subclass to easily create a transport control sample to send via send_control.

Definition at line 476 of file DataLink.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_ERROR, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::DataSampleHeader::submessage_id_, OpenDDS::DCPS::TRANSPORT_CONTROL, and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

00479 {
00480   DBG_ENTRY_LVL("DataLink", "create_control", 6);
00481 
00482   header.byte_order_ = ACE_CDR_BYTE_ORDER;
00483   header.message_id_ = TRANSPORT_CONTROL;
00484   header.submessage_id_ = submessage_id;
00485   header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
00486 
00487   ACE_Message_Block* message = 0;
00488   ACE_NEW_MALLOC_RETURN(message,
00489                         static_cast<ACE_Message_Block*>(
00490                           this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
00491                         ACE_Message_Block(header.max_marshaled_size(),
00492                                           ACE_Message_Block::MB_DATA,
00493                                           data.release(),
00494                                           0,  // data
00495                                           0,  // allocator_strategy
00496                                           0,  // locking_strategy
00497                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00498                                           ACE_Time_Value::zero,
00499                                           ACE_Time_Value::max_time,
00500                                           this->db_allocator_.get(),
00501                                           this->mb_allocator_.get()),
00502                         0);
00503 
00504   if (!(*message << header)) {
00505     ACE_ERROR((LM_ERROR,
00506                ACE_TEXT("(%P|%t) DataLink::create_control: ")
00507                ACE_TEXT("cannot put header in message\n")));
00508     ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block);
00509     message = 0;
00510   }
00511 
00512   return message;
00513 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual TransportQueueElement* OpenDDS::DCPS::DataLink::customize_queue_element ( TransportQueueElement *  element  )  [inline, private, virtual]

Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 320 of file DataLink.h.

Referenced by send().

00322   {
00323     return element;
00324   }

Here is the caller graph for this function:

int OpenDDS::DCPS::DataLink::data_received ( ReceivedDataSample &  sample,
const RepoId &  readerId = GUID_UNKNOWN 
)

This is called by our TransportReceiveStrategy object when it has received a complete data sample. This method will cause the appropriate TransportReceiveListener objects to be told that data_received(). If readerId is not GUID_UNKNOWN, only the TransportReceiveListener with that ID (if one exists) will receive the data.

This method will "deliver" the sample to all TransportReceiveListeners within this DataLink that are interested in the (remote) publisher id that sent the sample.

Definition at line 538 of file DataLink.cpp.

References data_received_i(), and OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED.

Referenced by OpenDDS::DCPS::ReliableSession::deliver_held_data(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), OpenDDS::DCPS::RtpsUdpDataLink::HeldDataDeliveryHandler::handle_exception(), and OpenDDS::DCPS::MulticastDataLink::sample_received().

00540 {
00541   data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
00542   return 0;
00543 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::data_received_i ( ReceivedDataSample &  sample,
const RepoId &  readerId,
const RepoIdSet &  incl_excl,
ReceiveListenerSet::ConstrainReceiveSet  constrain 
) [private]

Definition at line 552 of file DataLink.cpp.

References ACE_TEXT(), assoc_by_remote_, OpenDDS::DCPS::DataSampleHeader::content_filter_, OpenDDS::DCPS::DataSampleHeader::content_filter_entries_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, default_listener_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::ReceivedDataSample::header_, if(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_STRING, pub_sub_maps_lock_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED, OpenDDS::DCPS::to_string(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by data_received(), and data_received_include().

00556 {
00557   DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
00558   // Which remote publication sent this message?
00559   const RepoId& publication_id = sample.header_.publication_id_;
00560 
00561   // Locate the set of TransportReceiveListeners associated with this
00562   // DataLink that are interested in hearing about any samples received
00563   // from the remote publisher_id.
00564   if (DCPS_debug_level > 9) {
00565     const GuidConverter converter(publication_id);
00566     const GuidConverter reader(readerId);
00567     ACE_DEBUG((LM_DEBUG,
00568                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00569                ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"),
00570                OPENDDS_STRING(converter).c_str(),
00571                to_string(sample.header_).c_str(),
00572                OPENDDS_STRING(reader).c_str(),
00573                constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
00574   }
00575 
00576   if (Transport_debug_level > 9) {
00577     const GuidConverter converter(publication_id);
00578     ACE_DEBUG((LM_DEBUG,
00579                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00580                ACE_TEXT("from publication %C received sample: %C.\n"),
00581                OPENDDS_STRING(converter).c_str(),
00582                to_string(sample.header_).c_str()));
00583   }
00584 
00585   ReceiveListenerSet_rch listener_set;
00586   {
00587     GuardType guard(this->pub_sub_maps_lock_);
00588     AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
00589     if (iter != assoc_by_remote_.end())
00590       listener_set = iter->second;
00591 
00592     if (listener_set.is_nil()) {
00593       TransportReceiveListener_rch listener = this->default_listener_.lock();
00594       if (listener)
00595         listener->data_received(sample);
00596       return;
00597     }
00598   }
00599 
00600   if (listener_set.is_nil()) {
00601     // Nobody has any interest in this message.  Drop it on the floor.
00602     if (Transport_debug_level > 4) {
00603       const GuidConverter converter(publication_id);
00604       ACE_DEBUG((LM_DEBUG,
00605                  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00606                  ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
00607                  OPENDDS_STRING(converter).c_str()));
00608     }
00609 
00610     return;
00611   }
00612 
00613   if (readerId != GUID_UNKNOWN) {
00614     listener_set->data_received(sample, readerId);
00615     return;
00616   }
00617 
00618 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00619 
00620   if (sample.header_.content_filter_
00621       && sample.header_.content_filter_entries_.length()) {
00622     ReceiveListenerSet subset(*listener_set.in());
00623     subset.remove_all(sample.header_.content_filter_entries_);
00624     subset.data_received(sample, incl_excl, constrain);
00625 
00626   } else {
00627 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00628 
00629     if (DCPS_debug_level > 9) {
00630       // Just get the set to do our dirty work by having it iterate over its
00631       // collection of TransportReceiveListeners, and invoke the data_received()
00632       // method on each one.
00633       OPENDDS_STRING included_ids;
00634       bool first = true;
00635       RepoIdSet::const_iterator iter = incl_excl.begin();
00636       while(iter != incl_excl.end()) {
00637         included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00638         first = false;
00639         ++iter;
00640       }
00641       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
00642                  constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
00643     }
00644     listener_set->data_received(sample, incl_excl, constrain);
00645 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00646   }
00647 
00648 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00649 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::data_received_include ( ReceivedDataSample &  sample,
const RepoIdSet &  incl 
)

Variation of data_received() that allows for excluding a subset of readers by specifying which readers specifically should receive. Any reader ID that does not appear in the include set will be skipped.

Definition at line 546 of file DataLink.cpp.

References data_received_i(), OpenDDS::DCPS::GUID_UNKNOWN, and OpenDDS::DCPS::ReceiveListenerSet::SET_INCLUDED.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

00547 {
00548   data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
00549 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE const ACE_Time_Value & OpenDDS::DCPS::DataLink::datalink_release_delay (  )  const

Definition at line 62 of file DataLink.inl.

References datalink_release_delay_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00063 {
00064   return this->datalink_release_delay_;
00065 }

Here is the caller graph for this function:

ACE_INLINE TransportReceiveListener_wrch OpenDDS::DCPS::DataLink::default_listener (  )  const

Definition at line 379 of file DataLink.inl.

References default_listener_, and pub_sub_maps_lock_.

00380 {
00381   GuardType guard(this->pub_sub_maps_lock_);
00382   return this->default_listener_;
00383 }

ACE_INLINE void OpenDDS::DCPS::DataLink::default_listener ( const TransportReceiveListener_wrch &  trl  ) 

Definition at line 371 of file DataLink.inl.

References default_listener_, and pub_sub_maps_lock_.

00372 {
00373   GuardType guard(this->pub_sub_maps_lock_);
00374   this->default_listener_ = trl;
00375 }

ACE_UINT64 OpenDDS::DCPS::DataLink::get_next_datalink_id (  )  [static, protected]

Used to provide unique Ids to all DataLink objects.

Definition at line 653 of file DataLink.cpp.

References ACE_TEXT(), id(), and LM_ERROR.

Referenced by DataLink().

00654 {
00655   static ACE_UINT64 next_id = 0;
00656   static LockType lock;
00657 
00658   ACE_UINT64 id;
00659   {
00660     GuardType guard(lock);
00661     id = next_id++;
00662 
00663     if (0 == next_id) {
00664       ACE_ERROR((LM_ERROR,
00665                  ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
00666                  ACE_TEXT("has rolled over and is reusing ids!\n")));
00667     }
00668   }
00669 
00670   return id;
00671 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::DataLink::handle_close ( ACE_HANDLE  h,
ACE_Reactor_Mask  m 
) [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 937 of file DataLink.cpp.

References handle_timeout(), ACE_Event_Handler::TIMER_MASK, and ACE_Time_Value::zero.

00938 {
00939   if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
00940     // Reactor is shutting down with this timer still pending.
00941     // Take the same cleanup actions as if the timeout had expired.
00942     handle_timeout(ACE_Time_Value::zero, 0);
00943   }
00944 
00945   return 0;
00946 }

Here is the call graph for this function:

int OpenDDS::DCPS::DataLink::handle_exception ( ACE_HANDLE   )  [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 154 of file DataLink.cpp.

References ACE_TEXT(), ACE_Reactor_Timer_Interface::cancel_timer(), OpenDDS::DCPS::DCPS_debug_level, ACE_OS::gettimeofday(), handle_timeout(), impl_, LM_DEBUG, ACE_Event_Handler::reactor(), ACE_Reactor_Timer_Interface::schedule_timer(), scheduled_to_stop_at_, scheduling_release_, stop(), OpenDDS::DCPS::TransportImpl::timer(), and ACE_Time_Value::zero.

00155 {
00156   if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00157     if (DCPS_debug_level > 0) {
00158       ACE_DEBUG((LM_DEBUG,
00159                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
00160     }
00161     ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00162     if (reactor->cancel_timer(this) > 0) {
00163       if (DCPS_debug_level > 0) {
00164         ACE_DEBUG((LM_DEBUG,
00165                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
00166       }
00167     }
00168     return 0;
00169   } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) {
00170     if (this->scheduling_release_) {
00171       if (DCPS_debug_level > 0) {
00172         ACE_DEBUG((LM_DEBUG,
00173                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
00174       }
00175       this->handle_timeout(ACE_Time_Value::zero, 0);
00176       return 0;
00177     }
00178     if (DCPS_debug_level > 0) {
00179       ACE_DEBUG((LM_DEBUG,
00180                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
00181     }
00182     this->stop();
00183     return 0;
00184   } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
00185     if (DCPS_debug_level > 0) {
00186       ACE_DEBUG((LM_DEBUG,
00187                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
00188     }
00189     ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00190     ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday();
00191     reactor->schedule_timer(this, 0, future_release_time);
00192   }
00193   return 0;
00194 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataLink::handle_send_request_ack ( TransportQueueElement *  element  )  [private, virtual]

Reimplemented in OpenDDS::DCPS::TcpDataLink.

Definition at line 1025 of file DataLink.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered().

Referenced by send().

01026 {
01027   element->data_delivered();
01028   return true;
01029 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::DataLink::handle_timeout ( const ACE_Time_Value &  tv,
const void *  arg 
) [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 923 of file DataLink.cpp.

References assoc_by_local_, assoc_by_remote_, impl_, LM_DEBUG, scheduled_to_stop_at_, stop(), OpenDDS::DCPS::TransportImpl::unbind_link(), VDBG_LVL, and ACE_Time_Value::zero.

Referenced by handle_close(), and handle_exception().

00924 {
00925   if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) {
00926     VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
00927     impl_.unbind_link(this);
00928 
00929     if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
00930       this->stop();
00931     }
00932   }
00933   return 0;
00934 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE DataLinkIdType OpenDDS::DCPS::DataLink::id ( void   )  const

Obtain a unique identifier for this DataLink object.

Definition at line 231 of file DataLink.inl.

References DBG_ENTRY_LVL, and id_.

Referenced by get_next_datalink_id().

00232 {
00233   DBG_ENTRY_LVL("DataLink","id",6);
00234   return id_;
00235 }

Here is the caller graph for this function:

TransportImpl & OpenDDS::DCPS::DataLink::impl ( void   )  const
void OpenDDS::DCPS::DataLink::invoke_on_start_callbacks ( bool  success  ) 

Definition at line 131 of file DataLink.cpp.

References ACE_Guard< ACE_LOCK >::release(), and strategy_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received(), and start().

00132 {
00133   const DataLink_rch link(success ? this : 0, inc_count());
00134 
00135   while (true) {
00136     GuardType guard(strategy_lock_);
00137 
00138     if (on_start_callbacks_.empty()) {
00139       break;
00140     }
00141 
00142     OnStartCallback last_callback = on_start_callbacks_.back();
00143     on_start_callbacks_.pop_back();
00144 
00145     guard.release();
00146     TransportClient_rch client = last_callback.first.lock();
00147     if (client)
00148       client->use_datalink(last_callback.second, link);
00149   }
00150 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE bool OpenDDS::DCPS::DataLink::is_active (  )  const

Definition at line 56 of file DataLink.inl.

References is_active_.

00057 {
00058   return this->is_active_;
00059 }

ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_active (  ) 

Definition at line 49 of file DataLink.inl.

References is_active_.

Referenced by OpenDDS::DCPS::MulticastDataLink::check_header(), OpenDDS::DCPS::TcpTransport::release_datalink(), OpenDDS::DCPS::MulticastDataLink::sample_received(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00050 {
00051   return this->is_active_;
00052 }

Here is the caller graph for this function:

ACE_INLINE bool OpenDDS::DCPS::DataLink::is_loopback ( void   )  const

Definition at line 42 of file DataLink.inl.

References is_loopback_.

00043 {
00044   return this->is_loopback_;
00045 }

ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_loopback ( void   ) 

Definition at line 35 of file DataLink.inl.

References is_loopback_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00036 {
00037   return this->is_loopback_;
00038 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataLink::is_target ( const RepoId & remote_sub_id  ) 

This is called on publisher side to see if this link communicates with the provided sub.

Definition at line 855 of file DataLink.cpp.

References assoc_by_remote_, and pub_sub_maps_lock_.

00856 {
00857   GuardType guard(this->pub_sub_maps_lock_);
00858   return assoc_by_remote_.count(remote_sub_id);
00859 }

int OpenDDS::DCPS::DataLink::make_reservation ( const RepoId & remote_publication_id,
const RepoId & local_subcription_id,
const TransportReceiveListener_wrch & receive_listener 
)

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Definition at line 301 of file DataLink.cpp.

References ACE_TEXT(), assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, recv_listeners_, send_strategy_, and strategy_lock_.

00304 {
00305   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00306 
00307   if (DCPS_debug_level > 9) {
00308     GuidConverter local(local_subscription_id), remote(remote_publication_id);
00309     ACE_DEBUG((LM_DEBUG,
00310                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00311                ACE_TEXT("creating association local subscription %C ")
00312                ACE_TEXT("<--> with remote publication  %C.\n"),
00313                OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str()));
00314   }
00315 
00316   {
00317     GuardType guard(strategy_lock_);
00318 
00319     if (!send_strategy_.is_nil()) {
00320       send_strategy_->link_released(false);
00321     }
00322   }
00323   {
00324     GuardType guard(pub_sub_maps_lock_);
00325 
00326     assoc_by_local_[local_subscription_id].insert(remote_publication_id);
00327     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
00328 
00329     if (rls.is_nil())
00330       rls = make_rch<ReceiveListenerSet>();
00331     rls->insert(local_subscription_id, receive_listener);
00332 
00333     recv_listeners_.insert(std::make_pair(local_subscription_id,
00334                                           receive_listener));
00335   }
00336   return 0;
00337 }

Here is the call graph for this function:

int OpenDDS::DCPS::DataLink::make_reservation ( const RepoId & remote_subscription_id,
const RepoId & local_publication_id,
const TransportSendListener_wrch & send_listener 
)

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Definition at line 262 of file DataLink.cpp.

References ACE_TEXT(), assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, send_listeners_, send_strategy_, and strategy_lock_.

00265 {
00266   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00267 
00268   if (DCPS_debug_level > 9) {
00269     GuidConverter local(local_publication_id), remote(remote_subscription_id);
00270     ACE_DEBUG((LM_DEBUG,
00271                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00272                ACE_TEXT("creating association local publication  %C ")
00273                ACE_TEXT("<--> with remote subscription %C.\n"),
00274                OPENDDS_STRING(local).c_str(),
00275                OPENDDS_STRING(remote).c_str()));
00276   }
00277 
00278   {
00279     GuardType guard(strategy_lock_);
00280 
00281     if (!send_strategy_.is_nil()) {
00282       send_strategy_->link_released(false);
00283     }
00284   }
00285   {
00286     GuardType guard(pub_sub_maps_lock_);
00287 
00288     assoc_by_local_[local_publication_id].insert(remote_subscription_id);
00289     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
00290 
00291     if (rls.is_nil())
00292       rls = make_rch<ReceiveListenerSet>();
00293     rls->insert(local_publication_id, TransportReceiveListener_rch());
00294 
00295     send_listeners_.insert(std::make_pair(local_publication_id,send_listener));
00296   }
00297   return 0;
00298 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataLink::notify ( ConnectionNotice  notice  ) 

Notify the datawriters and datareaders that the connection is disconnected, lost, or reconnected. The datareader/datawriter will notify the corresponding listener.

Definition at line 692 of file DataLink.cpp.

References ACE_TEXT(), assoc_by_local_, connection_notice_as_str(), DBG_ENTRY_LVL, DISCONNECTED, LM_DEBUG, LM_ERROR, LOST, OPENDDS_STRING, pub_sub_maps_lock_, RECONNECTED, recv_listeners_, send_listeners_, OpenDDS::DCPS::set_to_seq(), OpenDDS::DCPS::Transport_debug_level, and VDBG.

00693 {
00694   DBG_ENTRY_LVL("DataLink", "notify", 6);
00695 
00696   VDBG((LM_DEBUG,
00697         ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
00698         this,
00699         connection_notice_as_str(notice)));
00700 
00701   GuardType guard(this->pub_sub_maps_lock_);  // held for the whole function, including the listener callbacks below
00702 
00703   // Notify the datawriters
00704   // the lost publications due to a connection problem.
00705   for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
00706        itr != send_listeners_.end(); ++itr) {
00707 
00708     TransportSendListener_rch tsl = itr->second.lock();  // promote the weak handle; empty if the listener is gone
00709 
00710     if (tsl) {
00711       if (Transport_debug_level > 0) {
00712         GuidConverter converter(itr->first);
00713         ACE_DEBUG((LM_DEBUG,
00714                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00715                    ACE_TEXT("notify pub %C %C.\n"),
00716                    OPENDDS_STRING(converter).c_str(),
00717                    connection_notice_as_str(notice)));
00718       }
00719       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00720       if (local_it == assoc_by_local_.end()) {
00721         if (Transport_debug_level) {
00722           GuidConverter converter(itr->first);
00723           ACE_DEBUG((LM_DEBUG,
00724                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00725                      ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
00726                      OPENDDS_STRING(converter).c_str(),
00727                      connection_notice_as_str(notice)));
00728         }
00729         break;  // NOTE(review): leaves the whole loop, so send listeners after this one are not notified either - confirm `continue` was not intended
00730       }
00731       const RepoIdSet& rids = local_it->second;
00732 
00733       ReaderIdSeq subids;
00734       set_to_seq(rids, subids);  // remote ids associated with this local publication, as a sequence
00735 
00736       switch (notice) {
00737       case DISCONNECTED:
00738         tsl->notify_publication_disconnected(subids);
00739         break;
00740 
00741       case RECONNECTED:
00742         tsl->notify_publication_reconnected(subids);
00743         break;
00744 
00745       case LOST:
00746         tsl->notify_publication_lost(subids);
00747         break;
00748 
00749       default:
00750         ACE_ERROR((LM_ERROR,
00751                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00752                    ACE_TEXT("unknown notice to TransportSendListener\n")));
00753         break;
00754       }
00755 
00756     } else {
00757       if (Transport_debug_level > 0) {
00758         GuidConverter converter(itr->first);
00759         ACE_DEBUG((LM_DEBUG,
00760                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00761                    ACE_TEXT("not notify pub %C %C \n"),
00762                    OPENDDS_STRING(converter).c_str(),
00763                    connection_notice_as_str(notice)));
00764       }
00765     }
00766 
00767   }
00768 
00769   // Notify the datareaders registered with TransportImpl
00770   // the lost subscriptions due to a connection problem.
00771   for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
00772        itr != recv_listeners_.end(); ++itr) {
00773 
00774     TransportReceiveListener_rch trl = itr->second.lock();  // promote the weak handle; empty if the listener is gone
00775 
00776     if (trl) {
00777       if (Transport_debug_level > 0) {
00778         GuidConverter converter(itr->first);
00779         ACE_DEBUG((LM_DEBUG,
00780                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00781                    ACE_TEXT("notify sub %C %C.\n"),
00782                    OPENDDS_STRING(converter).c_str(),
00783                    connection_notice_as_str(notice)));
00784       }
00785       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00786       if (local_it == assoc_by_local_.end()) {
00787         if (Transport_debug_level) {
00788           GuidConverter converter(itr->first);
00789           ACE_DEBUG((LM_DEBUG,
00790                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00791                      ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
00792                      OPENDDS_STRING(converter).c_str(),
00793                      connection_notice_as_str(notice)));
00794         }
00795         break;  // NOTE(review): leaves the whole loop, so receive listeners after this one are not notified either - confirm `continue` was not intended
00796       }
00797       const RepoIdSet& rids = local_it->second;
00798 
00799       WriterIdSeq pubids;
00800       set_to_seq(rids, pubids);  // remote ids associated with this local subscription, as a sequence
00801 
00802       switch (notice) {
00803       case DISCONNECTED:
00804         trl->notify_subscription_disconnected(pubids);
00805         break;
00806 
00807       case RECONNECTED:
00808         trl->notify_subscription_reconnected(pubids);
00809         break;
00810 
00811       case LOST:
00812         trl->notify_subscription_lost(pubids);
00813         break;
00814 
00815       default:
00816         ACE_ERROR((LM_ERROR,
00817                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00818                    ACE_TEXT("unknown notice to datareader.\n")));
00819         break;
00820       }
00821 
00822     } else {
00823       if (Transport_debug_level > 0) {
00824         GuidConverter converter(itr->first);
00825         ACE_DEBUG((LM_DEBUG,
00826                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00827                    ACE_TEXT("not notify sub %C subscription lost.\n"),
00828                    OPENDDS_STRING(converter).c_str()));
00829       }
00830 
00831     }
00832   }
00833 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataLink::notify_reactor ( void   )  [private]

Definition at line 215 of file DataLink.cpp.

References impl_, ACE_Reactor::notify(), ACE_Event_Handler::reactor(), and OpenDDS::DCPS::TransportImpl::reactor_task().

Referenced by cancel_release(), and schedule_stop().

00216 {
00217   TransportReactorTask_rch reactor(impl_.reactor_task());
00218   reactor->get_reactor()->notify(this);
00219 }

Here is the call graph for this function:

Here is the caller graph for this function:

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
RepoIdSet  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
ReceiveListenerSet_rch  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
TransportReceiveListener_wrch  ,
GUID_tKeyLessThan   
) [private]

Map subscription Id value to TransportReceiveListener.

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
TransportSendListener_wrch  ,
GUID_tKeyLessThan   
) [private]

Map publication Id value to TransportSendListener.

OpenDDS::DCPS::DataLink::OPENDDS_VECTOR ( OnStartCallback   )  [protected]
GUIDSeq * OpenDDS::DCPS::DataLink::peer_ids ( const RepoId local_id  )  const [protected]

For a given local RepoId (publication or subscription), return the list of remote peer RepoIds (subscriptions or publications) that this link knows about due to make_reservation().

Definition at line 350 of file DataLink.cpp.

References assoc_by_local_, pub_sub_maps_lock_, and OpenDDS::DCPS::set_to_seq().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), and OpenDDS::DCPS::RtpsUdpDataLink::get_locators().

00351 {
00352   GuardType guard(pub_sub_maps_lock_);
00353 
00354   const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
00355 
00356   if (iter == assoc_by_local_.end())
00357     return 0;
00358 
00359   GUIDSeq_var result = new GUIDSeq;
00360   set_to_seq(iter->second, static_cast<GUIDSeq&>(result));
00361   return result._retn();
00362 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::pre_stop_i (  )  [virtual]

Called before releasing the datalink or before shutdown, to let the concrete DataLink do anything necessary.

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, and OpenDDS::DCPS::TcpDataLink.

Definition at line 838 of file DataLink.cpp.

References thr_per_con_send_task_.

Referenced by stop().

00839 {
00840   if (this->thr_per_con_send_task_ != 0) {
00841     this->thr_per_con_send_task_->close(1);
00842   }
00843 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::prepare_release (  )  [private]

Save current sub and pub association maps for releasing and create empty maps for new associations.

Definition at line 887 of file DataLink.cpp.

References ACE_TEXT(), assoc_by_local_, assoc_releasing_, LM_ERROR, and pub_sub_maps_lock_.

Referenced by release_resources().

00888 {
00889   GuardType guard(this->pub_sub_maps_lock_);
00890 
00891   if (!assoc_releasing_.empty()) {
00892     ACE_ERROR((LM_ERROR,
00893                ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
00894                ACE_TEXT("already prepared for release.\n")));
00895     return;
00896   }
00897 
00898   assoc_releasing_ = assoc_by_local_;
00899 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE TransportReceiveListener_rch OpenDDS::DCPS::DataLink::recv_listener_for ( const RepoId sub_id  )  const [private]

Definition at line 357 of file DataLink.inl.

References recv_listeners_.

Referenced by clear_associations().

00358 {
00359   // sub_map_ (and recv_listeners_) are already locked when entering this
00360   // private method.
00361   IdToRecvListenerMap::const_iterator found =
00362     this->recv_listeners_.find(sub_id);
00363   if (found == this->recv_listeners_.end()) {
00364     return TransportReceiveListener_rch();
00365   }
00366   return found->second.lock();
00367 }

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::DataLink::release_remote_i ( const RepoId  )  [inline, private, virtual]

Reimplemented in OpenDDS::DCPS::MulticastDataLink, and OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 326 of file DataLink.h.

Referenced by release_reservations().

00326 {}

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::release_reservations ( RepoId  remote_id,
RepoId  local_id,
DataLinkSetMap &  released_locals 
)

This will release reservations that were made by one of the make_reservation() methods. All we know is that the supplied RepoId is considered to be a remote id. It could be a remote subscriber or a remote publisher.

This gets invoked when a TransportClient::remove_associations() call has been made. Because this DataLink can be shared amongst different TransportClient objects, and different threads could be "managing" the different TransportClient objects, we need to make sure that this release_reservations() works in conjunction with a simultaneous call (in another thread) to one of this DataLink's make_reservation() methods.

Definition at line 372 of file DataLink.cpp.

References ACE_TEXT(), assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, impl_, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::release_datalink(), release_remote_i(), release_reservations_i(), and VDBG_LVL.

00374 {
00375   DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
00376 
00377   if (DCPS_debug_level > 9) {
00378     GuidConverter local(local_id);
00379     GuidConverter remote(remote_id);
00380     ACE_DEBUG((LM_DEBUG,
00381                ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
00382                ACE_TEXT("releasing association local: %C ")
00383                ACE_TEXT("<--> with remote %C.\n"),
00384                OPENDDS_STRING(local).c_str(),
00385                OPENDDS_STRING(remote).c_str()));
00386   }
00387 
00388   //let the specific class release its reservations
00389   //done this way to prevent deadlock of holding pub_sub_maps_lock_
00390   //then obtaining a specific class lock in release_reservations_i
00391   //which reverses lock ordering of the active send logic of needing
00392   //the specific class lock before obtaining the over arching DataLink
00393   //pub_sub_maps_lock_
00394   this->release_reservations_i(remote_id, local_id);
00395 
00396   bool release_remote_required = false;  // set when the last entry for remote_id is dropped; acted on after the lock is released
00397   {
00398     GuardType guard(this->pub_sub_maps_lock_);
00399 
00400     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];  // NOTE(review): operator[] default-inserts a nil handle when remote_id has no entry
00401     if (rls->size() == 1) {  // NOTE(review): rls is dereferenced without an is_nil() check - confirm callers only pass ids that were reserved
00402       assoc_by_remote_.erase(remote_id);
00403       release_remote_required = true;
00404     } else {
00405       rls->remove(local_id);
00406     }
00407     RepoIdSet& ris = assoc_by_local_[local_id];  // NOTE(review): operator[] inserts an empty set for an unknown local_id, which would keep assoc_by_local_ non-empty below - confirm
00408     if (ris.size() == 1) {
00409       DataLinkSet_rch& links = released_locals[local_id];  // report this link as released for local_id
00410       if (links.is_nil())
00411         links = make_rch<DataLinkSet>();
00412       links->insert_link(rchandle_from(this));
00413       assoc_by_local_.erase(local_id);
00414     } else {
00415       ris.erase(remote_id);
00416     }
00417 
00418     if (assoc_by_local_.empty()) {
00419       VDBG_LVL((LM_DEBUG,
00420                 ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
00421                 ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
00422 
00423       impl_.release_datalink(this);
00424     }
00425   }
00426   if (release_remote_required)
00427     release_remote_i(remote_id);  // called outside the pub_sub_maps_lock_ scope
00428 }

Here is the call graph for this function:

virtual void OpenDDS::DCPS::DataLink::release_reservations_i ( const RepoId ,
const RepoId  
) [inline, private, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 327 of file DataLink.h.

Referenced by release_reservations().

00328                                                                   {}

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataLink::release_resources (  ) 

Definition at line 846 of file DataLink.cpp.

References DBG_ENTRY_LVL, impl_, prepare_release(), and OpenDDS::DCPS::TransportImpl::release_link_resources().

00847 {
00848   DBG_ENTRY_LVL("DataLink", "release_resources", 6);
00849 
00850   this->prepare_release();
00851   return impl_.release_link_resources(this);
00852 }

Here is the call graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::remove_all_msgs ( RepoId  pub_id  ) 

Definition at line 211 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, and strategy_lock_.

00212 {
00213   DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
00214 
00215   // This one is easy.  Simply delegate to our TransportSendStrategy
00216   // data member.
00217 
00218   TransportSendStrategy_rch strategy;
00219   {
00220     GuardType guard(this->strategy_lock_);
00221 
00222     strategy = this->send_strategy_;
00223   }
00224 
00225   if (!strategy.is_nil()) {
00226     strategy->remove_all_msgs(pub_id);
00227   }
00228 }

Here is the call graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::remove_listener ( const RepoId & local_id  ) 

Either send or receive listener for this local_id should be removed from internal DataLink structures so it no longer receives events.

Definition at line 308 of file DataLink.inl.

References ACE_TEXT(), LM_DEBUG, OPENDDS_STRING, pub_sub_maps_lock_, recv_listeners_, send_listeners_, and OpenDDS::DCPS::Transport_debug_level.

00309 {
00310   GuardType guard(pub_sub_maps_lock_);
00311   {
00312     IdToSendListenerMap::iterator pos = send_listeners_.find(local_id);
00313     if (pos != send_listeners_.end()) {
00314       send_listeners_.erase(pos);
00315       if (Transport_debug_level > 5) {
00316         GuidConverter converter(local_id);
00317         ACE_DEBUG((LM_DEBUG,
00318                    ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
00319                    ACE_TEXT("removed %C from send_listeners\n"),
00320                    OPENDDS_STRING(converter).c_str()));
00321       }
00322       return;
00323     }
00324   }
00325   {
00326     IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id);
00327     if (pos != recv_listeners_.end()) {
00328       recv_listeners_.erase(pos);
00329       if (Transport_debug_level > 5) {
00330         GuidConverter converter(local_id);
00331         ACE_DEBUG((LM_DEBUG,
00332                    ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
00333                    ACE_TEXT("removed %C from recv_listeners\n"),
00334                    OPENDDS_STRING(converter).c_str()));
00335       }
00336       return;
00337     }
00338   }
00339 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataLink::remove_on_start_callback ( const TransportClient_wrch client,
const RepoId remote 
)

Definition at line 119 of file DataLink.cpp.

References OpenDDS::DCPS::remove(), and strategy_lock_.

00120 {
00121   GuardType guard(strategy_lock_);
00122   on_start_callbacks_.erase(
00123     std::remove(on_start_callbacks_.begin(),
00124                 on_start_callbacks_.end(),
00125                 std::make_pair(client, remote)),
00126     on_start_callbacks_.end());
00127 }

Here is the call graph for this function:

ACE_INLINE RemoveResult OpenDDS::DCPS::DataLink::remove_sample ( const DataSampleElement * sample,
void *  context 
) [virtual]

This method is essentially an "undo_send()" method. Its goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 183 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, send_strategy_, strategy_lock_, thr_per_con_send_task_, and VDBG.

00184 {
00185   DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
00186 
00187   if (this->thr_per_con_send_task_ != 0) {
00188     const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
00189     if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
00190       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00191             "Removed sample from ThreadPerConnection queue.\n"));
00192       return rr;
00193     }
00194   }
00195 
00196   TransportSendStrategy_rch strategy;
00197   {
00198     GuardType guard(this->strategy_lock_);
00199 
00200     strategy = this->send_strategy_;
00201   }
00202 
00203   if (!strategy.is_nil()) {
00204     return strategy->remove_sample(sample, context);
00205   }
00206 
00207   return REMOVE_NOT_FOUND;
00208 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataLink::resume_send (  ) 

The resume_send is used in the case of reconnection on the subscriber's side.

Definition at line 255 of file DataLink.cpp.

References send_strategy_.

00256 {
00257   if (!this->send_strategy_.is_nil() && !this->send_strategy_->isDirectMode())  // no-op when no send strategy is installed, instead of dereferencing a nil handle
00258     this->send_strategy_->resume_send();
00259 }

void OpenDDS::DCPS::DataLink::schedule_delayed_release (  ) 

Definition at line 431 of file DataLink.cpp.

References datalink_release_delay_, DBG_ENTRY_LVL, ACE_OS::gettimeofday(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, schedule_stop(), send_strategy_, and VDBG.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink().

00432 {
00433   DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
00434 
00435   VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
00436 
00437   // The samples have to be removed at this point, otherwise the samples
00438   // can not be delivered when new association is added and still use
00439   // this connection/datalink.
00440   if (!this->send_strategy_.is_nil()) {
00441     this->send_strategy_->clear();
00442   }
00443 
00444   ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_;
00445   this->schedule_stop(future_release_time);
00446 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::schedule_stop ( const ACE_Time_Value & schedule_to_stop_at  ) 

Definition at line 200 of file DataLink.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, notify_reactor(), scheduled_to_stop_at_, stopped_, and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and schedule_delayed_release().

00201 {
00202   if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00203     this->scheduled_to_stop_at_ = schedule_to_stop_at;
00204     notify_reactor();
00205     // reactor will invoke our DataLink::handle_exception()
00206   } else {
00207     if (DCPS_debug_level > 0) {
00208       ACE_DEBUG((LM_DEBUG,
00209                  ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
00210     }
00211   }
00212 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::send ( TransportQueueElement * element  ) 

Definition at line 99 of file DataLink.inl.

References customize_queue_element(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, handle_send_request_ack(), OpenDDS::DCPS::TransportQueueElement::is_request_ack(), OpenDDS::DCPS::SEND, send_i(), and thr_per_con_send_task_.

Referenced by send_control().

00100 {
00101   DBG_ENTRY_LVL("DataLink","send",6);
00102 
00103   if (element->is_request_ack() &&
00104       this->handle_send_request_ack(element)) {
00105     return;
00106   }
00107 
00108   element = this->customize_queue_element(element);
00109   if (!element) {
00110     return;
00111   }
00112 
00113   if (this->thr_per_con_send_task_ != 0) {
00114     if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) {
00115       element->data_dropped(true);
00116     }
00117 
00118   } else {
00119     this->send_i(element);
00120 
00121   }
00122 }

Here is the call graph for this function:

Here is the caller graph for this function:

SendControlStatus OpenDDS::DCPS::DataLink::send_control ( const DataSampleHeader header,
Message_Block_Ptr  data 
)

This allows a subclass to send transport control samples over this DataLink. This is useful for sending transport-specific control messages between one or more endpoints under this DataLink's control.

Definition at line 516 of file DataLink.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, send(), OpenDDS::DCPS::SEND_CONTROL_OK, send_response_listener_, send_start(), send_stop(), and OpenDDS::DCPS::SendResponseListener::track_message().

Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

00517 {
00518   DBG_ENTRY_LVL("DataLink", "send_control", 6);
00519 
00520   TransportSendControlElement* const elem = new TransportSendControlElement(1, // initial_count
00521                                        GUID_UNKNOWN, &send_response_listener_,
00522                                        header, move(message));
00523 
00524   send_response_listener_.track_message();
00525 
00526   RepoId senderId(header.publication_id_);
00527   send_start();
00528   send(elem);
00529   send_stop(senderId);
00530 
00531   return SEND_CONTROL_OK;
00532 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::send_final_acks ( const RepoId readerid  )  [virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 387 of file DataLink.inl.

00388 { }

ACE_INLINE void OpenDDS::DCPS::DataLink::send_i ( TransportQueueElement element,
bool  relink = true 
) [protected, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 125 of file DataLink.inl.

References OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, send_strategy_, and strategy_lock_.

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), send(), and OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message().

00126 {
00127   DBG_ENTRY_LVL("DataLink","send_i",6);
00128   // This one is easy.  Simply delegate to our TransportSendStrategy
00129   // data member.
00130 
00131   TransportSendStrategy_rch strategy;
00132   {
00133     GuardType guard(this->strategy_lock_);
00134 
00135     strategy = this->send_strategy_;
00136   }
00137 
00138   if (strategy) {
00139     strategy->send(element, relink);
00140   } else {
00141     element->data_dropped(true);
00142   }
00143 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE TransportSendListener_rch OpenDDS::DCPS::DataLink::send_listener_for ( const RepoId pub_id  )  const [private]

Definition at line 343 of file DataLink.inl.

References send_listeners_.

Referenced by clear_associations().

00344 {
00345   // pub_map_ (and send_listeners_) are already locked when entering this
00346   // private method.
00347   IdToSendListenerMap::const_iterator found =
00348     this->send_listeners_.find(pub_id);
00349   if (found == this->send_listeners_.end()) {
00350     return TransportSendListener_rch();
00351   }
00352   return found->second.lock();
00353 }

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::send_start (  ) 

Called by the TransportClient objects that reference this DataLink. Used by the TransportClient to send a sample, or to send a control message. These functions either hand the request to the ThreadPerConnectionSendTask when the thread_per_connection configuration is true, or simply delegate to the send strategy.

Definition at line 68 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_START, send_start_i(), and thr_per_con_send_task_.

Referenced by send_control().

00069 {
00070   DBG_ENTRY_LVL("DataLink","send_start",6);
00071 
00072   if (this->thr_per_con_send_task_ != 0) {
00073     this->thr_per_con_send_task_->add_request(SEND_START);
00074 
00075   } else
00076     this->send_start_i();
00077 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::send_start_i (  )  [protected]

The implementation of the functions that accomplish the sample or control message delivery. They simply delegate to the send strategy.

Definition at line 80 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, and strategy_lock_.

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_start().

00081 {
00082   DBG_ENTRY_LVL("DataLink","send_start_i",6);
00083   // This one is easy.  Simply delegate to our TransportSendStrategy
00084   // data member.
00085 
00086   TransportSendStrategy_rch strategy;
00087   {
00088     GuardType guard(this->strategy_lock_);
00089 
00090     strategy = this->send_strategy_;
00091   }
00092 
00093   if (!strategy.is_nil()) {
00094     strategy->send_start();
00095   }
00096 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop ( RepoId  repoId  ) 

Definition at line 146 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_STOP, send_stop_i(), and thr_per_con_send_task_.

Referenced by send_control().

00147 {
00148   DBG_ENTRY_LVL("DataLink","send_stop",6);
00149 
00150   if (this->thr_per_con_send_task_ != 0) {  // a per-connection send task exists: queue the request instead of stopping inline
00151     this->thr_per_con_send_task_->add_request(SEND_STOP);  // NOTE(review): repoId is not forwarded on this path - confirm how ThreadPerConnectionSendTask::execute() obtains the id it passes to send_stop_i()
00152 
00153   } else
00154     this->send_stop_i(repoId);
00155 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop_i ( RepoId  repoId  )  [protected]

Definition at line 158 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, and strategy_lock_.

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_stop().

00159 {
00160   DBG_ENTRY_LVL("DataLink","send_stop_i",6);
00161   // This one is easy.  Simply delegate to our TransportSendStrategy
00162   // data member.
00163 
00164   TransportSendStrategy_rch strategy;
00165   {
00166     GuardType guard(this->strategy_lock_);
00167 
00168     strategy = this->send_strategy_;
00169   }
00170 
00171   if (!strategy.is_nil()) {
00172     strategy->send_stop(repoId);
00173   }
00174 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::set_dscp_codepoint ( int  cp,
ACE_SOCK socket 
)

The following IPV6 code was lifted in spirit from the RTCORBA implementation of setting the DiffServ codepoint.

Definition at line 949 of file DataLink.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, defined(), ACE_SOCK::get_local_addr(), ACE_Addr::get_type(), LM_DEBUG, LM_ERROR, and ACE_SOCK::set_option().

Referenced by OpenDDS::DCPS::UdpDataLink::open().

00950 {
00951   /**
00952    * The following IPV6 code was lifted in spirit from the RTCORBA
00953    * implementation of setting the DiffServ codepoint.
00954    */
00955   int result = 0;
00956 
00957   // Shift the code point up two bits, so that we only use the DS field
00958   int tos = cp << 2;
00959 
00960   const char* which = "IPV4 TOS";
00961 #if defined (ACE_HAS_IPV6)
00962   ACE_INET_Addr local_address;
00963 
00964   if (socket.get_local_addr(local_address) == -1) {
00965     return;
00966 
00967   } else if (local_address.get_type() == AF_INET6)
00968 #if !defined (IPV6_TCLASS)
00969   {
00970     if (DCPS_debug_level > 0) {
00971       ACE_ERROR((LM_ERROR,
00972                  ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
00973                  ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
00974                  cp));
00975     }
00976 
00977     return;
00978   }
00979 
00980 #else /* IPV6_TCLASS */
00981   {
00982     which = "IPV6 TCLASS";
00983     result = socket.set_option(
00984                IPPROTO_IPV6,
00985                IPV6_TCLASS,
00986                &tos,
00987                sizeof(tos));
00988 
00989   } else // This is a bit tricky and might be hard to follow...
00990 
00991 #endif /* IPV6_TCLASS */
00992 #endif /* ACE_HAS_IPV6 */
00993 
00994 #ifdef IP_TOS
00995   result = socket.set_option(
00996              IPPROTO_IP,
00997              IP_TOS,
00998              &tos,
00999              sizeof(tos));
01000 
01001   if ((result == -1) && (errno != ENOTSUP)
01002 #ifdef WSAEINVAL
01003       && (errno != WSAEINVAL)
01004 #endif
01005      ) {
01006 #endif // IP_TOS
01007     ACE_DEBUG((LM_DEBUG,
01008                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01009                ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
01010                ACE_TEXT("try running as superuser.\n"),
01011                which,
01012                cp));
01013 #ifdef IP_TOS
01014   } else if (DCPS_debug_level > 4) {
01015     ACE_DEBUG((LM_DEBUG,
01016                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01017                ACE_TEXT("set %C codepoint to %d.\n"),
01018                which,
01019                cp));
01020   }
01021 #endif
01022 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::set_scheduling_release ( bool  scheduling_release  ) 

Definition at line 177 of file DataLink.inl.

References scheduling_release_.

Referenced by cancel_release(), OpenDDS::DCPS::TcpTransport::release_datalink(), and transport_shutdown().

00178 {
00179   this->scheduling_release_ = scheduling_release;
00180 }

Here is the caller graph for this function:

ACE_INLINE int OpenDDS::DCPS::DataLink::start ( const TransportSendStrategy_rch send_strategy,
const TransportStrategy_rch receive_strategy 
) [protected]

This is how the subclass "announces" to this DataLink base class that this DataLink has now been "connected" and should start the supplied strategy objects. This start method is also going to keep a "copy" of the references to the strategy objects. Also note that it is acceptable to pass-in a NULL (0) TransportReceiveStrategy*, but it is assumed that the TransportSendStrategy* argument is not NULL.

If the start() method fails to start either strategy, then a -1 is returned. Otherwise, a 0 is returned. In the failure case, if one of the strategy objects was started successfully, then it will be stopped before the start() method returns -1.

Definition at line 238 of file DataLink.inl.

References DBG_ENTRY_LVL, invoke_on_start_callbacks(), OpenDDS::DCPS::RcHandle< T >::is_nil(), receive_strategy_, send_strategy_, started_, and strategy_lock_.

Referenced by OpenDDS::DCPS::TcpDataLink::connect(), OpenDDS::DCPS::MulticastDataLink::join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ShmemDataLink::open(), and OpenDDS::DCPS::RtpsUdpDataLink::open().

00240 {
00241   DBG_ENTRY_LVL("DataLink","start",6);
00242 
00243   // We assume that the send_strategy is not NULL, but the receive_strategy
00244   // is allowed to be NULL.
00245 
00246   // Attempt to start the strategies, and if there is a start() failure,
00247   // make sure to stop() any strategy that was already start()'ed.
00248   if (send_strategy->start() != 0) {
00249     // Failed to start the TransportSendStrategy.
00250     invoke_on_start_callbacks(false);
00251     return -1;
00252   }
00253 
00254   if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
00255     // Failed to start the TransportReceiveStrategy.
00256 
00257     // Remember to stop() the TransportSendStrategy since we did start it,
00258     // and now need to "undo" that action.
00259     send_strategy->stop();
00260     invoke_on_start_callbacks(false);
00261     return -1;
00262   }
00263 
00264   // We started both strategy objects.  Save them to data members since
00265   // we will now take ownership of them.
00266   {
00267     GuardType guard(this->strategy_lock_);
00268 
00269     this->send_strategy_    = send_strategy;
00270     this->receive_strategy_ = receive_strategy;
00271   }
00272   invoke_on_start_callbacks(true);
00273   {
00274     //catch any associations added during initial invoke_on_start_callbacks
00275     //only after first use_datalink has resolved does datalink's state truly
00276     //change to started, thus can't let pending associations proceed normally yet
00277     GuardType guard(this->strategy_lock_);
00278     this->started_ = true;
00279   }
00280   //Now state transitioned to started so no new on_start_callbacks will be added
00281   //so resolve any added during transition to started.
00282   invoke_on_start_callbacks(true);
00283   return 0;
00284 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::stop ( void   ) 

The stop method is used to stop the DataLink prior to shutdown.

Definition at line 222 of file DataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::is_nil(), pre_stop_i(), receive_strategy_, OpenDDS::DCPS::RcHandle< T >::reset(), scheduled_to_stop_at_, send_strategy_, stop_i(), stopped_, strategy_lock_, and ACE_Time_Value::zero.

Referenced by handle_exception(), handle_timeout(), OpenDDS::DCPS::UdpTransport::release_datalink(), OpenDDS::DCPS::ShmemTransport::release_datalink(), and transport_shutdown().

00223 {
00224   this->pre_stop_i();
00225 
00226   TransportSendStrategy_rch send_strategy;
00227   TransportStrategy_rch recv_strategy;
00228 
00229   {
00230     GuardType guard(this->strategy_lock_);
00231 
00232     if (this->stopped_) return;
00233 
00234     send_strategy = this->send_strategy_;
00235     this->send_strategy_.reset();
00236 
00237     recv_strategy = this->receive_strategy_;
00238     this->receive_strategy_.reset();
00239   }
00240 
00241   if (!send_strategy.is_nil()) {
00242     send_strategy->stop();
00243   }
00244 
00245   if (!recv_strategy.is_nil()) {
00246     recv_strategy->stop();
00247   }
00248 
00249   this->stop_i();
00250   this->stopped_ = true;
00251   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00252 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::stop_i (  )  [protected, virtual]

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shut down: our transport_shutdown() method calls stop(), which in turn calls stop_i(), to handle this case.

Reimplemented in OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::ShmemDataLink, OpenDDS::DCPS::TcpDataLink, and OpenDDS::DCPS::UdpDataLink.

Definition at line 470 of file DataLink.cpp.

References DBG_ENTRY_LVL.

Referenced by stop().

00471 {
00472   DBG_ENTRY_LVL("DataLink", "stop_i", 6);
00473 }

Here is the caller graph for this function:

GUIDSeq * OpenDDS::DCPS::DataLink::target_intersection ( const RepoId pub_id,
const GUIDSeq in,
size_t n_subs 
)

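For a given publication "pub_id", store the total number of corresponding subscriptions in "n_subs". Then, given a set of subscriptions (the "in" sequence), return the subset of "in" whose members are targets of this DataLink (see is_target()). If "pub_id" has no associations on this DataLink, "n_subs" is left unmodified; if no element of "in" is a target, a null pointer is returned.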

Definition at line 862 of file DataLink.cpp.

References assoc_by_local_, len, pub_sub_maps_lock_, and OpenDDS::DCPS::push_back().

00864 {
00865   GUIDSeq_var res;
00866   GuardType guard(this->pub_sub_maps_lock_);
00867   AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
00868 
00869   if (iter != assoc_by_local_.end()) {
00870     n_subs = iter->second.size();
00871     const CORBA::ULong len = in.length();
00872 
00873     for (CORBA::ULong i(0); i < len; ++i) {
00874       if (iter->second.count(in[i])) {
00875         if (res.ptr() == 0) {
00876           res = new GUIDSeq;
00877         }
00878 
00879         push_back(res.inout(), in[i]);
00880       }
00881     }
00882   }
00883 
00884   return res._retn();
00885 }

Here is the call graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataLink::terminate_send (  ) 

Definition at line 301 of file DataLink.inl.

References send_strategy_.

00302 {
00303   this->send_strategy_->terminate_send(false);
00304 }

ACE_INLINE Priority OpenDDS::DCPS::DataLink::transport_priority (  )  const

Definition at line 28 of file DataLink.inl.

References transport_priority_.

00029 {
00030   return this->transport_priority_;
00031 }

ACE_INLINE Priority & OpenDDS::DCPS::DataLink::transport_priority (  ) 

Accessors for the TRANSPORT_PRIORITY value associated with this link.

Definition at line 21 of file DataLink.inl.

References transport_priority_.

Referenced by OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ThreadPerConnectionSendTask::open(), OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00022 {
00023   return this->transport_priority_;
00024 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataLink::transport_shutdown (  ) 

Our TransportImpl will inform us if it is being shutdown() by calling this method.

Definition at line 674 of file DataLink.cpp.

References ACE_Reactor_Timer_Interface::cancel_timer(), DBG_ENTRY_LVL, impl_, ACE_Event_Handler::reactor(), scheduled_to_stop_at_, set_scheduling_release(), stop(), OpenDDS::DCPS::TransportImpl::timer(), and ACE_Time_Value::zero.

00675 {
00676   DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
00677 
00678   //this->cancel_release();
00679   this->set_scheduling_release(false);
00680   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00681 
00682   ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00683   reactor->cancel_timer(this);
00684 
00685   this->stop();
00686   // this->send_listeners_.clear();
00687   // this->recv_listeners_.clear();
00688   // Drop our reference to the TransportImpl object
00689 }

Here is the call graph for this function:


Friends And Related Function Documentation

friend class DataLinkCleanupTask [friend]

Definition at line 69 of file DataLink.h.

OpenDDS_Dcps_Export std::ostream& operator<< ( std::ostream &  str,
const DataLink value 
) [friend]

Convenience function for diagnostic information.

friend class ThreadPerConnectionSendTask [friend]

Definition at line 290 of file DataLink.h.

Referenced by DataLink().


Member Data Documentation

Definition at line 383 of file DataLink.h.

Referenced by clear_associations(), and prepare_release().

Configurable delay, in milliseconds, to wait before releasing the datalink after all of its associations have been removed.

Definition at line 402 of file DataLink.h.

Referenced by DataLink(), datalink_release_delay(), and schedule_delayed_release().

Definition at line 407 of file DataLink.h.

Referenced by create_control(), and DataLink().

If default_listener_ is not null and this DataLink receives a sample from a publication GUID that's not in pub_map_, it will call data_received() on the default_listener_.

Definition at line 361 of file DataLink.h.

Referenced by data_received_i(), and default_listener().

The id for this DataLink.

Definition at line 375 of file DataLink.h.

Referenced by DataLink(), and id().

A (smart) pointer to the TransportImpl that created this DataLink.

Definition at line 372 of file DataLink.h.

Referenced by handle_exception(), handle_timeout(), impl(), notify_reactor(), release_reservations(), release_resources(), and transport_shutdown().

Whether this DataLink is on the publication side or the subscription side (exposed via is_active()).

Definition at line 412 of file DataLink.h.

Referenced by is_active(), and OpenDDS::DCPS::TcpDataLink::reuse_existing_connection().

Whether the remote end is attached to the same transport (exposed via is_loopback()).

Definition at line 410 of file DataLink.h.

Referenced by is_loopback(), and OpenDDS::DCPS::UdpDataLink::open().

Allocators for data and message blocks used by transport control samples when send_control is called.

Definition at line 406 of file DataLink.h.

Referenced by create_control(), and DataLink().

IdToRecvListenerMap OpenDDS::DCPS::DataLink::recv_listeners_ [private]

Definition at line 356 of file DataLink.h.

Referenced by make_reservation(), notify(), recv_listener_for(), and remove_listener().

Definition at line 388 of file DataLink.h.

Referenced by cancel_release(), handle_exception(), and set_scheduling_release().

IdToSendListenerMap OpenDDS::DCPS::DataLink::send_listeners_ [private]

Definition at line 352 of file DataLink.h.

Referenced by make_reservation(), notify(), remove_listener(), and send_listener_for().

Listener for TransportSendControlElements created in send_control.

Definition at line 416 of file DataLink.h.

Referenced by send_control().

Definition at line 413 of file DataLink.h.

Referenced by add_on_start_callback(), and start().

A boolean indicating if the DataLink has been stopped. This value is protected by the strategy_lock_.

Definition at line 347 of file DataLink.h.

Referenced by cancel_release(), schedule_stop(), and stop().

The task used to do the sending. This ThreadPerConnectionSendTask object is created when the thread_per_connection configuration is true. It is dedicated solely to this datalink.

Definition at line 380 of file DataLink.h.

Referenced by DataLink(), pre_stop_i(), remove_sample(), send(), send_start(), send_stop(), and ~DataLink().

TRANSPORT_PRIORITY value associated with the link.

Definition at line 386 of file DataLink.h.

Referenced by transport_priority().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1