OpenDDS::DCPS::DataLink Class Reference

#include <DataLink.h>

Inheritance diagram for OpenDDS::DCPS::DataLink:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataLink:

Collaboration graph
[legend]
List of all members.

Public Types

typedef std::pair< TransportClient *,
RepoId
OnStartCallback
 DISCONNECTED
 RECONNECTED
 LOST
enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }

Public Member Functions

 DataLink (TransportImpl *impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object.
virtual ~DataLink ()
int handle_exception (ACE_HANDLE)
void schedule_stop (ACE_Time_Value &schedule_to_stop_at)
void stop ()
 The stop method is used to stop the DataLink prior to shutdown.
void resume_send ()
int make_reservation (const RepoId &remote_subscription_id, const RepoId &local_publication_id, TransportSendListener *send_listener)
int make_reservation (const RepoId &remote_publication_id, const RepoId &local_subcription_id, TransportReceiveListener *receive_listener)
void release_reservations (RepoId remote_id, RepoId local_id, DataLinkSetMap &released_locals)
void schedule_delayed_release ()
const ACE_Time_Value & datalink_release_delay () const
void remove_listener (const RepoId &local_id)
void send_start ()
void send (TransportQueueElement *element)
void send_stop (RepoId repoId)
RemoveResult remove_sample (const DataSampleElement *sample)
void remove_all_msgs (RepoId pub_id)
int data_received (ReceivedDataSample &sample, const RepoId &readerId=GUID_UNKNOWN)
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object.
void transport_shutdown ()
void notify (ConnectionNotice notice)
void notify_connection_deleted ()
virtual bool issues_on_deleted_callback () const
virtual void pre_stop_i ()
bool release_resources ()
void terminate_send ()
bool is_target (const RepoId &remote_sub_id)
void clear_associations ()
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
Prioritytransport_priority ()
Priority transport_priority () const
bool & is_loopback ()
bool is_loopback () const
bool & is_active ()
bool is_active () const
bool cancel_release ()
ACE_Message_Block * create_control (char submessage_id, DataSampleHeader &header, ACE_Message_Block *data)
SendControlStatus send_control (const DataSampleHeader &header, ACE_Message_Block *data)
GUIDSeqtarget_intersection (const RepoId &pub_id, const GUIDSeq &in, size_t &n_subs)
TransportImpl_rch impl () const
void default_listener (TransportReceiveListener *trl)
TransportReceiveListenerdefault_listener () const
bool add_on_start_callback (TransportClient *client, const RepoId &remote)
void remove_on_start_callback (TransportClient *client, const RepoId &remote)
void invoke_on_start_callbacks (bool success)
void set_scheduling_release (bool scheduling_release)
virtual void send_final_acks (const RepoId &readerid)

Protected Types

typedef ACE_Guard< LockTypeGuardType

Protected Member Functions

int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy)
virtual void stop_i ()
void send_start_i ()
virtual void send_i (TransportQueueElement *element, bool relink=true)
void send_stop_i (RepoId repoId)
GUIDSeqpeer_ids (const RepoId &local_id) const
 OPENDDS_VECTOR (OnStartCallback) on_start_callbacks_

Static Protected Member Functions

static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods.

Protected Attributes

TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink.
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink.
LockType strategy_lock_
ACE_Time_Value datalink_release_delay_
TransportSendControlElementAllocatorsend_control_allocator_
MessageBlockAllocatormb_allocator_
DataBlockAllocatordb_allocator_
bool is_loopback_
 Is remote attached to same transport ?
bool is_active_
 Is pub or sub ?
bool started_
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control.

Private Types

typedef ACE_SYNCH_MUTEX LockType

Private Member Functions

const char * connection_notice_as_str (ConnectionNotice notice)
 Helper function to output the enum as a string to help debugging.
TransportSendListenersend_listener_for (const RepoId &pub_id) const
TransportReceiveListenerrecv_listener_for (const RepoId &sub_id) const
void prepare_release ()
virtual TransportQueueElementcustomize_queue_element (TransportQueueElement *element)
virtual void release_remote_i (const RepoId &)
virtual void release_reservations_i (const RepoId &, const RepoId &)
void data_received_i (ReceivedDataSample &sample, const RepoId &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain)
typedef OPENDDS_MAP_CMP (RepoId, TransportSendListener *, GUID_tKeyLessThan) IdToSendListenerMap
 Map publication Id value to TransportSendListener.
typedef OPENDDS_MAP_CMP (RepoId, TransportReceiveListener *, GUID_tKeyLessThan) IdToRecvListenerMap
 Map subscription Id value to TransportReceieveListener.
typedef OPENDDS_MAP_CMP (RepoId, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote
typedef OPENDDS_MAP_CMP (RepoId, RepoIdSet, GUID_tKeyLessThan) AssocByLocal

Private Attributes

bool stopped_
ACE_Time_Value scheduled_to_stop_at_
IdToSendListenerMap send_listeners_
IdToRecvListenerMap recv_listeners_
TransportReceiveListenerdefault_listener_
LockType pub_sub_maps_lock_
AssocByRemote assoc_by_remote_
AssocByLocal assoc_by_local_
LockType released_assoc_by_local_lock_
AssocByLocal released_assoc_by_local_
TransportImpl_rch impl_
 A (smart) pointer to the TransportImpl that created this DataLink.
ACE_UINT64 id_
 The id for this DataLink.
ThreadPerConnectionSendTaskthr_per_con_send_task_
AssocByLocal assoc_releasing_
Priority transport_priority_
 TRANSPORT_PRIORITY value associated with the link.
bool scheduling_release_

Friends

class DataLinkCleanupTask
class ThreadPerConnectionSendTask
OpenDDS_Dcps_Export std::ostream & operator<< (std::ostream &str, const DataLink &value)
 Convenience function for diagnostic information.

Detailed Description

This class manages the reservations based on the associations between datareader and datawriter. It basically delegate the samples to send strategy for sending and deliver the samples received by receive strategy to the listener.

Notes about object ownership: 1) Own the send strategy object and receive strategy object. 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection is enabled.

Definition at line 62 of file DataLink.h.


Member Typedef Documentation

typedef ACE_Guard<LockType> OpenDDS::DCPS::DataLink::GuardType [protected]

Definition at line 389 of file DataLink.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLink::LockType [private]

Definition at line 331 of file DataLink.h.

typedef std::pair<TransportClient*, RepoId> OpenDDS::DCPS::DataLink::OnStartCallback

Definition at line 247 of file DataLink.h.


Member Enumeration Documentation

enum OpenDDS::DCPS::DataLink::ConnectionNotice

Enumerator:
DISCONNECTED 
RECONNECTED 
LOST 

Definition at line 70 of file DataLink.h.

00070                         {
00071     DISCONNECTED,
00072     RECONNECTED,
00073     LOST
00074   };


Constructor & Destructor Documentation

OpenDDS::DCPS::DataLink::DataLink ( TransportImpl impl,
Priority  priority,
bool  is_loopback,
bool  is_active 
)

Only called by our TransportImpl object.

A DataLink object is always created by a TransportImpl object. Thus, the TransportImpl object passed-in here is the object that created this DataLink. The ability to specify a priority for individual links is included for construction so its value can be available for activating any threads.

Definition at line 41 of file DataLink.cpp.

References datalink_release_delay_, db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, get_next_datalink_id(), id_, impl(), impl_, mb_allocator_, send_control_allocator_, thr_per_con_send_task_, and ThreadPerConnectionSendTask.

00043   : stopped_(false),
00044     scheduled_to_stop_at_(ACE_Time_Value::zero),
00045     default_listener_(0),
00046     thr_per_con_send_task_(0),
00047     transport_priority_(priority),
00048     scheduling_release_(false),
00049     send_control_allocator_(0),
00050     mb_allocator_(0),
00051     db_allocator_(0),
00052     is_loopback_(is_loopback),
00053     is_active_(is_active),
00054     started_(false),
00055     send_response_listener_("DataLink")
00056 {
00057   DBG_ENTRY_LVL("DataLink", "DataLink", 6);
00058 
00059   impl->_add_ref();
00060   this->impl_ = impl;
00061 
00062   datalink_release_delay_.sec(this->impl_->config_->datalink_release_delay_ / 1000);
00063   datalink_release_delay_.usec(this->impl_->config_->datalink_release_delay_ % 1000 * 1000);
00064 
00065   id_ = DataLink::get_next_datalink_id();
00066 
00067   if (this->impl_->config_->thread_per_connection_) {
00068     this->thr_per_con_send_task_ = new ThreadPerConnectionSendTask(this);
00069 
00070     if (this->thr_per_con_send_task_->open() == -1) {
00071       ACE_ERROR((LM_ERROR,
00072                  ACE_TEXT("(%P|%t) DataLink::DataLink: ")
00073                  ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
00074 
00075     } else if (DCPS_debug_level > 4) {
00076       ACE_DEBUG((LM_DEBUG,
00077                  ACE_TEXT("(%P|%t) DataLink::DataLink - ")
00078                  ACE_TEXT("started new thread to send data with.\n")));
00079     }
00080   }
00081 
00082   // Initialize transport control sample allocators:
00083   size_t control_chunks = this->impl_->config_->datalink_control_chunks_;
00084 
00085   this->send_control_allocator_ =
00086     new TransportSendControlElementAllocator(control_chunks);
00087 
00088   this->mb_allocator_ = new MessageBlockAllocator(control_chunks);
00089   this->db_allocator_ = new DataBlockAllocator(control_chunks);
00090 }

OpenDDS::DCPS::DataLink::~DataLink (  )  [virtual]

Definition at line 92 of file DataLink.cpp.

References assoc_by_local_, OpenDDS::DCPS::ThreadPerConnectionSendTask::close(), db_allocator_, DBG_ENTRY_LVL, mb_allocator_, send_control_allocator_, and thr_per_con_send_task_.

00093 {
00094   DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
00095 
00096   if (!assoc_by_local_.empty()) {
00097     ACE_DEBUG((LM_WARNING,
00098                ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
00099                ACE_TEXT("link still in use by %d entities when deleted!\n"),
00100                this, assoc_by_local_.size()));
00101   }
00102 
00103   delete this->db_allocator_;
00104   delete this->mb_allocator_;
00105 
00106   delete this->send_control_allocator_;
00107 
00108   if (this->thr_per_con_send_task_ != 0) {
00109     this->thr_per_con_send_task_->close(1);
00110     delete this->thr_per_con_send_task_;
00111   }
00112 }


Member Function Documentation

ACE_INLINE bool OpenDDS::DCPS::DataLink::add_on_start_callback ( TransportClient client,
const RepoId remote 
)

Definition at line 285 of file DataLink.inl.

References OpenDDS::DCPS::RcHandle< T >::is_nil(), send_strategy_, started_, and strategy_lock_.

00286 {
00287   GuardType guard(strategy_lock_);
00288 
00289   if (started_ && !send_strategy_.is_nil()) {
00290     return false; // link already started
00291   }
00292   on_start_callbacks_.push_back(std::make_pair(client, remote));
00293 
00294   return true;
00295 }

bool OpenDDS::DCPS::DataLink::cancel_release (  ) 

Definition at line 441 of file DataLink.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, scheduled_to_stop_at_, scheduling_release_, set_scheduling_release(), and stopped_.

00442 {
00443   DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
00444   if (stopped_) {
00445     if (DCPS_debug_level > 0) {
00446       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
00447     }
00448     return false;
00449   }
00450   if (scheduling_release_) {
00451     if (DCPS_debug_level > 0) {
00452       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
00453     }
00454     this->set_scheduling_release(false);
00455     this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00456     TransportReactorTask_rch reactor(this->impl_->reactor_task());
00457     reactor->get_reactor()->notify(this);
00458   }
00459   return true;
00460 }

void OpenDDS::DCPS::DataLink::clear_associations (  ) 

This is called by DataLinkCleanupTask thread to remove the associations based on the snapshot in release_resources().

Definition at line 915 of file DataLink.cpp.

References assoc_releasing_, recv_listener_for(), OpenDDS::DCPS::TransportReceiveListener::remove_associations(), send_listener_for(), and OpenDDS::DCPS::set_to_seq().

00916 {
00917   for (AssocByLocal::iterator iter = assoc_releasing_.begin();
00918        iter != assoc_releasing_.end(); ++iter) {
00919     TransportSendListener* const tsl = send_listener_for(iter->first);
00920     if (tsl) {
00921       ReaderIdSeq sub_ids;
00922       set_to_seq(iter->second, sub_ids);
00923       tsl->remove_associations(sub_ids, false);
00924       continue;
00925     }
00926     TransportReceiveListener* const trl = recv_listener_for(iter->first);
00927     if (trl) {
00928       WriterIdSeq pub_ids;
00929       set_to_seq(iter->second, pub_ids);
00930       trl->remove_associations(pub_ids, false);
00931     }
00932   }
00933   assoc_releasing_.clear();
00934 }

ACE_INLINE const char * OpenDDS::DCPS::DataLink::connection_notice_as_str ( ConnectionNotice  notice  )  [private]

Helper function to output the enum as a string to help debugging.

Definition at line 310 of file DataLink.inl.

Referenced by notify().

00311 {
00312   static const char* NoticeStr[] = { "DISCONNECTED",
00313                                      "RECONNECTED",
00314                                      "LOST"
00315                                    };
00316 
00317   return NoticeStr [notice];
00318 }

ACE_Message_Block * OpenDDS::DCPS::DataLink::create_control ( char  submessage_id,
DataSampleHeader header,
ACE_Message_Block *  data 
)

This allows a subclass to easily create a transport control sample to send via send_control.

Definition at line 469 of file DataLink.cpp.

References db_allocator_, DBG_ENTRY_LVL, header, mb_allocator_, and OpenDDS::DCPS::TRANSPORT_CONTROL.

Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

00472 {
00473   DBG_ENTRY_LVL("DataLink", "create_control", 6);
00474 
00475   header.byte_order_ = ACE_CDR_BYTE_ORDER;
00476   header.message_id_ = TRANSPORT_CONTROL;
00477   header.submessage_id_ = submessage_id;
00478   header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
00479 
00480   ACE_Message_Block* message;
00481   ACE_NEW_MALLOC_RETURN(message,
00482                         static_cast<ACE_Message_Block*>(
00483                           this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
00484                         ACE_Message_Block(header.max_marshaled_size(),
00485                                           ACE_Message_Block::MB_DATA,
00486                                           data,
00487                                           0,  // data
00488                                           0,  // allocator_strategy
00489                                           0,  // locking_strategy
00490                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00491                                           ACE_Time_Value::zero,
00492                                           ACE_Time_Value::max_time,
00493                                           this->db_allocator_,
00494                                           this->mb_allocator_),
00495                         0);
00496 
00497   *message << header;
00498 
00499   return message;
00500 }

virtual TransportQueueElement* OpenDDS::DCPS::DataLink::customize_queue_element ( TransportQueueElement element  )  [inline, private, virtual]

Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 316 of file DataLink.h.

Referenced by send().

00318   {
00319     return element;
00320   }

int OpenDDS::DCPS::DataLink::data_received ( ReceivedDataSample sample,
const RepoId readerId = GUID_UNKNOWN 
)

This is called by our TransportReceiveStrategy object when it has received a complete data sample. This method will cause the appropriate TransportReceiveListener objects to be told that data_received(). If readerId is not GUID_UNKNOWN, only the TransportReceiveListener with that ID (if one exists) will receive the data.

Definition at line 527 of file DataLink.cpp.

References data_received_i(), and OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::deliver_held_data(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::MulticastDataLink::sample_received().

00529 {
00530   data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
00531   return 0;
00532 }

void OpenDDS::DCPS::DataLink::data_received_i ( ReceivedDataSample sample,
const RepoId readerId,
const RepoIdSet &  incl_excl,
ReceiveListenerSet::ConstrainReceiveSet  constrain 
) [private]

Definition at line 541 of file DataLink.cpp.

References assoc_by_remote_, OpenDDS::DCPS::DataSampleHeader::content_filter_, OpenDDS::DCPS::DataSampleHeader::content_filter_entries_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, default_listener_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceiveListenerSet::SET_EXCLUDED, OpenDDS::DCPS::to_string(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by data_received(), and data_received_include().

00545 {
00546   DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
00547   // Which remote publication sent this message?
00548   const RepoId& publication_id = sample.header_.publication_id_;
00549 
00550   // Locate the set of TransportReceiveListeners associated with this
00551   // DataLink that are interested in hearing about any samples received
00552   // from the remote publisher_id.
00553   if (DCPS_debug_level > 9) {
00554     const GuidConverter converter(publication_id);
00555     const GuidConverter reader(readerId);
00556     ACE_DEBUG((LM_DEBUG,
00557                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00558                ACE_TEXT("from publication %C received sample: %C to readerId %C (%s).\n"),
00559                OPENDDS_STRING(converter).c_str(),
00560                to_string(sample.header_).c_str(),
00561                OPENDDS_STRING(reader).c_str(),
00562                constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
00563   }
00564 
00565   if (Transport_debug_level > 9) {
00566     const GuidConverter converter(publication_id);
00567     ACE_DEBUG((LM_DEBUG,
00568                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00569                ACE_TEXT("from publication %C received sample: %C.\n"),
00570                OPENDDS_STRING(converter).c_str(),
00571                to_string(sample.header_).c_str()));
00572   }
00573 
00574   ReceiveListenerSet_rch listener_set;
00575   {
00576     GuardType guard(this->pub_sub_maps_lock_);
00577     AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
00578     if (iter != assoc_by_remote_.end())
00579       listener_set = iter->second;
00580 
00581     if (listener_set.is_nil() && this->default_listener_) {
00582       this->default_listener_->data_received(sample);
00583       return;
00584     }
00585   }
00586 
00587   if (listener_set.is_nil()) {
00588     // Nobody has any interest in this message.  Drop it on the floor.
00589     if (Transport_debug_level > 4) {
00590       const GuidConverter converter(publication_id);
00591       ACE_DEBUG((LM_DEBUG,
00592                  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00593                  ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
00594                  OPENDDS_STRING(converter).c_str()));
00595     }
00596 
00597     return;
00598   }
00599 
00600   if (readerId != GUID_UNKNOWN) {
00601     listener_set->data_received(sample, readerId);
00602     return;
00603   }
00604 
00605 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00606 
00607   if (sample.header_.content_filter_
00608       && sample.header_.content_filter_entries_.length()) {
00609     ReceiveListenerSet subset(*listener_set.in());
00610     subset.remove_all(sample.header_.content_filter_entries_);
00611     subset.data_received(sample, incl_excl, constrain);
00612 
00613   } else {
00614 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00615 
00616     if (DCPS_debug_level > 9) {
00617       // Just get the set to do our dirty work by having it iterate over its
00618       // collection of TransportReceiveListeners, and invoke the data_received()
00619       // method on each one.
00620       OPENDDS_STRING included_ids;
00621       bool first = true;
00622       RepoIdSet::const_iterator iter = incl_excl.begin();
00623       while(iter != incl_excl.end()) {
00624         included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00625         first = false;
00626         ++iter;
00627       }
00628       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %s ids:%C\n",
00629                  constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
00630     }
00631     listener_set->data_received(sample, incl_excl, constrain);
00632 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00633   }
00634 
00635 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00636 }

void OpenDDS::DCPS::DataLink::data_received_include ( ReceivedDataSample sample,
const RepoIdSet &  incl 
)

Varation of data_received() that allows for excluding a subset of readers by specifying which readers specifically should receive. Any reader ID that does not appear in the include set will be skipped.

Definition at line 535 of file DataLink.cpp.

References data_received_i(), OpenDDS::DCPS::GUID_UNKNOWN, and OpenDDS::DCPS::ReceiveListenerSet::SET_INCLUDED.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample().

00536 {
00537   data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
00538 }

ACE_INLINE const ACE_Time_Value & OpenDDS::DCPS::DataLink::datalink_release_delay (  )  const

Definition at line 68 of file DataLink.inl.

References datalink_release_delay_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00069 {
00070   return this->datalink_release_delay_;
00071 }

ACE_INLINE TransportReceiveListener * OpenDDS::DCPS::DataLink::default_listener (  )  const

Definition at line 398 of file DataLink.inl.

References default_listener_.

00399 {
00400   GuardType guard(this->pub_sub_maps_lock_);
00401   return this->default_listener_;
00402 }

ACE_INLINE void OpenDDS::DCPS::DataLink::default_listener ( TransportReceiveListener trl  ) 

Definition at line 390 of file DataLink.inl.

References default_listener_.

00391 {
00392   GuardType guard(this->pub_sub_maps_lock_);
00393   this->default_listener_ = trl;
00394 }

ACE_UINT64 OpenDDS::DCPS::DataLink::get_next_datalink_id (  )  [static, protected]

Used to provide unique Ids to all DataLink methods.

Definition at line 640 of file DataLink.cpp.

References id().

Referenced by DataLink().

00641 {
00642   static ACE_UINT64 next_id = 0;
00643   static LockType lock;
00644 
00645   ACE_UINT64 id;
00646   {
00647     GuardType guard(lock);
00648     id = next_id++;
00649 
00650     if (0 == next_id) {
00651       ACE_ERROR((LM_ERROR,
00652                  ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
00653                  ACE_TEXT("has rolled over and is reusing ids!\n")));
00654     }
00655   }
00656 
00657   return id;
00658 }

int OpenDDS::DCPS::DataLink::handle_close ( ACE_HANDLE  h,
ACE_Reactor_Mask  m 
)

Definition at line 952 of file DataLink.cpp.

References handle_timeout().

00953 {
00954   if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
00955     // Reactor is shutting down with this timer still pending.
00956     // Take the same cleanup actions as if the timeout had expired.
00957     handle_timeout(ACE_Time_Value::zero, 0);
00958   }
00959 
00960   return 0;
00961 }

int OpenDDS::DCPS::DataLink::handle_exception ( ACE_HANDLE   ) 

Definition at line 142 of file DataLink.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), OpenDDS::DCPS::DCPS_debug_level, handle_timeout(), impl_, scheduled_to_stop_at_, and stop().

00143 {
00144   if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00145     if (DCPS_debug_level > 0) {
00146       ACE_DEBUG((LM_DEBUG,
00147                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
00148     }
00149     if (this->impl_ != 0) {
00150       ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00151       if (reactor->cancel_timer(this) > 0) {
00152         if (DCPS_debug_level > 0) {
00153           ACE_DEBUG((LM_DEBUG,
00154                      ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
00155         }
00156       }
00157     } else {
00158       if (DCPS_debug_level > 0) {
00159         ACE_DEBUG((LM_DEBUG,
00160                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - impl_ == 0\n")));
00161       }
00162     }
00163     this->_remove_ref();
00164     return 0;
00165   } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) {
00166     if (this->scheduling_release_) {
00167       if (DCPS_debug_level > 0) {
00168         ACE_DEBUG((LM_DEBUG,
00169                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
00170       }
00171       this->handle_timeout(ACE_Time_Value::zero, 0);
00172       return 0;
00173     }
00174     if (DCPS_debug_level > 0) {
00175       ACE_DEBUG((LM_DEBUG,
00176                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
00177     }
00178     this->stop();
00179     this->_remove_ref();
00180     return 0;
00181   } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
00182     if (DCPS_debug_level > 0) {
00183       ACE_DEBUG((LM_DEBUG,
00184                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
00185     }
00186     this->_add_ref();
00187     ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00188     ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday();
00189     reactor->schedule_timer(this, 0, future_release_time);
00190   }
00191   return 0;
00192 }

int OpenDDS::DCPS::DataLink::handle_timeout ( const ACE_Time_Value &  tv,
const void *  arg 
)

Definition at line 937 of file DataLink.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), assoc_by_local_, assoc_by_remote_, impl_, and VDBG_LVL.

Referenced by handle_close(), and handle_exception().

00938 {
00939   if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) {
00940     VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
00941     this->impl_->unbind_link(this);
00942 
00943     if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
00944       this->stop();
00945     }
00946   }
00947   this->_remove_ref();
00948   return 0;
00949 }

ACE_INLINE DataLinkIdType OpenDDS::DCPS::DataLink::id (  )  const

Obtain a unique identifier for this DataLink object.

Definition at line 228 of file DataLink.inl.

References DBG_ENTRY_LVL, and id_.

Referenced by get_next_datalink_id().

00229 {
00230   DBG_ENTRY_LVL("DataLink","id",6);
00231   return id_;
00232 }

TransportImpl_rch OpenDDS::DCPS::DataLink::impl (  )  const

Definition at line 115 of file DataLink.cpp.

References impl_.

Referenced by OpenDDS::DCPS::UdpDataLink::control_received(), DataLink(), OpenDDS::DCPS::ShmemDataLink::local_address(), OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::UdpDataLink::open(), and OpenDDS::DCPS::ShmemDataLink::signal_semaphore().

00116 {
00117   return impl_;
00118 }

void OpenDDS::DCPS::DataLink::invoke_on_start_callbacks ( bool  success  ) 

Definition at line 121 of file DataLink.cpp.

References strategy_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received(), and start().

00122 {
00123   const DataLink_rch link(success ? this : 0, false);
00124 
00125   while (true) {
00126     GuardType guard(strategy_lock_);
00127 
00128     if (on_start_callbacks_.empty()) {
00129       break;
00130     }
00131 
00132     OnStartCallback last_callback = on_start_callbacks_.back();
00133     on_start_callbacks_.pop_back();
00134 
00135     guard.release();
00136     last_callback.first->use_datalink(last_callback.second, link);
00137   }
00138 }

ACE_INLINE bool OpenDDS::DCPS::DataLink::is_active (  )  const

Definition at line 62 of file DataLink.inl.

References is_active_.

00063 {
00064   return this->is_active_;
00065 }

ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_active (  ) 

Definition at line 55 of file DataLink.inl.

References is_active_.

Referenced by OpenDDS::DCPS::MulticastDataLink::check_header(), OpenDDS::DCPS::TcpTransport::release_datalink(), OpenDDS::DCPS::MulticastDataLink::sample_received(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00056 {
00057   return this->is_active_;
00058 }

ACE_INLINE bool OpenDDS::DCPS::DataLink::is_loopback (  )  const

Definition at line 48 of file DataLink.inl.

References is_loopback_.

00049 {
00050   return this->is_loopback_;
00051 }

ACE_INLINE bool & OpenDDS::DCPS::DataLink::is_loopback (  ) 

Definition at line 41 of file DataLink.inl.

References is_loopback_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00042 {
00043   return this->is_loopback_;
00044 }

bool OpenDDS::DCPS::DataLink::is_target ( const RepoId remote_sub_id  ) 

This is called on publisher side to see if this link communicates with the provided sub.

Definition at line 869 of file DataLink.cpp.

References assoc_by_remote_.

00870 {
00871   GuardType guard(this->pub_sub_maps_lock_);
00872   return assoc_by_remote_.count(remote_sub_id);
00873 }

ACE_INLINE bool OpenDDS::DCPS::DataLink::issues_on_deleted_callback (  )  const [virtual]

Reimplemented in OpenDDS::DCPS::TcpDataLink.

Definition at line 20 of file DataLink.inl.

00021 {
00022   return false;
00023 }

int OpenDDS::DCPS::DataLink::make_reservation ( const RepoId remote_publication_id,
const RepoId local_subcription_id,
TransportReceiveListener receive_listener 
)

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Definition at line 296 of file DataLink.cpp.

References assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, pub_sub_maps_lock_, recv_listeners_, send_strategy_, and strategy_lock_.

00299 {
00300   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00301 
00302   if (DCPS_debug_level > 9) {
00303     GuidConverter local(local_subscription_id), remote(remote_publication_id);
00304     ACE_DEBUG((LM_DEBUG,
00305                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00306                ACE_TEXT("creating association local subscription %C ")
00307                ACE_TEXT("<--> with remote publication  %C.\n"),
00308                OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str()));
00309   }
00310 
00311   {
00312     GuardType guard(strategy_lock_);
00313 
00314     if (!send_strategy_.is_nil()) {
00315       send_strategy_->link_released(false);
00316     }
00317   }
00318   {
00319     GuardType guard(pub_sub_maps_lock_);
00320 
00321     assoc_by_local_[local_subscription_id].insert(remote_publication_id);
00322     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
00323 
00324     if (rls.is_nil())
00325       rls = new ReceiveListenerSet;
00326     rls->insert(local_subscription_id, receive_listener);
00327 
00328     recv_listeners_[local_subscription_id] = receive_listener;
00329   }
00330   return 0;
00331 }

int OpenDDS::DCPS::DataLink::make_reservation ( const RepoId remote_subscription_id,
const RepoId local_publication_id,
TransportSendListener send_listener 
)

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Definition at line 257 of file DataLink.cpp.

References assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, pub_sub_maps_lock_, send_listeners_, send_strategy_, and strategy_lock_.

00260 {
00261   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00262 
00263   if (DCPS_debug_level > 9) {
00264     GuidConverter local(local_publication_id), remote(remote_subscription_id);
00265     ACE_DEBUG((LM_DEBUG,
00266                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00267                ACE_TEXT("creating association local publication  %C ")
00268                ACE_TEXT("<--> with remote subscription %C.\n"),
00269                OPENDDS_STRING(local).c_str(),
00270                OPENDDS_STRING(remote).c_str()));
00271   }
00272 
00273   {
00274     GuardType guard(strategy_lock_);
00275 
00276     if (!send_strategy_.is_nil()) {
00277       send_strategy_->link_released(false);
00278     }
00279   }
00280   {
00281     GuardType guard(pub_sub_maps_lock_);
00282 
00283     assoc_by_local_[local_publication_id].insert(remote_subscription_id);
00284     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
00285 
00286     if (rls.is_nil())
00287       rls = new ReceiveListenerSet;
00288     rls->insert(local_publication_id, 0);
00289 
00290     send_listeners_[local_publication_id] = send_listener;
00291   }
00292   return 0;
00293 }

void OpenDDS::DCPS::DataLink::notify ( ConnectionNotice  notice  ) 

Notify the datawriters and datareaders that the connection is disconnected, lost, or reconnected. The datareader/datawriter will notify the corresponding listener.

Definition at line 682 of file DataLink.cpp.

References assoc_by_local_, connection_notice_as_str(), DBG_ENTRY_LVL, DISCONNECTED, LOST, OPENDDS_STRING, RECONNECTED, recv_listeners_, send_listeners_, OpenDDS::DCPS::set_to_seq(), OpenDDS::DCPS::Transport_debug_level, and VDBG.

00683 {
00684   DBG_ENTRY_LVL("DataLink", "notify", 6);
00685 
00686   VDBG((LM_DEBUG,
00687         ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
00688         this,
00689         connection_notice_as_str(notice)));
00690 
00691   GuardType guard(this->pub_sub_maps_lock_);
00692 
00693   // Notify the datawriters
00694   // the lost publications due to a connection problem.
00695   for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
00696        itr != send_listeners_.end(); ++itr) {
00697 
00698     TransportSendListener* tsl = itr->second;
00699 
00700     if (tsl != 0) {
00701       if (Transport_debug_level > 0) {
00702         GuidConverter converter(itr->first);
00703         ACE_DEBUG((LM_DEBUG,
00704                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00705                    ACE_TEXT("notify pub %C %C.\n"),
00706                    OPENDDS_STRING(converter).c_str(),
00707                    connection_notice_as_str(notice)));
00708       }
00709       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00710       if (local_it == assoc_by_local_.end()) {
00711         if (Transport_debug_level) {
00712           GuidConverter converter(itr->first);
00713           ACE_DEBUG((LM_DEBUG,
00714                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00715                      ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
00716                      OPENDDS_STRING(converter).c_str(),
00717                      connection_notice_as_str(notice)));
00718         }
00719         break;
00720       }
00721       const RepoIdSet& rids = local_it->second;
00722 
00723       ReaderIdSeq subids;
00724       set_to_seq(rids, subids);
00725 
00726       switch (notice) {
00727       case DISCONNECTED:
00728         tsl->notify_publication_disconnected(subids);
00729         break;
00730 
00731       case RECONNECTED:
00732         tsl->notify_publication_reconnected(subids);
00733         break;
00734 
00735       case LOST:
00736         tsl->notify_publication_lost(subids);
00737         break;
00738 
00739       default:
00740         ACE_ERROR((LM_ERROR,
00741                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00742                    ACE_TEXT("unknown notice to TransportSendListener\n")));
00743         break;
00744       }
00745 
00746     } else {
00747       if (Transport_debug_level > 0) {
00748         GuidConverter converter(itr->first);
00749         ACE_DEBUG((LM_DEBUG,
00750                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00751                    ACE_TEXT("not notify pub %C %C \n"),
00752                    OPENDDS_STRING(converter).c_str(),
00753                    connection_notice_as_str(notice)));
00754       }
00755     }
00756 
00757   }
00758 
00759   // Notify the datareaders registered with TransportImpl
00760   // the lost subscriptions due to a connection problem.
00761   for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
00762        itr != recv_listeners_.end(); ++itr) {
00763 
00764     TransportReceiveListener* trl = itr->second;
00765 
00766     if (trl != 0) {
00767       if (Transport_debug_level > 0) {
00768         GuidConverter converter(itr->first);
00769         ACE_DEBUG((LM_DEBUG,
00770                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00771                    ACE_TEXT("notify sub %C %C.\n"),
00772                    OPENDDS_STRING(converter).c_str(),
00773                    connection_notice_as_str(notice)));
00774       }
00775       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00776       if (local_it == assoc_by_local_.end()) {
00777         if (Transport_debug_level) {
00778           GuidConverter converter(itr->first);
00779           ACE_DEBUG((LM_DEBUG,
00780                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00781                      ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
00782                      OPENDDS_STRING(converter).c_str(),
00783                      connection_notice_as_str(notice)));
00784         }
00785         break;
00786       }
00787       const RepoIdSet& rids = local_it->second;
00788 
00789       WriterIdSeq pubids;
00790       set_to_seq(rids, pubids);
00791 
00792       switch (notice) {
00793       case DISCONNECTED:
00794         trl->notify_subscription_disconnected(pubids);
00795         break;
00796 
00797       case RECONNECTED:
00798         trl->notify_subscription_reconnected(pubids);
00799         break;
00800 
00801       case LOST:
00802         trl->notify_subscription_lost(pubids);
00803         break;
00804 
00805       default:
00806         ACE_ERROR((LM_ERROR,
00807                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00808                    ACE_TEXT("unknown notice to datareader.\n")));
00809         break;
00810       }
00811 
00812     } else {
00813       if (Transport_debug_level > 0) {
00814         GuidConverter converter(itr->first);
00815         ACE_DEBUG((LM_DEBUG,
00816                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00817                    ACE_TEXT("not notify sub %C subscription lost.\n"),
00818                    OPENDDS_STRING(converter).c_str()));
00819       }
00820 
00821     }
00822   }
00823 }

void OpenDDS::DCPS::DataLink::notify_connection_deleted (  ) 

Definition at line 825 of file DataLink.cpp.

References OpenDDS::DCPS::TransportReceiveListener::notify_connection_deleted(), OpenDDS::DCPS::TransportSendListener::notify_connection_deleted(), recv_listener_for(), released_assoc_by_local_, and send_listener_for().

00826 {
00827   GuardType guard(this->released_assoc_by_local_lock_);
00828   for (AssocByLocal::iterator iter = released_assoc_by_local_.begin();
00829        iter != released_assoc_by_local_.end(); ++iter) {
00830       TransportSendListener* const tsl = send_listener_for(iter->first);
00831       if (tsl) {
00832         for (RepoIdSet::iterator ris_it = iter->second.begin();
00833              ris_it != iter->second.end(); ++ris_it) {
00834           tsl->notify_connection_deleted(*ris_it);
00835         }
00836         iter->second.clear();
00837         continue;
00838       }
00839       TransportReceiveListener* const trl = recv_listener_for(iter->first);
00840       if (trl) {
00841         for (RepoIdSet::iterator ris_it = iter->second.begin();
00842              ris_it != iter->second.end(); ++ris_it) {
00843           trl->notify_connection_deleted(*ris_it);
00844         }
00845         iter->second.clear();
00846       }
00847   }
00848 }

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
RepoIdSet  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
ReceiveListenerSet_rch  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
TransportReceiveListener ,
GUID_tKeyLessThan   
) [private]

Map subscription Id value to TransportReceieveListener.

typedef OpenDDS::DCPS::DataLink::OPENDDS_MAP_CMP ( RepoId  ,
TransportSendListener ,
GUID_tKeyLessThan   
) [private]

Map publication Id value to TransportSendListener.

OpenDDS::DCPS::DataLink::OPENDDS_VECTOR ( OnStartCallback   )  [protected]

GUIDSeq * OpenDDS::DCPS::DataLink::peer_ids ( const RepoId local_id  )  const [protected]

For a given local RepoId (publication or subscription), return the list of remote peer RepoIds (subscriptions or publications) that this link knows about due to make_reservation().

Definition at line 344 of file DataLink.cpp.

References assoc_by_local_, pub_sub_maps_lock_, and OpenDDS::DCPS::set_to_seq().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::get_locators(), and OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos().

00345 {
00346   GuardType guard(pub_sub_maps_lock_);
00347 
00348   const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
00349 
00350   if (iter == assoc_by_local_.end())
00351     return 0;
00352 
00353   GUIDSeq_var result = new GUIDSeq;
00354   set_to_seq(iter->second, static_cast<GUIDSeq&>(result));
00355   return result._retn();
00356 }

void OpenDDS::DCPS::DataLink::pre_stop_i (  )  [virtual]

Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink, and OpenDDS::DCPS::TcpDataLink.

Definition at line 851 of file DataLink.cpp.

References OpenDDS::DCPS::ThreadPerConnectionSendTask::close(), and thr_per_con_send_task_.

Referenced by OpenDDS::DCPS::TcpDataLink::pre_stop_i(), OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i(), and stop().

00852 {
00853   if (this->thr_per_con_send_task_ != 0) {
00854     this->thr_per_con_send_task_->close(1);
00855   }
00856 }

void OpenDDS::DCPS::DataLink::prepare_release (  )  [private]

Save current sub and pub association maps for releasing and create empty maps for new associations.

Definition at line 901 of file DataLink.cpp.

References assoc_by_local_, and assoc_releasing_.

Referenced by release_resources().

00902 {
00903   GuardType guard(this->pub_sub_maps_lock_);
00904 
00905   if (!assoc_releasing_.empty()) {
00906     ACE_ERROR((LM_ERROR,
00907                ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
00908                ACE_TEXT("already prepared for release.\n")));
00909     return;
00910   }
00911 
00912   assoc_releasing_ = assoc_by_local_;
00913 }

ACE_INLINE TransportReceiveListener * OpenDDS::DCPS::DataLink::recv_listener_for ( const RepoId sub_id  )  const [private]

Definition at line 376 of file DataLink.inl.

References recv_listeners_.

Referenced by clear_associations(), and notify_connection_deleted().

00377 {
00378   // sub_map_ (and recv_listeners_) are already locked when entering this
00379   // private method.
00380   IdToRecvListenerMap::const_iterator found =
00381     this->recv_listeners_.find(sub_id);
00382   if (found == this->recv_listeners_.end()) {
00383     return 0;
00384   }
00385   return found->second;
00386 }

virtual void OpenDDS::DCPS::DataLink::release_remote_i ( const RepoId  )  [inline, private, virtual]

Reimplemented in OpenDDS::DCPS::MulticastDataLink, and OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 322 of file DataLink.h.

Referenced by release_reservations().

00322 {}

void OpenDDS::DCPS::DataLink::release_reservations ( RepoId  remote_id,
RepoId  local_id,
DataLinkSetMap &  released_locals 
)

This will release reservations that were made by one of the make_reservation() methods. All we know is that the supplied RepoId is considered to be a remote id. It could be a remote subscriber or a remote publisher.

Definition at line 366 of file DataLink.cpp.

References assoc_by_local_, assoc_by_remote_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, impl_, OPENDDS_STRING, release_remote_i(), release_reservations_i(), released_assoc_by_local_, and VDBG_LVL.

00368 {
00369   DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
00370 
00371   if (DCPS_debug_level > 9) {
00372     GuidConverter local(local_id);
00373     GuidConverter remote(remote_id);
00374     ACE_DEBUG((LM_DEBUG,
00375                ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
00376                ACE_TEXT("releasing association local: %C ")
00377                ACE_TEXT("<--> with remote %C.\n"),
00378                OPENDDS_STRING(local).c_str(),
00379                OPENDDS_STRING(remote).c_str()));
00380   }
00381 
00382   //let the specific class release its reservations
00383   //done this way to prevent deadlock of holding pub_sub_maps_lock_
00384   //then obtaining a specific class lock in release_reservations_i
00385   //which reverses lock ordering of the active send logic of needing
00386   //the specific class lock before obtaining the over arching DataLink
00387   //pub_sub_maps_lock_
00388   this->release_reservations_i(remote_id, local_id);
00389 
00390   GuardType guard(this->pub_sub_maps_lock_);
00391 
00392   ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
00393   if (rls->size() == 1) {
00394     assoc_by_remote_.erase(remote_id);
00395     release_remote_i(remote_id);
00396   } else {
00397     rls->remove(local_id);
00398   }
00399   RepoIdSet& ris = assoc_by_local_[local_id];
00400   if (ris.size() == 1) {
00401     DataLinkSet_rch& links = released_locals[local_id];
00402     if (links.is_nil())
00403       links = new DataLinkSet;
00404     links->insert_link(this);
00405     {
00406       GuardType guard(this->released_assoc_by_local_lock_);
00407       released_assoc_by_local_[local_id].insert(remote_id);
00408     }
00409     assoc_by_local_.erase(local_id);
00410   } else {
00411     ris.erase(remote_id);
00412   }
00413 
00414   if (assoc_by_local_.empty()) {
00415     VDBG_LVL((LM_DEBUG,
00416               ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
00417               ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
00418     this->impl_->release_datalink(this);
00419   }
00420 }

virtual void OpenDDS::DCPS::DataLink::release_reservations_i ( const RepoId ,
const RepoId  
) [inline, private, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 323 of file DataLink.h.

Referenced by release_reservations().

00324                                                                   {}

bool OpenDDS::DCPS::DataLink::release_resources (  ) 

Definition at line 859 of file DataLink.cpp.

References DBG_ENTRY_LVL, impl_, and prepare_release().

00860 {
00861   DBG_ENTRY_LVL("DataLink", "release_resources", 6);
00862 
00863   this->prepare_release();
00864 
00865   return impl_->release_link_resources(this);
00866 }

ACE_INLINE void OpenDDS::DCPS::DataLink::remove_all_msgs ( RepoId  pub_id  ) 

Definition at line 208 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.

00209 {
00210   DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
00211 
00212   // This one is easy.  Simply delegate to our TransportSendStrategy
00213   // data member.
00214 
00215   TransportSendStrategy_rch strategy;
00216   {
00217     GuardType guard(this->strategy_lock_);
00218 
00219     strategy = this->send_strategy_;
00220   }
00221 
00222   if (!strategy.is_nil()) {
00223     strategy->remove_all_msgs(pub_id);
00224   }
00225 }

ACE_INLINE void OpenDDS::DCPS::DataLink::remove_listener ( const RepoId local_id  ) 

Either send or receive listener for this local_id should be removed from internal DataLink structures so it no longer receives events.

Definition at line 329 of file DataLink.inl.

References OPENDDS_STRING, recv_listeners_, send_listeners_, and OpenDDS::DCPS::Transport_debug_level.

00330 {
00331   GuardType guard(this->pub_sub_maps_lock_);
00332   {
00333     IdToSendListenerMap::iterator pos = this->send_listeners_.find(local_id);
00334     if (pos != this->send_listeners_.end()) {
00335       this->send_listeners_.erase(pos);
00336       if (Transport_debug_level > 5) {
00337         GuidConverter converter(local_id);
00338         ACE_DEBUG((LM_DEBUG,
00339                    ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from send_listeners\n"),
00340                    OPENDDS_STRING(converter).c_str()));
00341       }
00342       return;
00343     }
00344   }
00345   {
00346     IdToRecvListenerMap::iterator pos = this->recv_listeners_.find(local_id);
00347     if (pos != this->recv_listeners_.end()) {
00348       this->recv_listeners_.erase(pos);
00349       if (Transport_debug_level > 5) {
00350         GuidConverter converter(local_id);
00351         ACE_DEBUG((LM_DEBUG,
00352                    ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from recv_listeners\n"),
00353                    OPENDDS_STRING(converter).c_str()));
00354       }
00355       return;
00356     }
00357   }
00358 }

ACE_INLINE void OpenDDS::DCPS::DataLink::remove_on_start_callback ( TransportClient client,
const RepoId remote 
)

Definition at line 299 of file DataLink.inl.

References OpenDDS::DCPS::remove(), and strategy_lock_.

00300 {
00301   GuardType guard(strategy_lock_);
00302   on_start_callbacks_.erase(std::remove(on_start_callbacks_.begin(),
00303                                         on_start_callbacks_.end(),
00304                                         std::make_pair(client, remote)),
00305                             on_start_callbacks_.end());
00306 }

ACE_INLINE RemoveResult OpenDDS::DCPS::DataLink::remove_sample ( const DataSampleElement sample  ) 

This method is essentially an "undo_send()" method. It's goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).

Definition at line 180 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::ThreadPerConnectionSendTask::remove_sample(), send_strategy_, thr_per_con_send_task_, and VDBG.

00181 {
00182   DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
00183 
00184   if (this->thr_per_con_send_task_ != 0) {
00185     const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
00186     if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
00187       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00188             "Removed sample from ThreadPerConnection queue.\n"));
00189       return rr;
00190     }
00191   }
00192 
00193   TransportSendStrategy_rch strategy;
00194   {
00195     GuardType guard(this->strategy_lock_);
00196 
00197     strategy = this->send_strategy_;
00198   }
00199 
00200   if (!strategy.is_nil()) {
00201     return strategy->remove_sample(sample);
00202   }
00203 
00204   return REMOVE_NOT_FOUND;
00205 }

void OpenDDS::DCPS::DataLink::resume_send (  ) 

The resume_send is used in the case of reconnection on the subscriber's side.

Definition at line 250 of file DataLink.cpp.

00251 {
00252   if (!this->send_strategy_->isDirectMode())
00253     this->send_strategy_->resume_send();
00254 }

void OpenDDS::DCPS::DataLink::schedule_delayed_release (  ) 

Definition at line 423 of file DataLink.cpp.

References datalink_release_delay_, DBG_ENTRY_LVL, schedule_stop(), and VDBG.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink().

00424 {
00425   DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
00426 
00427   VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
00428 
00429   // The samples have to be removed at this point, otherwise the samples
00430   // can not be delivered when new association is added and still use
00431   // this connection/datalink.
00432   if (!this->send_strategy_.is_nil()) {
00433     this->send_strategy_->clear();
00434   }
00435 
00436   ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_;
00437   this->schedule_stop(future_release_time);
00438 }

void OpenDDS::DCPS::DataLink::schedule_stop ( ACE_Time_Value &  schedule_to_stop_at  ) 

Definition at line 198 of file DataLink.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::DCPS_debug_level, and scheduled_to_stop_at_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and schedule_delayed_release().

00199 {
00200   if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00201     //Add ref before handing to the reactor
00202     //ref removed in handle_exception or in handle_timeout based on stopping now or delayed
00203     this->_add_ref();
00204     this->scheduled_to_stop_at_ = schedule_to_stop_at;
00205     TransportReactorTask_rch reactor(this->impl_->reactor_task());
00206     reactor->get_reactor()->notify(this);
00207     // reactor will invoke our DataLink::handle_exception()
00208   } else {
00209     if (DCPS_debug_level > 0) {
00210       ACE_DEBUG((LM_DEBUG,
00211                  ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
00212     }
00213   }
00214 }

ACE_INLINE void OpenDDS::DCPS::DataLink::send ( TransportQueueElement element  ) 

Definition at line 105 of file DataLink.inl.

References OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request(), customize_queue_element(), DBG_ENTRY_LVL, OpenDDS::DCPS::SEND, send_i(), and thr_per_con_send_task_.

Referenced by send_control().

00106 {
00107   DBG_ENTRY_LVL("DataLink","send",6);
00108 
00109   element = this->customize_queue_element(element);
00110   if (!element) {
00111     return;
00112   }
00113 
00114   if (this->thr_per_con_send_task_ != 0) {
00115     this->thr_per_con_send_task_->add_request(SEND, element);
00116 
00117   } else {
00118     this->send_i(element);
00119 
00120   }
00121 }

SendControlStatus OpenDDS::DCPS::DataLink::send_control ( const DataSampleHeader header,
ACE_Message_Block *  data 
)

This allows a subclass to send transport control samples over this DataLink. This is useful for sending transport-specific control messages between one or more endpoints under this DataLink's control.

Definition at line 503 of file DataLink.cpp.

References OpenDDS::DCPS::TransportSendControlElement::alloc(), DBG_ENTRY_LVL, OpenDDS::DCPS::GUID_UNKNOWN, header, send(), send_control_allocator_, OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::SEND_CONTROL_OK, send_response_listener_, send_start(), send_stop(), and OpenDDS::DCPS::SendResponseListener::track_message().

Referenced by OpenDDS::DCPS::MulticastSession::send_control(), and OpenDDS::DCPS::MulticastDataLink::syn_received_no_session().

00504 {
00505   DBG_ENTRY_LVL("DataLink", "send_control", 6);
00506 
00507   TransportSendControlElement* const elem =
00508     TransportSendControlElement::alloc(1, // initial_count
00509                                        GUID_UNKNOWN, &send_response_listener_,
00510                                        header, message, send_control_allocator_);
00511   if (!elem) return SEND_CONTROL_ERROR;
00512 
00513   send_response_listener_.track_message();
00514 
00515   RepoId senderId(header.publication_id_);
00516   send_start();
00517   send(elem);
00518   send_stop(senderId);
00519 
00520   return SEND_CONTROL_OK;
00521 }

ACE_INLINE void OpenDDS::DCPS::DataLink::send_final_acks ( const RepoId readerid  )  [virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 406 of file DataLink.inl.

00407 { }

ACE_INLINE void OpenDDS::DCPS::DataLink::send_i ( TransportQueueElement element,
bool  relink = true 
) [protected, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpDataLink.

Definition at line 124 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), send(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), and OpenDDS::DCPS::RtpsUdpDataLink::send_i().

00125 {
00126   DBG_ENTRY_LVL("DataLink","send_i",6);
00127   // This one is easy.  Simply delegate to our TransportSendStrategy
00128   // data member.
00129 
00130   TransportSendStrategy_rch strategy;
00131   {
00132     GuardType guard(this->strategy_lock_);
00133 
00134     strategy = this->send_strategy_;
00135   }
00136 
00137   if (!strategy.is_nil()) {
00138     strategy->send(element, relink);
00139   }
00140 }

ACE_INLINE TransportSendListener * OpenDDS::DCPS::DataLink::send_listener_for ( const RepoId pub_id  )  const [private]

Definition at line 362 of file DataLink.inl.

References send_listeners_.

Referenced by clear_associations(), and notify_connection_deleted().

00363 {
00364   // pub_map_ (and send_listeners_) are already locked when entering this
00365   // private method.
00366   IdToSendListenerMap::const_iterator found =
00367     this->send_listeners_.find(pub_id);
00368   if (found == this->send_listeners_.end()) {
00369     return 0;
00370   }
00371   return found->second;
00372 }

ACE_INLINE void OpenDDS::DCPS::DataLink::send_start (  ) 

Called by the TransportClient objects that reference this DataLink. Used by the TransportClient to send a sample, or to send a control message. These functions either give the request to the PerThreadConnectionSendTask when thread_per_connection configuration is true or just simply delegate to the send strategy.

Definition at line 74 of file DataLink.inl.

References OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request(), DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_START, send_start_i(), and thr_per_con_send_task_.

Referenced by send_control().

00075 {
00076   DBG_ENTRY_LVL("DataLink","send_start",6);
00077 
00078   if (this->thr_per_con_send_task_ != 0) {
00079     this->thr_per_con_send_task_->add_request(SEND_START);
00080 
00081   } else
00082     this->send_start_i();
00083 }

ACE_INLINE void OpenDDS::DCPS::DataLink::send_start_i (  )  [protected]

The implementation of the functions that accomplish the sample or control message delivery. IThey just simply delegate to the send strategy.

Definition at line 86 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_start().

00087 {
00088   DBG_ENTRY_LVL("DataLink","send_start_i",6);
00089   // This one is easy.  Simply delegate to our TransportSendStrategy
00090   // data member.
00091 
00092   TransportSendStrategy_rch strategy;
00093   {
00094     GuardType guard(this->strategy_lock_);
00095 
00096     strategy = this->send_strategy_;
00097   }
00098 
00099   if (!strategy.is_nil()) {
00100     strategy->send_start();
00101   }
00102 }

ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop ( RepoId  repoId  ) 

Definition at line 143 of file DataLink.inl.

References OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request(), DBG_ENTRY_LVL, OpenDDS::DCPS::SEND_STOP, send_stop_i(), and thr_per_con_send_task_.

Referenced by send_control().

00144 {
00145   DBG_ENTRY_LVL("DataLink","send_stop",6);
00146 
00147   if (this->thr_per_con_send_task_ != 0) {
00148     this->thr_per_con_send_task_->add_request(SEND_STOP);
00149 
00150   } else
00151     this->send_stop_i(repoId);
00152 }

ACE_INLINE void OpenDDS::DCPS::DataLink::send_stop_i ( RepoId  repoId  )  [protected]

Definition at line 155 of file DataLink.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), and send_strategy_.

Referenced by OpenDDS::DCPS::ThreadPerConnectionSendTask::execute(), and send_stop().

00156 {
00157   DBG_ENTRY_LVL("DataLink","send_stop_i",6);
00158   // This one is easy.  Simply delegate to our TransportSendStrategy
00159   // data member.
00160 
00161   TransportSendStrategy_rch strategy = 0;
00162   {
00163     GuardType guard(this->strategy_lock_);
00164 
00165     strategy = this->send_strategy_;
00166   }
00167 
00168   if (!strategy.is_nil()) {
00169     strategy->send_stop(repoId);
00170   }
00171 }

void OpenDDS::DCPS::DataLink::set_dscp_codepoint ( int  cp,
ACE_SOCK &  socket 
)

Definition at line 964 of file DataLink.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

Referenced by OpenDDS::DCPS::UdpDataLink::open().

00965 {
00966   /**
00967    * The following IPV6 code was lifted in spirit from the RTCORBA
00968    * implementation of setting the DiffServ codepoint.
00969    */
00970   int result = 0;
00971 
00972   // Shift the code point up to bits, so that we only use the DS field
00973   int tos = cp << 2;
00974 
00975   const char* which = "IPV4 TOS";
00976 #if defined (ACE_HAS_IPV6)
00977   ACE_INET_Addr local_address;
00978 
00979   if (socket.get_local_addr(local_address) == -1) {
00980     return;
00981 
00982   } else if (local_address.get_type() == AF_INET6)
00983 #if !defined (IPV6_TCLASS)
00984   {
00985     if (DCPS_debug_level > 0) {
00986       ACE_ERROR((LM_ERROR,
00987                  ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
00988                  ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
00989                  cp));
00990     }
00991 
00992     return;
00993   }
00994 
00995 #else /* IPV6_TCLASS */
00996   {
00997     which = "IPV6 TCLASS";
00998     result = socket.set_option(
00999                IPPROTO_IPV6,
01000                IPV6_TCLASS,
01001                &tos,
01002                sizeof(tos));
01003 
01004   } else // This is a bit tricky and might be hard to follow...
01005 
01006 #endif /* IPV6_TCLASS */
01007 #endif /* ACE_HAS_IPV6 */
01008 
01009 #ifdef IP_TOS
01010   result = socket.set_option(
01011              IPPROTO_IP,
01012              IP_TOS,
01013              &tos,
01014              sizeof(tos));
01015 
01016   if ((result == -1) && (errno != ENOTSUP)
01017 #ifdef WSAEINVAL
01018       && (errno != WSAEINVAL)
01019 #endif
01020      ) {
01021 #endif // IP_TOS
01022     ACE_DEBUG((LM_DEBUG,
01023                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01024                ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
01025                ACE_TEXT("try running as superuser.\n"),
01026                which,
01027                cp));
01028 #ifdef IP_TOS
01029   } else if (DCPS_debug_level > 4) {
01030     ACE_DEBUG((LM_DEBUG,
01031                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01032                ACE_TEXT("set %C codepoint to %d.\n"),
01033                which,
01034                cp));
01035   }
01036 #endif
01037 }

ACE_INLINE void OpenDDS::DCPS::DataLink::set_scheduling_release ( bool  scheduling_release  ) 

Definition at line 174 of file DataLink.inl.

References scheduling_release_.

Referenced by cancel_release(), OpenDDS::DCPS::TcpTransport::release_datalink(), and transport_shutdown().

00175 {
00176   this->scheduling_release_ = scheduling_release;
00177 }

ACE_INLINE int OpenDDS::DCPS::DataLink::start ( const TransportSendStrategy_rch send_strategy,
const TransportStrategy_rch receive_strategy 
) [protected]

This is how the subclass "announces" to this DataLink base class that this DataLink has now been "connected" and should start the supplied strategy objects. This start method is also going to keep a "copy" of the references to the strategy objects. Also note that it is acceptable to pass-in a NULL (0) TransportReceiveStrategy*, but it is assumed that the TransportSendStrategy* argument is not NULL.

If the start() method fails to start either strategy, then a -1 is returned. Otherwise, a 0 is returned. In the failure case, if one of the strategy objects was started successfully, then it will be stopped before the start() method returns -1.

Definition at line 235 of file DataLink.inl.

References DBG_ENTRY_LVL, invoke_on_start_callbacks(), OpenDDS::DCPS::RcHandle< T >::is_nil(), receive_strategy_, send_strategy_, and started_.

Referenced by OpenDDS::DCPS::MulticastDataLink::join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::ShmemDataLink::open(), and OpenDDS::DCPS::RtpsUdpDataLink::open().

00237 {
00238   DBG_ENTRY_LVL("DataLink","start",6);
00239 
00240   // We assume that the send_strategy is not NULL, but the receive_strategy
00241   // is allowed to be NULL.
00242 
00243   // Attempt to start the strategies, and if there is a start() failure,
00244   // make sure to stop() any strategy that was already start()'ed.
00245   if (send_strategy->start() != 0) {
00246     // Failed to start the TransportSendStrategy.
00247     invoke_on_start_callbacks(false);
00248     return -1;
00249   }
00250 
00251   if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
00252     // Failed to start the TransportReceiveStrategy.
00253 
00254     // Remember to stop() the TransportSendStrategy since we did start it,
00255     // and now need to "undo" that action.
00256     send_strategy->stop();
00257     invoke_on_start_callbacks(false);
00258     return -1;
00259   }
00260 
00261   // We started both strategy objects.  Save them to data members since
00262   // we will now take ownership of them.
00263   {
00264     GuardType guard(this->strategy_lock_);
00265 
00266     this->send_strategy_    = send_strategy;
00267     this->receive_strategy_ = receive_strategy;
00268   }
00269   invoke_on_start_callbacks(true);
00270   {
00271     //catch any associations added during initial invoke_on_start_callbacks
00272     //only after first use_datalink has resolved does datalink's state truly
00273     //change to started, thus can't let pending associations proceed normally yet
00274     GuardType guard(this->strategy_lock_);
00275     this->started_ = true;
00276   }
00277   //Now state transitioned to started so no new on_start_callbacks will be added
00278   //so resolve any added during transition to started.
00279   invoke_on_start_callbacks(true);
00280   return 0;
00281 }

void OpenDDS::DCPS::DataLink::stop (  ) 

The stop method is used to stop the DataLink prior to shutdown.

Definition at line 217 of file DataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::is_nil(), pre_stop_i(), receive_strategy_, scheduled_to_stop_at_, send_strategy_, stop_i(), and stopped_.

Referenced by handle_exception(), OpenDDS::DCPS::UdpTransport::release_datalink(), OpenDDS::DCPS::ShmemTransport::release_datalink(), and transport_shutdown().

00218 {
00219   this->pre_stop_i();
00220 
00221   TransportSendStrategy_rch send_strategy;
00222   TransportStrategy_rch recv_strategy;
00223 
00224   {
00225     GuardType guard(this->strategy_lock_);
00226 
00227     if (this->stopped_) return;
00228 
00229     send_strategy = this->send_strategy_;
00230     this->send_strategy_ = 0;
00231 
00232     recv_strategy = this->receive_strategy_;
00233     this->receive_strategy_ = 0;
00234   }
00235 
00236   if (!send_strategy.is_nil()) {
00237     send_strategy->stop();
00238   }
00239 
00240   if (!recv_strategy.is_nil()) {
00241     recv_strategy->stop();
00242   }
00243 
00244   this->stop_i();
00245   this->stopped_ = true;
00246   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00247 }

void OpenDDS::DCPS::DataLink::stop_i (  )  [protected, virtual]

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented in OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::ShmemDataLink, OpenDDS::DCPS::TcpDataLink, and OpenDDS::DCPS::UdpDataLink.

Definition at line 463 of file DataLink.cpp.

References DBG_ENTRY_LVL.

Referenced by stop().

00464 {
00465   DBG_ENTRY_LVL("DataLink", "stop_i", 6);
00466 }

GUIDSeq * OpenDDS::DCPS::DataLink::target_intersection ( const RepoId pub_id,
const GUIDSeq in,
size_t &  n_subs 
)

For a given publication "pub_id", store the total number of corresponding subscriptions in "n_subs" and given a set of subscriptions (the "in" sequence), return the subset of the input set "in" which are targets of this DataLink (see is_target()).

Definition at line 876 of file DataLink.cpp.

References assoc_by_local_, and OpenDDS::DCPS::push_back().

00878 {
00879   GUIDSeq_var res;
00880   GuardType guard(this->pub_sub_maps_lock_);
00881   AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
00882 
00883   if (iter != assoc_by_local_.end()) {
00884     n_subs = iter->second.size();
00885     const CORBA::ULong len = in.length();
00886 
00887     for (CORBA::ULong i(0); i < len; ++i) {
00888       if (iter->second.count(in[i])) {
00889         if (res.ptr() == 0) {
00890           res = new GUIDSeq;
00891         }
00892 
00893         push_back(res.inout(), in[i]);
00894       }
00895     }
00896   }
00897 
00898   return res._retn();
00899 }

ACE_INLINE void OpenDDS::DCPS::DataLink::terminate_send (  ) 

Definition at line 322 of file DataLink.inl.

References send_strategy_.

00323 {
00324   this->send_strategy_->terminate_send(false);
00325 }

ACE_INLINE Priority OpenDDS::DCPS::DataLink::transport_priority (  )  const

Definition at line 34 of file DataLink.inl.

References transport_priority_.

00035 {
00036   return this->transport_priority_;
00037 }

ACE_INLINE Priority & OpenDDS::DCPS::DataLink::transport_priority (  ) 

Accessors for the TRANSPORT_PRIORITY value associated with this link.

Definition at line 27 of file DataLink.inl.

References transport_priority_.

Referenced by OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00028 {
00029   return this->transport_priority_;
00030 }

void OpenDDS::DCPS::DataLink::transport_shutdown (  ) 

Our TransportImpl will inform us if it is being shutdown() by calling this method.

Definition at line 661 of file DataLink.cpp.

References DBG_ENTRY_LVL, impl_, scheduled_to_stop_at_, set_scheduling_release(), and stop().

00662 {
00663   DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
00664 
00665   if (!this->send_strategy_.is_nil()) {
00666     this->send_strategy_->transport_shutdown();
00667   }
00668 
00669   //this->cancel_release();
00670   this->set_scheduling_release(false);
00671   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00672   ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00673   reactor->cancel_timer(this);
00674 
00675   this->stop();
00676 
00677   // Drop our reference to the TransportImpl object
00678   this->impl_ = 0;
00679 }


Friends And Related Function Documentation

friend class DataLinkCleanupTask [friend]

Definition at line 66 of file DataLink.h.

OpenDDS_Dcps_Export std::ostream& operator<< ( std::ostream &  str,
const DataLink value 
) [friend]

Convenience function for diagnostic information.

Definition at line 1041 of file DataLink.cpp.

01042 {
01043   str << "   There are " << value.assoc_by_local_.size()
01044       << " local entities currently using this link comprising following associations:"
01045       << std::endl;
01046 
01047   for (DataLink::AssocByLocal::const_iterator
01048        localId = value.assoc_by_local_.begin();
01049        localId != value.assoc_by_local_.end();
01050        ++localId) {
01051     for (RepoIdSet::const_iterator
01052          remoteId = localId->second.begin();
01053          remoteId != localId->second.end();
01054          ++remoteId) {
01055       str << GuidConverter(localId->first) << " --> "
01056           << GuidConverter(*remoteId) << "   " << std::endl;
01057     }
01058   }
01059   return str;
01060 }

friend class ThreadPerConnectionSendTask [friend]

Definition at line 288 of file DataLink.h.

Referenced by DataLink().


Member Data Documentation

AssocByLocal OpenDDS::DCPS::DataLink::assoc_by_local_ [private]

Definition at line 363 of file DataLink.h.

Referenced by handle_timeout(), make_reservation(), notify(), OpenDDS::DCPS::operator<<(), peer_ids(), prepare_release(), release_reservations(), target_intersection(), and ~DataLink().

AssocByRemote OpenDDS::DCPS::DataLink::assoc_by_remote_ [private]

Definition at line 360 of file DataLink.h.

Referenced by data_received_i(), handle_timeout(), is_target(), make_reservation(), and release_reservations().

AssocByLocal OpenDDS::DCPS::DataLink::assoc_releasing_ [private]

Definition at line 380 of file DataLink.h.

Referenced by clear_associations(), and prepare_release().

ACE_Time_Value OpenDDS::DCPS::DataLink::datalink_release_delay_ [protected]

Configurable delay in milliseconds that the datalink should be released after all associations are removed.

Definition at line 399 of file DataLink.h.

Referenced by DataLink(), datalink_release_delay(), and schedule_delayed_release().

DataBlockAllocator* OpenDDS::DCPS::DataLink::db_allocator_ [protected]

Definition at line 408 of file DataLink.h.

Referenced by create_control(), DataLink(), and ~DataLink().

TransportReceiveListener* OpenDDS::DCPS::DataLink::default_listener_ [private]

If default_listener_ is not null and this DataLink receives a sample from a publication GUID that's not in pub_map_, it will call data_received() on the default_listener_.

Definition at line 355 of file DataLink.h.

Referenced by data_received_i(), and default_listener().

ACE_UINT64 OpenDDS::DCPS::DataLink::id_ [private]

The id for this DataLink.

Definition at line 372 of file DataLink.h.

Referenced by DataLink(), and id().

TransportImpl_rch OpenDDS::DCPS::DataLink::impl_ [private]

A (smart) pointer to the TransportImpl that created this DataLink.

Definition at line 369 of file DataLink.h.

Referenced by DataLink(), handle_exception(), handle_timeout(), impl(), release_reservations(), release_resources(), and transport_shutdown().

bool OpenDDS::DCPS::DataLink::is_active_ [protected]

Is pub or sub ?

Definition at line 413 of file DataLink.h.

Referenced by is_active().

bool OpenDDS::DCPS::DataLink::is_loopback_ [protected]

Is remote attached to same transport ?

Definition at line 411 of file DataLink.h.

Referenced by is_loopback(), and OpenDDS::DCPS::UdpDataLink::open().

MessageBlockAllocator* OpenDDS::DCPS::DataLink::mb_allocator_ [protected]

Allocators for data and message blocks used by transport control samples when send_control is called.

Definition at line 407 of file DataLink.h.

Referenced by create_control(), DataLink(), and ~DataLink().

LockType OpenDDS::DCPS::DataLink::pub_sub_maps_lock_ [mutable, private]

Definition at line 357 of file DataLink.h.

Referenced by make_reservation(), and peer_ids().

TransportStrategy_rch OpenDDS::DCPS::DataLink::receive_strategy_ [protected]

The transport receive strategy object for this DataLink.

Definition at line 286 of file DataLink.h.

Referenced by OpenDDS::DCPS::TcpDataLink::pre_stop_i(), OpenDDS::DCPS::TcpDataLink::reconnect(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), start(), and stop().

IdToRecvListenerMap OpenDDS::DCPS::DataLink::recv_listeners_ [private]

Definition at line 350 of file DataLink.h.

Referenced by make_reservation(), notify(), recv_listener_for(), and remove_listener().

AssocByLocal OpenDDS::DCPS::DataLink::released_assoc_by_local_ [private]

Definition at line 366 of file DataLink.h.

Referenced by notify_connection_deleted(), and release_reservations().

LockType OpenDDS::DCPS::DataLink::released_assoc_by_local_lock_ [mutable, private]

Definition at line 365 of file DataLink.h.

ACE_Time_Value OpenDDS::DCPS::DataLink::scheduled_to_stop_at_ [private]

Definition at line 342 of file DataLink.h.

Referenced by cancel_release(), handle_exception(), schedule_stop(), stop(), and transport_shutdown().

bool OpenDDS::DCPS::DataLink::scheduling_release_ [private]

Definition at line 385 of file DataLink.h.

Referenced by cancel_release(), and set_scheduling_release().

TransportSendControlElementAllocator* OpenDDS::DCPS::DataLink::send_control_allocator_ [protected]

Allocator for TransportSendControlElements created when send_control is called.

Definition at line 403 of file DataLink.h.

Referenced by DataLink(), send_control(), and ~DataLink().

IdToSendListenerMap OpenDDS::DCPS::DataLink::send_listeners_ [private]

Definition at line 346 of file DataLink.h.

Referenced by make_reservation(), notify(), remove_listener(), and send_listener_for().

SendResponseListener OpenDDS::DCPS::DataLink::send_response_listener_ [protected]

Listener for TransportSendControlElements created in send_control.

Definition at line 417 of file DataLink.h.

Referenced by send_control().

TransportSendStrategy_rch OpenDDS::DCPS::DataLink::send_strategy_ [protected]

The transport send strategy object for this DataLink.

Reimplemented in OpenDDS::DCPS::MulticastDataLink, OpenDDS::DCPS::RtpsUdpDataLink, OpenDDS::DCPS::ShmemDataLink, and OpenDDS::DCPS::UdpDataLink.

Definition at line 392 of file DataLink.h.

Referenced by add_on_start_callback(), make_reservation(), OpenDDS::DCPS::TcpDataLink::reconnect(), remove_all_msgs(), remove_sample(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), send_i(), send_start_i(), send_stop_i(), start(), stop(), and terminate_send().

bool OpenDDS::DCPS::DataLink::started_ [protected]

Definition at line 414 of file DataLink.h.

Referenced by add_on_start_callback(), and start().

bool OpenDDS::DCPS::DataLink::stopped_ [private]

A boolean indicating if the DataLink has been stopped. This value is protected by the strategy_lock_.

Definition at line 341 of file DataLink.h.

Referenced by cancel_release(), and stop().

LockType OpenDDS::DCPS::DataLink::strategy_lock_ [protected]

Definition at line 394 of file DataLink.h.

Referenced by add_on_start_callback(), invoke_on_start_callbacks(), make_reservation(), and remove_on_start_callback().

ThreadPerConnectionSendTask* OpenDDS::DCPS::DataLink::thr_per_con_send_task_ [private]

The task used to do the sending. This ThreadPerConnectionSendTask object is created when the thread_per_connection configuration is true. It only dedicate to this datalink.

Definition at line 377 of file DataLink.h.

Referenced by DataLink(), pre_stop_i(), remove_sample(), send(), send_start(), send_stop(), and ~DataLink().

Priority OpenDDS::DCPS::DataLink::transport_priority_ [private]

TRANSPORT_PRIORITY value associated with the link.

Definition at line 383 of file DataLink.h.

Referenced by transport_priority().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:26 2016 for OpenDDS by  doxygen 1.4.7