OpenDDS::DCPS::DataLinkSet Class Reference

#include <DataLinkSet.h>

Inheritance diagram for OpenDDS::DCPS::DataLinkSet:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataLinkSet:

Collaboration graph
[legend]
List of all members.

Public Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType

Public Member Functions

 DataLinkSet ()
virtual ~DataLinkSet ()
int insert_link (DataLink *link)
void remove_link (const DataLink_rch &link)
void send (DataSampleElement *sample)
 Send to each DataLink in the set.
void send_control (DataSampleElement *sample)
 Send a control message that is wrapped in a DataSampleElement.
SendControlStatus send_control (RepoId pub_id, TransportSendListener *listener, const DataSampleHeader &header, ACE_Message_Block *msg, TransportSendControlElementAllocator *allocator=0)
 Send control message to each DataLink in the set.
void send_response (RepoId sub_id, const DataSampleHeader &header, ACE_Message_Block *response)
bool remove_sample (const DataSampleElement *sample)
bool remove_all_msgs (RepoId pub_id)
void send_start (DataLinkSet *link_set)
void send_stop (RepoId repoId)
DataLinkSet * select_links (const RepoId *remoteIds, const CORBA::ULong num_targets)
bool empty ()
void send_final_acks (const RepoId &readerid)
typedef OPENDDS_MAP (DataLinkIdType, DataLink_rch) MapType
LockType & lock ()
 Accessors for external iteration.
MapType & map ()
TransportSendControlElementAllocator & tsce_allocator ()

Private Member Functions

void copy_map_to (MapType &target)
 lock and copy map for lock-free access

Private Attributes

MapType map_
 Hash map for DataLinks.
TransportSendControlElementAllocator send_control_element_allocator_
 Allocator for TransportSendControlElement.
LockType lock_
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_response.

Detailed Description

Definition at line 27 of file DataLinkSet.h.


Member Typedef Documentation

typedef ACE_Guard<LockType> OpenDDS::DCPS::DataLinkSet::GuardType

Definition at line 76 of file DataLinkSet.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLinkSet::LockType

Definition at line 75 of file DataLinkSet.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataLinkSet::DataLinkSet (  ) 

Definition at line 30 of file DataLinkSet.cpp.

References DBG_ENTRY_LVL, NUM_SEND_CONTROL_ELEMENT_CHUNKS, send_control_element_allocator_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by select_links().

00031   : send_control_element_allocator_(NUM_SEND_CONTROL_ELEMENT_CHUNKS)
00032   , send_response_listener_("DataLinkSet")
00033 {
00034   DBG_ENTRY_LVL("DataLinkSet","DataLinkSet",6);
00035 
00036   if (OpenDDS::DCPS::Transport_debug_level > 3) {
00037     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLinkSet send_control_element_allocator %x with %d chunks\n",
00038                &send_control_element_allocator_, NUM_SEND_CONTROL_ELEMENT_CHUNKS));
00039   }
00040 }

OpenDDS::DCPS::DataLinkSet::~DataLinkSet (  )  [virtual]

Definition at line 42 of file DataLinkSet.cpp.

References DBG_ENTRY_LVL.

00043 {
00044   DBG_ENTRY_LVL("DataLinkSet","~DataLinkSet",6);
00045 }


Member Function Documentation

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::copy_map_to ( MapType &  target  )  [private]

lock and copy map for lock-free access

Definition at line 238 of file DataLinkSet.inl.

References map_.

Referenced by send_control().

00239 {
00240   target.clear();
00241 
00242   // Lock the existing map
00243   GuardType guard(this->lock_);
00244 
00245   // Copy to target
00246   for (MapType::iterator itr = map_.begin();
00247        itr != map_.end();
00248        ++itr) {
00249     target.insert(*itr);
00250   }
00251 }

bool OpenDDS::DCPS::DataLinkSet::empty (  ) 

Definition at line 94 of file DataLinkSet.cpp.

References map_.

00095 {
00096   GuardType guard(this->lock_);
00097 
00098   return map_.empty();
00099 }

int OpenDDS::DCPS::DataLinkSet::insert_link ( DataLink *  link  )

Definition at line 48 of file DataLinkSet.cpp.

References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, and map_.

Referenced by OpenDDS::DCPS::TransportClient::add_link(), OpenDDS::DCPS::TransportClient::send_control_to(), and OpenDDS::DCPS::TransportClient::send_response().

00049 {
00050   DBG_ENTRY_LVL("DataLinkSet","insert_link",6);
00051   DataLink_rch mylink(link, false);
00052   GuardType guard(this->lock_);
00053   return OpenDDS::DCPS::bind(map_, mylink->id(), mylink);
00054 }

LockType& OpenDDS::DCPS::DataLinkSet::lock (  )  [inline]

Accessors for external iteration.

Definition at line 82 of file DataLinkSet.h.

00082 { return lock_; }

MapType& OpenDDS::DCPS::DataLinkSet::map (  )  [inline]

Definition at line 83 of file DataLinkSet.h.

Referenced by OpenDDS::DCPS::TransportClient::transport_detached(), and OpenDDS::DCPS::TransportClient::~TransportClient().

00083 { return map_; }

typedef OPENDDS_MAP(DataLinkIdType, DataLink_rch) OpenDDS::DCPS::DataLinkSet::MapType

ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_all_msgs ( RepoId  pub_id  ) 

Definition at line 181 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, and map_.

Referenced by OpenDDS::DCPS::TransportClient::remove_all_msgs().

00182 {
00183   DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
00184   GuardType guard(this->lock_);
00185   const MapType::iterator end = this->map_.end();
00186   for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00187     itr->second->remove_all_msgs(pub_id);
00188   }
00189 
00190   return true;
00191 }

void OpenDDS::DCPS::DataLinkSet::remove_link ( const DataLink_rch &  link  )

Definition at line 57 of file DataLinkSet.cpp.

References DBG_ENTRY_LVL, map_, OpenDDS::DCPS::unbind(), and VDBG.

Referenced by OpenDDS::DCPS::TransportClient::disassociate().

00058 {
00059   DBG_ENTRY_LVL("DataLinkSet", "remove_link", 6);
00060   GuardType guard1(this->lock_);
00061   if (unbind(map_, link->id()) != 0) {
00062     // Just report to the log that we tried.
00063     VDBG((LM_DEBUG,
00064           ACE_TEXT("(%P|%t) DataLinkSet::remove_links: ")
00065           ACE_TEXT("link_id %d not found in map.\n"),
00066           link->id()));
00067   }
00068 }

ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_sample ( const DataSampleElement *  sample  )

Definition at line 165 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, map_, and OpenDDS::DCPS::REMOVE_RELEASED.

Referenced by OpenDDS::DCPS::TransportClient::remove_sample().

00166 {
00167   DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
00168   GuardType guard(this->lock_);
00169   const MapType::iterator end = this->map_.end();
00170   for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00171 
00172     if (itr->second->remove_sample(sample) == REMOVE_RELEASED) {
00173       return true;
00174     }
00175   }
00176 
00177   return false;
00178 }

OpenDDS::DCPS::DataLinkSet * OpenDDS::DCPS::DataLinkSet::select_links ( const RepoId *  remoteIds,
const CORBA::ULong  num_targets 
)

Definition at line 71 of file DataLinkSet.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::bind(), DataLinkSet(), DBG_ENTRY_LVL, and map_.

Referenced by OpenDDS::DCPS::TransportClient::send_i().

00073 {
00074   DBG_ENTRY_LVL("DataLinkSet","select_links",6);
00075 
00076   DataLinkSet_rch selected_links = new DataLinkSet();
00077   GuardType guard(this->lock_);
00078   for (MapType::iterator itr = map_.begin();
00079        itr != map_.end();
00080        ++itr) {
00081     for (CORBA::ULong i = 0; i < num_targets; ++i) {
00082       if (itr->second->is_target(remoteIds[i])) {
00083         OpenDDS::DCPS::bind(selected_links->map_,
00084              itr->second->id(), itr->second);
00085         break;
00086       }
00087     }
00088   }
00089 
00090   return selected_links._retn();
00091 }

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send ( DataSampleElement *  sample  )

Send to each DataLink in the set.

Definition at line 22 of file DataLinkSet.inl.

References OpenDDS::DCPS::DataSampleHeader::add_cfentries(), OpenDDS::DCPS::TransportCustomizedElement::alloc(), OpenDDS::DCPS::TransportSendElement::alloc(), OpenDDS::DCPS::CONTENT_FILTER_FLAG, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_filter_per_link(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::DataSampleElement::get_transport_customized_element_allocator(), map_, OpenDDS::DCPS::TransportCustomizedElement::set_msg(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and VDBG_LVL.

00023 {
00024   DBG_ENTRY_LVL("DataLinkSet", "send", 6);
00025   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
00026             sample), 5);
00027   GuardType guard(this->lock_);
00028   TransportSendElement* send_element =
00029     TransportSendElement::alloc(static_cast<int>(map_.size()), sample);
00030 
00031 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00032   const bool customHeader =
00033     DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
00034 #endif
00035 
00036   for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00037 
00038 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00039     if (customHeader) {
00040       typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
00041       FilterIter fi = sample->get_filter_per_link().find(itr->first);
00042       GUIDSeq* guids = 0;
00043       if (fi != sample->get_filter_per_link().end()) {
00044         guids = fi->second.ptr();
00045       }
00046 
00047       VDBG_LVL((LM_DEBUG,
00048         "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
00049         itr->second.in(), guids ? guids->length() : 0), 5);
00050 
00051       ACE_Message_Block* mb = sample->get_sample()->duplicate();
00052 
00053       DataSampleHeader::add_cfentries(guids, mb);
00054 
00055       TransportCustomizedElement* tce =
00056         TransportCustomizedElement::alloc(send_element, false,
00057           sample->get_transport_customized_element_allocator());
00058       tce->set_msg(mb); // tce now owns ACE_Message_Block chain
00059 
00060       itr->second->send(tce);
00061 
00062     } else {
00063 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00064 
00065       // Tell the DataLink to send it.
00066       itr->second->send(send_element);
00067 
00068 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00069     }
00070 #endif
00071   }
00072 }

ACE_INLINE OpenDDS::DCPS::SendControlStatus OpenDDS::DCPS::DataLinkSet::send_control ( RepoId  pub_id,
TransportSendListener *  listener,
const DataSampleHeader &  header,
ACE_Message_Block *  msg,
TransportSendControlElementAllocator *  allocator = 0 
)

Send control message to each DataLink in the set.

Definition at line 94 of file DataLinkSet.inl.

References OpenDDS::DCPS::TransportSendControlElement::alloc(), OpenDDS::DCPS::TransportSendListener::control_delivered(), copy_map_to(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, header, OPENDDS_STRING, send_control_element_allocator_, OpenDDS::DCPS::SEND_CONTROL_ERROR, and OpenDDS::DCPS::SEND_CONTROL_OK.

00099 {
00100   DBG_ENTRY_LVL("DataLinkSet","send_control",6);
00101   //Optimized - use cached allocator.
00102 
00103   MapType dup_map;
00104   copy_map_to(dup_map);
00105 
00106   if (dup_map.empty()) {
00107     // similar to the "no links" case in TransportClient::send()
00108     if (DCPS_debug_level > 4) {
00109       const GuidConverter converter(pub_id);
00110       ACE_DEBUG((LM_DEBUG,
00111                  ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
00112                  ACE_TEXT("no links for publication %C, ")
00113                  ACE_TEXT("not sending control message.\n"),
00114                  OPENDDS_STRING(converter).c_str()));
00115     }
00116     listener->control_delivered(msg);
00117     return SEND_CONTROL_OK;
00118   }
00119 
00120   TransportSendControlElementAllocator& use_alloc =
00121     allocator ? *allocator : send_control_element_allocator_;
00122 
00123   TransportSendControlElement* const send_element =
00124     TransportSendControlElement::alloc(static_cast<int>(dup_map.size()), pub_id,
00125                                        listener, header, msg, &use_alloc);
00126   if (!send_element) return SEND_CONTROL_ERROR;
00127 
00128   for (MapType::iterator itr = dup_map.begin();
00129        itr != dup_map.end();
00130        ++itr) {
00131     itr->second->send_start();
00132     itr->second->send(send_element);
00133     itr->second->send_stop(pub_id);
00134   }
00135 
00136   return SEND_CONTROL_OK;
00137 }

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_control ( DataSampleElement *  sample  )

Send a control message that is wrapped in a DataSampleElement.

Definition at line 75 of file DataLinkSet.inl.

References OpenDDS::DCPS::TransportSendControlElement::alloc(), DBG_ENTRY_LVL, map_, send_control_element_allocator_, and VDBG.

Referenced by OpenDDS::DCPS::TransportClient::send_control(), and OpenDDS::DCPS::TransportClient::send_control_to().

00076 {
00077   DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
00078   VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
00079   GuardType guard(this->lock_);
00080   TransportSendControlElement* send_element =
00081     TransportSendControlElement::alloc(static_cast<int>(map_.size()), sample,
00082                                        &send_control_element_allocator_);
00083   if (!send_element) {
00084     ACE_ERROR((LM_ERROR, "(%P|%t) DataLinkSet::send_control allocation of "
00085       "TransportSendControlElement failed\n"));
00086     return;
00087   }
00088   for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00089     itr->second->send(send_element);
00090   }
00091 }

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_final_acks ( const RepoId &  readerid  )

Definition at line 254 of file DataLinkSet.inl.

References map_.

Referenced by OpenDDS::DCPS::TransportClient::send_final_acks().

00255 {
00256   GuardType guard(this->lock_);
00257   for (MapType::iterator itr = map_.begin();
00258        itr != map_.end();
00259        ++itr) {
00260     itr->second->send_final_acks(readerid);
00261   }
00262 
00263   map_.clear();
00264 }

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_response ( RepoId  sub_id,
const DataSampleHeader &  header,
ACE_Message_Block *  response 
)

Definition at line 140 of file DataLinkSet.inl.

References OpenDDS::DCPS::TransportSendControlElement::alloc(), DBG_ENTRY_LVL, header, map_, send_control_element_allocator_, send_response_listener_, and OpenDDS::DCPS::SendResponseListener::track_message().

Referenced by OpenDDS::DCPS::TransportClient::send_response().

00144 {
00145   DBG_ENTRY_LVL("DataLinkSet","send_response",6);
00146   GuardType guard(this->lock_);
00147 
00148   TransportSendControlElement* const send_element =
00149     TransportSendControlElement::alloc(static_cast<int>(map_.size()), pub_id,
00150                                        &send_response_listener_, header,
00151                                        response, &send_control_element_allocator_);
00152   if (!send_element) return;
00153   send_response_listener_.track_message();
00154 
00155   for (MapType::iterator itr = map_.begin();
00156        itr != map_.end();
00157        ++itr) {
00158     itr->second->send_start();
00159     itr->second->send(send_element);
00160     itr->second->send_stop(pub_id);
00161   }
00162 }

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_start ( DataLinkSet *  link_set  )

Calls send_start() on the links in link_set and also adds the links from link_set to *this.

Definition at line 194 of file DataLinkSet.inl.

References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, lock_, and map_.

Referenced by OpenDDS::DCPS::TransportClient::send_i().

00195 {
00196   DBG_ENTRY_LVL("DataLinkSet","send_start",6);
00197   GuardType guard1(this->lock_);
00198   GuardType guard2(link_set->lock_);
00199   for (MapType::iterator itr = link_set->map_.begin();
00200        itr != link_set->map_.end();
00201        ++itr) {
00202     // Attempt to add the current DataLink to this set.
00203     int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
00204 
00205     if (result == 0) {
00206       // We successfully added the current DataLink to this set,
00207       // meaning that it wasn't already a member.  We should tell
00208       // the DataLink about the send_start() event.
00209       itr->second->send_start();
00210 
00211     } else if (result == -1) {
00212       ACE_ERROR((LM_ERROR,
00213                  "(%P|%t) ERROR: Failed to bind data link into set.\n"));
00214     }
00215 
00216     // Note that there is a possibility that the result == 1, which
00217     // means that the DataLink already exists in our map_->  We skip
00218     // all of these cases.
00219   }
00220 }

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_stop ( RepoId  repoId  ) 

Calls send_stop(repoId) on every link in the set and then clears the set.

Definition at line 223 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, and map_.

Referenced by OpenDDS::DCPS::TransportClient::send_i().

00224 {
00225   DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
00226   // Iterate over our map_ and tell each DataLink about the send_stop() event.
00227   GuardType guard(this->lock_);
00228   for (MapType::iterator itr = map_.begin();
00229        itr != map_.end();
00230        ++itr) {
00231     itr->second->send_stop(repoId);
00232   }
00233 
00234   map_.clear();
00235 }

TransportSendControlElementAllocator& OpenDDS::DCPS::DataLinkSet::tsce_allocator (  )  [inline]

Definition at line 86 of file DataLinkSet.h.

Referenced by OpenDDS::DCPS::TransportClient::send_control_to().

00086                                                          {
00087     return send_control_element_allocator_;
00088   }


Member Data Documentation

LockType OpenDDS::DCPS::DataLinkSet::lock_ [private]

This lock will protect critical sections of code that play a role in the sending of data.

Definition at line 100 of file DataLinkSet.h.

Referenced by send_start().

MapType OpenDDS::DCPS::DataLinkSet::map_ [private]

Hash map for DataLinks.

Definition at line 93 of file DataLinkSet.h.

Referenced by copy_map_to(), empty(), insert_link(), remove_all_msgs(), remove_link(), remove_sample(), select_links(), send(), send_control(), send_final_acks(), send_response(), send_start(), and send_stop().

TransportSendControlElementAllocator OpenDDS::DCPS::DataLinkSet::send_control_element_allocator_ [private]

Allocator for TransportSendControlElement.

Definition at line 96 of file DataLinkSet.h.

Referenced by DataLinkSet(), send_control(), and send_response().

SendResponseListener OpenDDS::DCPS::DataLinkSet::send_response_listener_ [private]

Listener for TransportSendControlElements created in send_response.

Definition at line 103 of file DataLinkSet.h.

Referenced by send_response().


The documentation for this class was generated from the following files:
DataLinkSet.h
DataLinkSet.cpp
DataLinkSet.inl
Generated on Fri Feb 12 20:06:26 2016 for OpenDDS by  doxygen 1.4.7