DataLinkSet.inl

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "EntryExit.h"
00009 #include "DataLink.h"
00010 #include "TransportSendElement.h"
00011 #include "dds/DCPS/DataSampleHeader.h"
00012 #include "dds/DCPS/Util.h"
00013 #include "dds/DCPS/Definitions.h"
00014 #include "dds/DCPS/GuidConverter.h"
00015 
00016 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00017 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00018 #include "TransportCustomizedElement.h"
00019 #endif
00020 
00021 ACE_INLINE void
00022 OpenDDS::DCPS::DataLinkSet::send(DataSampleElement* sample)
00023 {
00024   DBG_ENTRY_LVL("DataLinkSet", "send", 6);
00025   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
00026             sample), 5);
00027   GuardType guard(this->lock_);
00028 
00029 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00030   const bool customHeader =
00031     DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
00032 #endif
00033 
00034   TransportSendElement* send_element = new TransportSendElement(static_cast<int>(map_.size()), sample);
00035 
00036   for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00037 
00038 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00039     if (customHeader) {
00040       typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
00041       FilterIter fi = sample->get_filter_per_link().find(itr->first);
00042       GUIDSeq* guids = 0;
00043       if (fi != sample->get_filter_per_link().end()) {
00044         guids = fi->second.ptr();
00045       }
00046 
00047       VDBG_LVL((LM_DEBUG,
00048         "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
00049         itr->second.in(), guids ? guids->length() : 0), 5);
00050 
00051       Message_Block_Ptr mb (send_element->msg()->duplicate());
00052 
00053       DataSampleHeader::add_cfentries(guids, mb.get());
00054 
00055       TransportCustomizedElement* tce =
00056         new TransportCustomizedElement(send_element, false);
00057       tce->set_msg(move(mb)); // tce now owns ACE_Message_Block chain
00058 
00059       itr->second->send(tce);
00060 
00061     } else {
00062 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00063 
00064       // Tell the DataLink to send it.
00065       itr->second->send(send_element);
00066 
00067 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00068     }
00069 #endif
00070   }
00071 }
00072 
00073 ACE_INLINE void
00074 OpenDDS::DCPS::DataLinkSet::send_control(DataSampleElement* sample)
00075 {
00076   DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
00077   VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
00078   GuardType guard(this->lock_);
00079   TransportSendControlElement* send_element =
00080     new TransportSendControlElement(static_cast<int>(map_.size()), sample);
00081 
00082   for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00083     itr->second->send(send_element);
00084   }
00085 }
00086 
00087 ACE_INLINE OpenDDS::DCPS::SendControlStatus
00088 OpenDDS::DCPS::DataLinkSet::send_control(RepoId                           pub_id,
00089                                          const TransportSendListener_rch& listener,
00090                                          const DataSampleHeader&          header,
00091                                          Message_Block_Ptr                msg)
00092 {
00093   DBG_ENTRY_LVL("DataLinkSet","send_control",6);
00094   //Optimized - use cached allocator.
00095 
00096   MapType dup_map;
00097   copy_map_to(dup_map);
00098 
00099   if (dup_map.empty()) {
00100     // similar to the "no links" case in TransportClient::send()
00101     if (DCPS_debug_level > 4) {
00102       const GuidConverter converter(pub_id);
00103       ACE_DEBUG((LM_DEBUG,
00104                  ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
00105                  ACE_TEXT("no links for publication %C, ")
00106                  ACE_TEXT("not sending control message.\n"),
00107                  OPENDDS_STRING(converter).c_str()));
00108     }
00109     listener->control_delivered(msg);
00110     return SEND_CONTROL_OK;
00111   }
00112 
00113   TransportSendControlElement* const send_element =
00114     new TransportSendControlElement(static_cast<int>(dup_map.size()), pub_id,
00115                                        listener.in(), header, move(msg));
00116 
00117   for (MapType::iterator itr = dup_map.begin();
00118        itr != dup_map.end();
00119        ++itr) {
00120     itr->second->send_start();
00121     itr->second->send(send_element);
00122     itr->second->send_stop(pub_id);
00123   }
00124 
00125   return SEND_CONTROL_OK;
00126 }
00127 
00128 ACE_INLINE void
00129 OpenDDS::DCPS::DataLinkSet::send_response(
00130   RepoId pub_id,
00131   const DataSampleHeader& header,
00132   Message_Block_Ptr response)
00133 {
00134   DBG_ENTRY_LVL("DataLinkSet","send_response",6);
00135   GuardType guard(this->lock_);
00136 
00137   TransportSendControlElement* const send_element =
00138     new TransportSendControlElement(static_cast<int>(map_.size()), pub_id,
00139                                        &send_response_listener_, header,
00140                                        move(response));
00141   if (!send_element) return;
00142   send_response_listener_.track_message();
00143 
00144   for (MapType::iterator itr = map_.begin();
00145        itr != map_.end();
00146        ++itr) {
00147     itr->second->send_start();
00148     itr->second->send(send_element);
00149     itr->second->send_stop(pub_id);
00150   }
00151 }
00152 
00153 ACE_INLINE bool
00154 OpenDDS::DCPS::DataLinkSet::remove_sample(const DataSampleElement* sample)
00155 {
00156   DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
00157   GuardType guard(this->lock_);
00158   const MapType::iterator end = this->map_.end();
00159   for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00160 
00161     if (itr->second->remove_sample(sample, 0) == REMOVE_RELEASED) {
00162       return true;
00163     }
00164   }
00165 
00166   return false;
00167 }
00168 
00169 ACE_INLINE bool
00170 OpenDDS::DCPS::DataLinkSet::remove_all_msgs(RepoId pub_id)
00171 {
00172   DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
00173   GuardType guard(this->lock_);
00174   const MapType::iterator end = this->map_.end();
00175   for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00176     itr->second->remove_all_msgs(pub_id);
00177   }
00178 
00179   return true;
00180 }
00181 
00182 ACE_INLINE void
00183 OpenDDS::DCPS::DataLinkSet::send_start(DataLinkSet* link_set)
00184 {
00185   DBG_ENTRY_LVL("DataLinkSet","send_start",6);
00186   GuardType guard1(this->lock_);
00187   GuardType guard2(link_set->lock_);
00188   for (MapType::iterator itr = link_set->map_.begin();
00189        itr != link_set->map_.end();
00190        ++itr) {
00191     // Attempt to add the current DataLink to this set.
00192     int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
00193 
00194     if (result == 0) {
00195       // We successfully added the current DataLink to this set,
00196       // meaning that it wasn't already a member.  We should tell
00197       // the DataLink about the send_start() event.
00198       itr->second->send_start();
00199 
00200     } else if (result == -1) {
00201       ACE_ERROR((LM_ERROR,
00202                  "(%P|%t) ERROR: Failed to bind data link into set.\n"));
00203     }
00204 
00205     // Note that there is a possibility that the result == 1, which
00206     // means that the DataLink already exists in our map_->  We skip
00207     // all of these cases.
00208   }
00209 }
00210 
00211 ACE_INLINE void
00212 OpenDDS::DCPS::DataLinkSet::send_stop(RepoId repoId)
00213 {
00214   DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
00215   // Iterate over our map_ and tell each DataLink about the send_stop() event.
00216   GuardType guard(this->lock_);
00217   for (MapType::iterator itr = map_.begin();
00218        itr != map_.end();
00219        ++itr) {
00220     itr->second->send_stop(repoId);
00221   }
00222 
00223   map_.clear();
00224 }
00225 
00226 ACE_INLINE void
00227 OpenDDS::DCPS::DataLinkSet::copy_map_to(MapType& target)
00228 {
00229   target.clear();
00230 
00231   // Lock the existing map
00232   GuardType guard(this->lock_);
00233 
00234   // Copy to target
00235   for (MapType::iterator itr = map_.begin();
00236        itr != map_.end();
00237        ++itr) {
00238     target.insert(*itr);
00239   }
00240 }
00241 
00242 ACE_INLINE void
00243 OpenDDS::DCPS::DataLinkSet::send_final_acks(const RepoId& readerid)
00244 {
00245   GuardType guard(this->lock_);
00246   for (MapType::iterator itr = map_.begin();
00247        itr != map_.end();
00248        ++itr) {
00249     itr->second->send_final_acks(readerid);
00250   }
00251 
00252   map_.clear();
00253 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1