DataLinkSet.inl

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "EntryExit.h"
00009 #include "DataLink.h"
00010 #include "TransportSendElement.h"
00011 #include "dds/DCPS/DataSampleHeader.h"
00012 #include "dds/DCPS/Util.h"
00013 #include "dds/DCPS/Definitions.h"
00014 #include "dds/DCPS/GuidConverter.h"
00015 
00016 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00017 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00018 #include "TransportCustomizedElement.h"
00019 #endif
00020 
00021 ACE_INLINE void
00022 OpenDDS::DCPS::DataLinkSet::send(DataSampleElement* sample)
00023 {
00024   DBG_ENTRY_LVL("DataLinkSet", "send", 6);
00025   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
00026             sample), 5);
00027   GuardType guard(this->lock_);
00028   TransportSendElement* send_element =
00029     TransportSendElement::alloc(static_cast<int>(map_.size()), sample);
00030 
00031 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00032   const bool customHeader =
00033     DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
00034 #endif
00035 
00036   for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00037 
00038 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00039     if (customHeader) {
00040       typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
00041       FilterIter fi = sample->get_filter_per_link().find(itr->first);
00042       GUIDSeq* guids = 0;
00043       if (fi != sample->get_filter_per_link().end()) {
00044         guids = fi->second.ptr();
00045       }
00046 
00047       VDBG_LVL((LM_DEBUG,
00048         "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
00049         itr->second.in(), guids ? guids->length() : 0), 5);
00050 
00051       ACE_Message_Block* mb = sample->get_sample()->duplicate();
00052 
00053       DataSampleHeader::add_cfentries(guids, mb);
00054 
00055       TransportCustomizedElement* tce =
00056         TransportCustomizedElement::alloc(send_element, false,
00057           sample->get_transport_customized_element_allocator());
00058       tce->set_msg(mb); // tce now owns ACE_Message_Block chain
00059 
00060       itr->second->send(tce);
00061 
00062     } else {
00063 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00064 
00065       // Tell the DataLink to send it.
00066       itr->second->send(send_element);
00067 
00068 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00069     }
00070 #endif
00071   }
00072 }
00073 
00074 ACE_INLINE void
00075 OpenDDS::DCPS::DataLinkSet::send_control(DataSampleElement* sample)
00076 {
00077   DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
00078   VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
00079   GuardType guard(this->lock_);
00080   TransportSendControlElement* send_element =
00081     TransportSendControlElement::alloc(static_cast<int>(map_.size()), sample,
00082                                        &send_control_element_allocator_);
00083   if (!send_element) {
00084     ACE_ERROR((LM_ERROR, "(%P|%t) DataLinkSet::send_control allocation of "
00085       "TransportSendControlElement failed\n"));
00086     return;
00087   }
00088   for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00089     itr->second->send(send_element);
00090   }
00091 }
00092 
00093 ACE_INLINE OpenDDS::DCPS::SendControlStatus
00094 OpenDDS::DCPS::DataLinkSet::send_control(RepoId                  pub_id,
00095                                          TransportSendListener*  listener,
00096                                          const DataSampleHeader& header,
00097                                          ACE_Message_Block*      msg,
00098                                          TransportSendControlElementAllocator* allocator)
00099 {
00100   DBG_ENTRY_LVL("DataLinkSet","send_control",6);
00101   //Optimized - use cached allocator.
00102 
00103   MapType dup_map;
00104   copy_map_to(dup_map);
00105 
00106   if (dup_map.empty()) {
00107     // similar to the "no links" case in TransportClient::send()
00108     if (DCPS_debug_level > 4) {
00109       const GuidConverter converter(pub_id);
00110       ACE_DEBUG((LM_DEBUG,
00111                  ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
00112                  ACE_TEXT("no links for publication %C, ")
00113                  ACE_TEXT("not sending control message.\n"),
00114                  OPENDDS_STRING(converter).c_str()));
00115     }
00116     listener->control_delivered(msg);
00117     return SEND_CONTROL_OK;
00118   }
00119 
00120   TransportSendControlElementAllocator& use_alloc =
00121     allocator ? *allocator : send_control_element_allocator_;
00122 
00123   TransportSendControlElement* const send_element =
00124     TransportSendControlElement::alloc(static_cast<int>(dup_map.size()), pub_id,
00125                                        listener, header, msg, &use_alloc);
00126   if (!send_element) return SEND_CONTROL_ERROR;
00127 
00128   for (MapType::iterator itr = dup_map.begin();
00129        itr != dup_map.end();
00130        ++itr) {
00131     itr->second->send_start();
00132     itr->second->send(send_element);
00133     itr->second->send_stop(pub_id);
00134   }
00135 
00136   return SEND_CONTROL_OK;
00137 }
00138 
00139 ACE_INLINE void
00140 OpenDDS::DCPS::DataLinkSet::send_response(
00141   RepoId pub_id,
00142   const DataSampleHeader& header,
00143   ACE_Message_Block* response)
00144 {
00145   DBG_ENTRY_LVL("DataLinkSet","send_response",6);
00146   GuardType guard(this->lock_);
00147 
00148   TransportSendControlElement* const send_element =
00149     TransportSendControlElement::alloc(static_cast<int>(map_.size()), pub_id,
00150                                        &send_response_listener_, header,
00151                                        response, &send_control_element_allocator_);
00152   if (!send_element) return;
00153   send_response_listener_.track_message();
00154 
00155   for (MapType::iterator itr = map_.begin();
00156        itr != map_.end();
00157        ++itr) {
00158     itr->second->send_start();
00159     itr->second->send(send_element);
00160     itr->second->send_stop(pub_id);
00161   }
00162 }
00163 
00164 ACE_INLINE bool
00165 OpenDDS::DCPS::DataLinkSet::remove_sample(const DataSampleElement* sample)
00166 {
00167   DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
00168   GuardType guard(this->lock_);
00169   const MapType::iterator end = this->map_.end();
00170   for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00171 
00172     if (itr->second->remove_sample(sample) == REMOVE_RELEASED) {
00173       return true;
00174     }
00175   }
00176 
00177   return false;
00178 }
00179 
00180 ACE_INLINE bool
00181 OpenDDS::DCPS::DataLinkSet::remove_all_msgs(RepoId pub_id)
00182 {
00183   DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
00184   GuardType guard(this->lock_);
00185   const MapType::iterator end = this->map_.end();
00186   for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00187     itr->second->remove_all_msgs(pub_id);
00188   }
00189 
00190   return true;
00191 }
00192 
00193 ACE_INLINE void
00194 OpenDDS::DCPS::DataLinkSet::send_start(DataLinkSet* link_set)
00195 {
00196   DBG_ENTRY_LVL("DataLinkSet","send_start",6);
00197   GuardType guard1(this->lock_);
00198   GuardType guard2(link_set->lock_);
00199   for (MapType::iterator itr = link_set->map_.begin();
00200        itr != link_set->map_.end();
00201        ++itr) {
00202     // Attempt to add the current DataLink to this set.
00203     int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
00204 
00205     if (result == 0) {
00206       // We successfully added the current DataLink to this set,
00207       // meaning that it wasn't already a member.  We should tell
00208       // the DataLink about the send_start() event.
00209       itr->second->send_start();
00210 
00211     } else if (result == -1) {
00212       ACE_ERROR((LM_ERROR,
00213                  "(%P|%t) ERROR: Failed to bind data link into set.\n"));
00214     }
00215 
00216     // Note that there is a possibility that the result == 1, which
00217     // means that the DataLink already exists in our map_->  We skip
00218     // all of these cases.
00219   }
00220 }
00221 
00222 ACE_INLINE void
00223 OpenDDS::DCPS::DataLinkSet::send_stop(RepoId repoId)
00224 {
00225   DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
00226   // Iterate over our map_ and tell each DataLink about the send_stop() event.
00227   GuardType guard(this->lock_);
00228   for (MapType::iterator itr = map_.begin();
00229        itr != map_.end();
00230        ++itr) {
00231     itr->second->send_stop(repoId);
00232   }
00233 
00234   map_.clear();
00235 }
00236 
00237 ACE_INLINE void
00238 OpenDDS::DCPS::DataLinkSet::copy_map_to(MapType& target)
00239 {
00240   target.clear();
00241 
00242   // Lock the existing map
00243   GuardType guard(this->lock_);
00244 
00245   // Copy to target
00246   for (MapType::iterator itr = map_.begin();
00247        itr != map_.end();
00248        ++itr) {
00249     target.insert(*itr);
00250   }
00251 }
00252 
00253 ACE_INLINE void
00254 OpenDDS::DCPS::DataLinkSet::send_final_acks(const RepoId& readerid)
00255 {
00256   GuardType guard(this->lock_);
00257   for (MapType::iterator itr = map_.begin();
00258        itr != map_.end();
00259        ++itr) {
00260     itr->second->send_final_acks(readerid);
00261   }
00262 
00263   map_.clear();
00264 }

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7