OpenDDS::DCPS::DataLinkSet Class Reference

#include <DataLinkSet.h>


Public Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType

Public Member Functions

 DataLinkSet ()
virtual ~DataLinkSet ()
int insert_link (const DataLink_rch &link)
void remove_link (const DataLink_rch &link)
void send (DataSampleElement *sample)
 Send to each DataLink in the set.
void send_control (DataSampleElement *sample)
 Send a control message that is wrapped in a DataSampleElement.
SendControlStatus send_control (RepoId pub_id, const TransportSendListener_rch &listener, const DataSampleHeader &header, Message_Block_Ptr msg)
 Send control message to each DataLink in the set.
void send_response (RepoId sub_id, const DataSampleHeader &header, Message_Block_Ptr response)
bool remove_sample (const DataSampleElement *sample)
bool remove_all_msgs (RepoId pub_id)
void send_start (DataLinkSet *link_set)
void send_stop (RepoId repoId)
DataLinkSet_rch select_links (const RepoId *remoteIds, const CORBA::ULong num_targets)
bool empty ()
void send_final_acks (const RepoId &readerid)
typedef OPENDDS_MAP (DataLinkIdType, DataLink_rch) MapType
LockType & lock ()
 Accessors for external iteration.
MapType & map ()

Private Member Functions

void copy_map_to (MapType &target)
 Lock the set and copy its map so that the copy can be iterated without holding the lock.

Private Attributes

MapType map_
 Hash map for DataLinks.
LockType lock_
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_response.

Detailed Description

Definition at line 30 of file DataLinkSet.h.


Member Typedef Documentation

typedef ACE_Guard<LockType> OpenDDS::DCPS::DataLinkSet::GuardType

Definition at line 78 of file DataLinkSet.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLinkSet::LockType

Definition at line 77 of file DataLinkSet.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataLinkSet::DataLinkSet (  ) 

Definition at line 30 of file DataLinkSet.cpp.

References DBG_ENTRY_LVL.

  : send_response_listener_("DataLinkSet")
{
  DBG_ENTRY_LVL("DataLinkSet","DataLinkSet",6);
}

OpenDDS::DCPS::DataLinkSet::~DataLinkSet (  )  [virtual]

Definition at line 36 of file DataLinkSet.cpp.

References DBG_ENTRY_LVL.

{
  DBG_ENTRY_LVL("DataLinkSet","~DataLinkSet",6);
}


Member Function Documentation

ACE_INLINE void OpenDDS::DCPS::DataLinkSet::copy_map_to ( MapType &  target  )  [private]

Lock the set and copy its map so that the copy can be iterated without holding the lock.

Definition at line 227 of file DataLinkSet.inl.

References lock_, and map_.

Referenced by send_control().

{
  target.clear();

  // Lock the existing map
  GuardType guard(this->lock_);

  // Copy to target
  for (MapType::iterator itr = map_.begin();
       itr != map_.end();
       ++itr) {
    target.insert(*itr);
  }
}
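
The copy-under-lock pattern above can be sketched outside OpenDDS as follows. This is a minimal illustration, not the library's code: `Link`, `LinkSet`, `send_to_all()` and `snapshot_example()` are hypothetical names, and `std::mutex` stands in for `ACE_SYNCH_MUTEX`.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical stand-in for a DataLink; not the OpenDDS class.
struct Link {
  int sends;
  Link() : sends(0) {}
  void send() { ++sends; }
};

class LinkSet {
public:
  typedef std::map<int, std::shared_ptr<Link> > MapType;

  void insert(int id, const std::shared_ptr<Link>& link) {
    std::lock_guard<std::mutex> guard(lock_);
    map_[id] = link;
  }

  // Copy the map while holding the lock; the lock is released on return.
  void copy_map_to(MapType& target) {
    target.clear();
    std::lock_guard<std::mutex> guard(lock_);
    target.insert(map_.begin(), map_.end());
  }

  // Iterate over a private snapshot so lock_ is not held during send().
  std::size_t send_to_all() {
    MapType snapshot;
    copy_map_to(snapshot);
    for (MapType::iterator it = snapshot.begin(); it != snapshot.end(); ++it) {
      it->second->send();
    }
    return snapshot.size();
  }

private:
  MapType map_;
  std::mutex lock_;
};

// Usage: two links, one broadcast; returns the total number of sends.
int snapshot_example() {
  LinkSet set;
  std::shared_ptr<Link> a = std::make_shared<Link>();
  std::shared_ptr<Link> b = std::make_shared<Link>();
  set.insert(1, a);
  set.insert(2, b);
  const std::size_t visited = set.send_to_all();
  assert(visited == 2);
  return a->sends + b->sends;
}
```

The snapshot holds shared references to the links, so a link removed from the set during the iteration stays alive until the snapshot goes out of scope.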


bool OpenDDS::DCPS::DataLinkSet::empty ( void   ) 

Definition at line 87 of file DataLinkSet.cpp.

References lock_, and map_.

{
  GuardType guard(this->lock_);

  return map_.empty();
}

int OpenDDS::DCPS::DataLinkSet::insert_link ( const DataLink_rch &  link  ) 

Definition at line 42 of file DataLinkSet.cpp.

References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, lock_, and map_.

Referenced by OpenDDS::DCPS::TransportClient::add_link(), OpenDDS::DCPS::TransportClient::send_control_to(), and OpenDDS::DCPS::TransportClient::send_response().

{
  DBG_ENTRY_LVL("DataLinkSet","insert_link",6);
  GuardType guard(this->lock_);
  return OpenDDS::DCPS::bind(map_, link->id(), link);
}


LockType& OpenDDS::DCPS::DataLinkSet::lock ( void   )  [inline]

Accessors for external iteration.

Definition at line 84 of file DataLinkSet.h.

{ return lock_; }

MapType& OpenDDS::DCPS::DataLinkSet::map ( void   )  [inline]

Definition at line 85 of file DataLinkSet.h.

{ return map_; }
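
A caller that iterates the map through these accessors is expected to hold the lock itself. The sketch below shows that usage on a stand-in class; `LinkSet`, `join_names()` and `accessor_example()` are hypothetical, the map values are plain strings rather than `DataLink_rch`, and `std::mutex` replaces `ACE_SYNCH_MUTEX`.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

class LinkSet {
public:
  typedef std::map<int, std::string> MapType;  // id -> link name (placeholder)

  void insert(int id, const std::string& name) {
    std::lock_guard<std::mutex> guard(lock_);
    map_[id] = name;
  }

  // Accessors for external iteration, mirroring lock() and map().
  std::mutex& lock() { return lock_; }
  MapType& map() { return map_; }

private:
  MapType map_;
  std::mutex lock_;
};

// The caller takes the set's lock before touching the map.
std::string join_names(LinkSet& set) {
  std::lock_guard<std::mutex> guard(set.lock());
  std::string out;
  for (LinkSet::MapType::iterator it = set.map().begin();
       it != set.map().end();
       ++it) {
    if (!out.empty()) out += ",";
    out += it->second;
  }
  return out;
}

// Usage: entries come back in key order because std::map is sorted.
std::string accessor_example() {
  LinkSet set;
  set.insert(2, "udp");
  set.insert(1, "tcp");
  const std::string joined = join_names(set);
  assert(!joined.empty());
  return joined;
}
```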

typedef OPENDDS_MAP(DataLinkIdType, DataLink_rch) OpenDDS::DCPS::DataLinkSet::MapType

ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_all_msgs ( RepoId  pub_id  ) 

Definition at line 170 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, lock_, and map_.

Referenced by OpenDDS::DCPS::TransportClient::remove_all_msgs().

{
  DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
  GuardType guard(this->lock_);
  const MapType::iterator end = this->map_.end();
  for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
    itr->second->remove_all_msgs(pub_id);
  }

  return true;
}


void OpenDDS::DCPS::DataLinkSet::remove_link ( const DataLink_rch &  link  ) 

Definition at line 50 of file DataLinkSet.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, LM_DEBUG, lock_, map_, OpenDDS::DCPS::unbind(), and VDBG.

Referenced by OpenDDS::DCPS::TransportClient::disassociate().

{
  DBG_ENTRY_LVL("DataLinkSet", "remove_link", 6);
  GuardType guard1(this->lock_);
  if (unbind(map_, link->id()) != 0) {
    // Just report to the log that we tried.
    VDBG((LM_DEBUG,
          ACE_TEXT("(%P|%t) DataLinkSet::remove_links: ")
          ACE_TEXT("link_id %d not found in map.\n"),
          link->id()));
  }
}


ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_sample ( const DataSampleElement *  sample  ) 

Definition at line 154 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, lock_, map_, and OpenDDS::DCPS::REMOVE_RELEASED.

Referenced by OpenDDS::DCPS::TransportClient::remove_sample().

{
  DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
  GuardType guard(this->lock_);
  const MapType::iterator end = this->map_.end();
  for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {

    if (itr->second->remove_sample(sample, 0) == REMOVE_RELEASED) {
      return true;
    }
  }

  return false;
}
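
The loop above stops at the first link that reports REMOVE_RELEASED, so later links are never asked. A minimal sketch of that early exit, with hypothetical stand-ins throughout (`Link`, the two-value `RemoveResult` enum, and `remove_example()` are illustrative and not the OpenDDS definitions):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical result codes; only REMOVE_RELEASED appears in the listing.
enum RemoveResult { REMOVE_NOT_FOUND, REMOVE_RELEASED };

// Hypothetical link that counts how often it was asked to remove a sample.
struct Link {
  bool holds_sample;
  int queries;
  explicit Link(bool holds) : holds_sample(holds), queries(0) {}
  RemoveResult remove_sample() {
    ++queries;
    return holds_sample ? REMOVE_RELEASED : REMOVE_NOT_FOUND;
  }
};

// Stop at the first link that reports the sample as released.
bool remove_sample(std::vector<Link>& links) {
  for (std::size_t i = 0; i < links.size(); ++i) {
    if (links[i].remove_sample() == REMOVE_RELEASED) {
      return true;
    }
  }
  return false;
}

// Usage: the second of three links releases the sample, so only the
// first two links are queried. Returns the total number of queries.
int remove_example() {
  std::vector<Link> links;
  links.push_back(Link(false));
  links.push_back(Link(true));
  links.push_back(Link(true));
  const bool removed = remove_sample(links);
  assert(removed);
  return links[0].queries + links[1].queries + links[2].queries;
}
```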


OpenDDS::DCPS::DataLinkSet_rch OpenDDS::DCPS::DataLinkSet::select_links ( const RepoId *  remoteIds,
const CORBA::ULong  num_targets 
)

Definition at line 64 of file DataLinkSet.cpp.

References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, lock_, and map_.

Referenced by OpenDDS::DCPS::TransportClient::send_i().

{
  DBG_ENTRY_LVL("DataLinkSet","select_links",6);

  DataLinkSet_rch selected_links ( make_rch<DataLinkSet>() );
  GuardType guard(this->lock_);
  for (MapType::iterator itr = map_.begin();
       itr != map_.end();
       ++itr) {
    for (CORBA::ULong i = 0; i < num_targets; ++i) {
      if (itr->second->is_target(remoteIds[i])) {
        OpenDDS::DCPS::bind(selected_links->map_,
             itr->second->id(), itr->second);
        break;
      }
    }
  }

  return selected_links;
}
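
The selection logic above keeps every link that serves at least one of the requested remote ids. A self-contained sketch of the same nested loop, using hypothetical stand-ins (`Link` with an integer target set, plain `int` ids instead of `RepoId`, and `select_example()`):

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <set>

// Hypothetical link that knows which remote ids it can reach.
struct Link {
  std::set<int> targets;
  bool is_target(int remote_id) const { return targets.count(remote_id) != 0; }
};

typedef std::map<int, std::shared_ptr<Link> > MapType;

// Return the subset of links that serve at least one requested remote id.
MapType select_links(const MapType& all,
                     const int* remote_ids,
                     std::size_t num_targets) {
  MapType selected;
  for (MapType::const_iterator it = all.begin(); it != all.end(); ++it) {
    for (std::size_t i = 0; i < num_targets; ++i) {
      if (it->second->is_target(remote_ids[i])) {
        selected.insert(*it);
        break;  // one match is enough; move on to the next link
      }
    }
  }
  return selected;
}

// Usage: links 1 and 3 serve the requested ids, link 2 does not.
std::size_t select_example() {
  MapType all;
  all[1] = std::make_shared<Link>();
  all[1]->targets.insert(10);
  all[1]->targets.insert(11);
  all[2] = std::make_shared<Link>();
  all[2]->targets.insert(20);
  all[3] = std::make_shared<Link>();
  all[3]->targets.insert(30);

  const int wanted[] = {11, 30};
  const MapType selected = select_links(all, wanted, 2);
  assert(selected.count(2) == 0);
  return selected.size();
}
```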


ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send ( DataSampleElement *  sample  ) 

Send to each DataLink in the set.

Definition at line 22 of file DataLinkSet.inl.

References OpenDDS::DCPS::DataSampleHeader::add_cfentries(), OpenDDS::DCPS::CONTENT_FILTER_FLAG, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_filter_per_link(), OpenDDS::DCPS::DataSampleElement::get_sample(), LM_DEBUG, lock_, map_, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and VDBG_LVL.

{
  DBG_ENTRY_LVL("DataLinkSet", "send", 6);
  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
            sample), 5);
  GuardType guard(this->lock_);

#ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
  const bool customHeader =
    DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
#endif

  TransportSendElement* send_element = new TransportSendElement(static_cast<int>(map_.size()), sample);

  for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {

#ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
    if (customHeader) {
      typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
      FilterIter fi = sample->get_filter_per_link().find(itr->first);
      GUIDSeq* guids = 0;
      if (fi != sample->get_filter_per_link().end()) {
        guids = fi->second.ptr();
      }

      VDBG_LVL((LM_DEBUG,
        "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
        itr->second.in(), guids ? guids->length() : 0), 5);

      Message_Block_Ptr mb (send_element->msg()->duplicate());

      DataSampleHeader::add_cfentries(guids, mb.get());

      TransportCustomizedElement* tce =
        new TransportCustomizedElement(send_element, false);
      tce->set_msg(move(mb)); // tce now owns ACE_Message_Block chain

      itr->second->send(tce);

    } else {
#endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE

      // Tell the DataLink to send it.
      itr->second->send(send_element);

#ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
    }
#endif
  }
}


ACE_INLINE OpenDDS::DCPS::SendControlStatus OpenDDS::DCPS::DataLinkSet::send_control ( RepoId  pub_id,
const TransportSendListener_rch &  listener,
const DataSampleHeader &  header,
Message_Block_Ptr  msg 
)

Send control message to each DataLink in the set.

Definition at line 88 of file DataLinkSet.inl.

References ACE_TEXT(), copy_map_to(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, OpenDDS::DCPS::move(), OPENDDS_STRING, and OpenDDS::DCPS::SEND_CONTROL_OK.

{
  DBG_ENTRY_LVL("DataLinkSet","send_control",6);
  //Optimized - use cached allocator.

  MapType dup_map;
  copy_map_to(dup_map);

  if (dup_map.empty()) {
    // similar to the "no links" case in TransportClient::send()
    if (DCPS_debug_level > 4) {
      const GuidConverter converter(pub_id);
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
                 ACE_TEXT("no links for publication %C, ")
                 ACE_TEXT("not sending control message.\n"),
                 OPENDDS_STRING(converter).c_str()));
    }
    listener->control_delivered(msg);
    return SEND_CONTROL_OK;
  }

  TransportSendControlElement* const send_element =
    new TransportSendControlElement(static_cast<int>(dup_map.size()), pub_id,
                                       listener.in(), header, move(msg));

  for (MapType::iterator itr = dup_map.begin();
       itr != dup_map.end();
       ++itr) {
    itr->second->send_start();
    itr->second->send(send_element);
    itr->second->send_stop(pub_id);
  }

  return SEND_CONTROL_OK;
}
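
The listing above treats an empty set as a successful delivery: the listener is notified directly and SEND_CONTROL_OK is returned without touching any link. A minimal sketch of that control flow follows. All types here are hypothetical stand-ins (`Listener`, `Link`, a two-value `SendControlStatus`, `send_control_example()`), and the message payload is omitted.

```cpp
#include <cassert>
#include <map>
#include <memory>

// Stand-in status codes for this sketch only.
enum SendControlStatus { SEND_CONTROL_ERROR, SEND_CONTROL_OK };

// Hypothetical listener that counts delivery notifications.
struct Listener {
  int delivered;
  Listener() : delivered(0) {}
  void control_delivered() { ++delivered; }
};

// Hypothetical link that counts the control messages it was given.
struct Link {
  int sent;
  Link() : sent(0) {}
  void send_start() {}
  void send() { ++sent; }
  void send_stop() {}
};

typedef std::map<int, std::shared_ptr<Link> > MapType;

SendControlStatus send_control(const MapType& links, Listener& listener) {
  if (links.empty()) {
    // No links: report the message as delivered right away.
    listener.control_delivered();
    return SEND_CONTROL_OK;
  }
  for (MapType::const_iterator it = links.begin(); it != links.end(); ++it) {
    // Each link gets its own start/send/stop bracket.
    it->second->send_start();
    it->second->send();
    it->second->send_stop();
  }
  return SEND_CONTROL_OK;
}

// Usage: an empty set notifies the listener exactly once.
int send_control_example() {
  Listener listener;
  MapType none;
  const SendControlStatus status = send_control(none, listener);
  assert(status == SEND_CONTROL_OK);
  return listener.delivered;
}
```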


ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_control ( DataSampleElement *  sample  ) 

Send a control message that is wrapped in a DataSampleElement.

Definition at line 74 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, LM_DEBUG, lock_, map_, and VDBG.

Referenced by OpenDDS::DCPS::TransportClient::send_control(), and OpenDDS::DCPS::TransportClient::send_control_to().

{
  DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
  VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
  GuardType guard(this->lock_);
  TransportSendControlElement* send_element =
    new TransportSendControlElement(static_cast<int>(map_.size()), sample);

  for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
    itr->second->send(send_element);
  }
}


ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_final_acks ( const RepoId &  readerid  ) 

Definition at line 243 of file DataLinkSet.inl.

References lock_, and map_.

Referenced by OpenDDS::DCPS::TransportClient::send_final_acks().

{
  GuardType guard(this->lock_);
  for (MapType::iterator itr = map_.begin();
       itr != map_.end();
       ++itr) {
    itr->second->send_final_acks(readerid);
  }

  map_.clear();
}


ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_response ( RepoId  sub_id,
const DataSampleHeader &  header,
Message_Block_Ptr  response 
)

Definition at line 129 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, lock_, map_, OpenDDS::DCPS::move(), send_response_listener_, and OpenDDS::DCPS::SendResponseListener::track_message().

Referenced by OpenDDS::DCPS::TransportClient::send_response().

{
  // pub_id is this function's first parameter; the declaration shown above
  // names it sub_id.
  DBG_ENTRY_LVL("DataLinkSet","send_response",6);
  GuardType guard(this->lock_);

  TransportSendControlElement* const send_element =
    new TransportSendControlElement(static_cast<int>(map_.size()), pub_id,
                                       &send_response_listener_, header,
                                       move(response));
  if (!send_element) return;
  send_response_listener_.track_message();

  for (MapType::iterator itr = map_.begin();
       itr != map_.end();
       ++itr) {
    itr->second->send_start();
    itr->second->send(send_element);
    itr->second->send_stop(pub_id);
  }
}


ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_start ( DataLinkSet *  link_set  ) 

Calls send_start() on the links in link_set and also adds the links from link_set to *this.

Definition at line 183 of file DataLinkSet.inl.

References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, LM_ERROR, lock_, and map_.

Referenced by OpenDDS::DCPS::TransportClient::send_i().

{
  DBG_ENTRY_LVL("DataLinkSet","send_start",6);
  GuardType guard1(this->lock_);
  GuardType guard2(link_set->lock_);
  for (MapType::iterator itr = link_set->map_.begin();
       itr != link_set->map_.end();
       ++itr) {
    // Attempt to add the current DataLink to this set.
    int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);

    if (result == 0) {
      // We successfully added the current DataLink to this set,
      // meaning that it wasn't already a member.  We should tell
      // the DataLink about the send_start() event.
      itr->second->send_start();

    } else if (result == -1) {
      ACE_ERROR((LM_ERROR,
                 "(%P|%t) ERROR: Failed to bind data link into set.\n"));
    }

    // Note that there is a possibility that the result == 1, which
    // means that the DataLink already exists in our map_.  We skip
    // all of these cases.
  }
}
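
The merge above notifies a link only when the bind succeeds with result 0, that is, when the link was not already a member. A self-contained sketch of that behavior, under stated assumptions: `bind()` here is a local helper that mimics only the 0 and 1 return values described in the listing's comments, and `Link`, `send_start()` on plain maps, and `send_start_example()` are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <memory>

// Hypothetical link that counts send_start() notifications.
struct Link {
  int starts;
  Link() : starts(0) {}
  void send_start() { ++starts; }
};

typedef std::map<int, std::shared_ptr<Link> > MapType;

// Local helper mimicking the return convention in the listing's comments:
// 0 = newly inserted, 1 = key already present.
int bind(MapType& m, int id, const std::shared_ptr<Link>& link) {
  return m.insert(MapType::value_type(id, link)).second ? 0 : 1;
}

// Merge incoming links into mine; notify only the newly added ones.
void send_start(MapType& mine, const MapType& incoming) {
  for (MapType::const_iterator it = incoming.begin();
       it != incoming.end();
       ++it) {
    if (bind(mine, it->first, it->second) == 0) {
      it->second->send_start();
    }
  }
}

// Usage: merging the same set twice notifies each link only once.
int send_start_example() {
  std::shared_ptr<Link> a = std::make_shared<Link>();
  std::shared_ptr<Link> b = std::make_shared<Link>();
  MapType mine;
  MapType incoming;
  incoming[1] = a;
  incoming[2] = b;

  send_start(mine, incoming);
  send_start(mine, incoming);  // both links are already members now
  assert(mine.size() == 2);
  return a->starts + b->starts;
}
```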


ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_stop ( RepoId  repoId  ) 

Calls send_stop(repoId) on every link in the set and then clears the set.

Definition at line 212 of file DataLinkSet.inl.

References DBG_ENTRY_LVL, lock_, and map_.

Referenced by OpenDDS::DCPS::TransportClient::send_i().

{
  DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
  // Iterate over our map_ and tell each DataLink about the send_stop() event.
  GuardType guard(this->lock_);
  for (MapType::iterator itr = map_.begin();
       itr != map_.end();
       ++itr) {
    itr->second->send_stop(repoId);
  }

  map_.clear();
}



Member Data Documentation

LockType OpenDDS::DCPS::DataLinkSet::lock_ [private]

This lock protects the critical sections of code that play a role in sending data.

Definition at line 95 of file DataLinkSet.h.

Referenced by copy_map_to(), empty(), insert_link(), remove_all_msgs(), remove_link(), remove_sample(), select_links(), send(), send_control(), send_final_acks(), send_response(), send_start(), and send_stop().

SendResponseListener OpenDDS::DCPS::DataLinkSet::send_response_listener_ [private]

Listener for TransportSendControlElements created in send_response().

Definition at line 98 of file DataLinkSet.h.

Referenced by send_response().


The documentation for this class was generated from the following files:
 DataLinkSet.h
 DataLinkSet.cpp
 DataLinkSet.inl

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1