00001
00002
00003
00004
00005
00006
00007
00008 #include "EntryExit.h"
00009 #include "DataLink.h"
00010 #include "TransportSendElement.h"
00011 #include "dds/DCPS/DataSampleHeader.h"
00012 #include "dds/DCPS/Util.h"
00013 #include "dds/DCPS/Definitions.h"
00014 #include "dds/DCPS/GuidConverter.h"
00015
00016 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00017 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00018 #include "TransportCustomizedElement.h"
00019 #endif
00020
00021 ACE_INLINE void
00022 OpenDDS::DCPS::DataLinkSet::send(DataSampleElement* sample)
00023 {
00024 DBG_ENTRY_LVL("DataLinkSet", "send", 6);
00025 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
00026 sample), 5);
00027 GuardType guard(this->lock_);
00028
00029 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00030 const bool customHeader =
00031 DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
00032 #endif
00033
00034 TransportSendElement* send_element = new TransportSendElement(static_cast<int>(map_.size()), sample);
00035
00036 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00037
00038 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00039 if (customHeader) {
00040 typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
00041 FilterIter fi = sample->get_filter_per_link().find(itr->first);
00042 GUIDSeq* guids = 0;
00043 if (fi != sample->get_filter_per_link().end()) {
00044 guids = fi->second.ptr();
00045 }
00046
00047 VDBG_LVL((LM_DEBUG,
00048 "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
00049 itr->second.in(), guids ? guids->length() : 0), 5);
00050
00051 Message_Block_Ptr mb (send_element->msg()->duplicate());
00052
00053 DataSampleHeader::add_cfentries(guids, mb.get());
00054
00055 TransportCustomizedElement* tce =
00056 new TransportCustomizedElement(send_element, false);
00057 tce->set_msg(move(mb));
00058
00059 itr->second->send(tce);
00060
00061 } else {
00062 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00063
00064
00065 itr->second->send(send_element);
00066
00067 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00068 }
00069 #endif
00070 }
00071 }
00072
00073 ACE_INLINE void
00074 OpenDDS::DCPS::DataLinkSet::send_control(DataSampleElement* sample)
00075 {
00076 DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
00077 VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
00078 GuardType guard(this->lock_);
00079 TransportSendControlElement* send_element =
00080 new TransportSendControlElement(static_cast<int>(map_.size()), sample);
00081
00082 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00083 itr->second->send(send_element);
00084 }
00085 }
00086
00087 ACE_INLINE OpenDDS::DCPS::SendControlStatus
00088 OpenDDS::DCPS::DataLinkSet::send_control(RepoId pub_id,
00089 const TransportSendListener_rch& listener,
00090 const DataSampleHeader& header,
00091 Message_Block_Ptr msg)
00092 {
00093 DBG_ENTRY_LVL("DataLinkSet","send_control",6);
00094
00095
00096 MapType dup_map;
00097 copy_map_to(dup_map);
00098
00099 if (dup_map.empty()) {
00100
00101 if (DCPS_debug_level > 4) {
00102 const GuidConverter converter(pub_id);
00103 ACE_DEBUG((LM_DEBUG,
00104 ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
00105 ACE_TEXT("no links for publication %C, ")
00106 ACE_TEXT("not sending control message.\n"),
00107 OPENDDS_STRING(converter).c_str()));
00108 }
00109 listener->control_delivered(msg);
00110 return SEND_CONTROL_OK;
00111 }
00112
00113 TransportSendControlElement* const send_element =
00114 new TransportSendControlElement(static_cast<int>(dup_map.size()), pub_id,
00115 listener.in(), header, move(msg));
00116
00117 for (MapType::iterator itr = dup_map.begin();
00118 itr != dup_map.end();
00119 ++itr) {
00120 itr->second->send_start();
00121 itr->second->send(send_element);
00122 itr->second->send_stop(pub_id);
00123 }
00124
00125 return SEND_CONTROL_OK;
00126 }
00127
00128 ACE_INLINE void
00129 OpenDDS::DCPS::DataLinkSet::send_response(
00130 RepoId pub_id,
00131 const DataSampleHeader& header,
00132 Message_Block_Ptr response)
00133 {
00134 DBG_ENTRY_LVL("DataLinkSet","send_response",6);
00135 GuardType guard(this->lock_);
00136
00137 TransportSendControlElement* const send_element =
00138 new TransportSendControlElement(static_cast<int>(map_.size()), pub_id,
00139 &send_response_listener_, header,
00140 move(response));
00141 if (!send_element) return;
00142 send_response_listener_.track_message();
00143
00144 for (MapType::iterator itr = map_.begin();
00145 itr != map_.end();
00146 ++itr) {
00147 itr->second->send_start();
00148 itr->second->send(send_element);
00149 itr->second->send_stop(pub_id);
00150 }
00151 }
00152
00153 ACE_INLINE bool
00154 OpenDDS::DCPS::DataLinkSet::remove_sample(const DataSampleElement* sample)
00155 {
00156 DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
00157 GuardType guard(this->lock_);
00158 const MapType::iterator end = this->map_.end();
00159 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00160
00161 if (itr->second->remove_sample(sample, 0) == REMOVE_RELEASED) {
00162 return true;
00163 }
00164 }
00165
00166 return false;
00167 }
00168
00169 ACE_INLINE bool
00170 OpenDDS::DCPS::DataLinkSet::remove_all_msgs(RepoId pub_id)
00171 {
00172 DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
00173 GuardType guard(this->lock_);
00174 const MapType::iterator end = this->map_.end();
00175 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00176 itr->second->remove_all_msgs(pub_id);
00177 }
00178
00179 return true;
00180 }
00181
00182 ACE_INLINE void
00183 OpenDDS::DCPS::DataLinkSet::send_start(DataLinkSet* link_set)
00184 {
00185 DBG_ENTRY_LVL("DataLinkSet","send_start",6);
00186 GuardType guard1(this->lock_);
00187 GuardType guard2(link_set->lock_);
00188 for (MapType::iterator itr = link_set->map_.begin();
00189 itr != link_set->map_.end();
00190 ++itr) {
00191
00192 int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
00193
00194 if (result == 0) {
00195
00196
00197
00198 itr->second->send_start();
00199
00200 } else if (result == -1) {
00201 ACE_ERROR((LM_ERROR,
00202 "(%P|%t) ERROR: Failed to bind data link into set.\n"));
00203 }
00204
00205
00206
00207
00208 }
00209 }
00210
00211 ACE_INLINE void
00212 OpenDDS::DCPS::DataLinkSet::send_stop(RepoId repoId)
00213 {
00214 DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
00215
00216 GuardType guard(this->lock_);
00217 for (MapType::iterator itr = map_.begin();
00218 itr != map_.end();
00219 ++itr) {
00220 itr->second->send_stop(repoId);
00221 }
00222
00223 map_.clear();
00224 }
00225
00226 ACE_INLINE void
00227 OpenDDS::DCPS::DataLinkSet::copy_map_to(MapType& target)
00228 {
00229 target.clear();
00230
00231
00232 GuardType guard(this->lock_);
00233
00234
00235 for (MapType::iterator itr = map_.begin();
00236 itr != map_.end();
00237 ++itr) {
00238 target.insert(*itr);
00239 }
00240 }
00241
00242 ACE_INLINE void
00243 OpenDDS::DCPS::DataLinkSet::send_final_acks(const RepoId& readerid)
00244 {
00245 GuardType guard(this->lock_);
00246 for (MapType::iterator itr = map_.begin();
00247 itr != map_.end();
00248 ++itr) {
00249 itr->second->send_final_acks(readerid);
00250 }
00251
00252 map_.clear();
00253 }