00001
00002
00003
00004
00005
00006
00007
00008 #include "EntryExit.h"
00009 #include "DataLink.h"
00010 #include "TransportSendElement.h"
00011 #include "dds/DCPS/DataSampleHeader.h"
00012 #include "dds/DCPS/Util.h"
00013 #include "dds/DCPS/Definitions.h"
00014 #include "dds/DCPS/GuidConverter.h"
00015
00016 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00017 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00018 #include "TransportCustomizedElement.h"
00019 #endif
00020
00021 ACE_INLINE void
00022 OpenDDS::DCPS::DataLinkSet::send(DataSampleElement* sample)
00023 {
00024 DBG_ENTRY_LVL("DataLinkSet", "send", 6);
00025 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
00026 sample), 5);
00027 GuardType guard(this->lock_);
00028 TransportSendElement* send_element =
00029 TransportSendElement::alloc(static_cast<int>(map_.size()), sample);
00030
00031 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00032 const bool customHeader =
00033 DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
00034 #endif
00035
00036 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00037
00038 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00039 if (customHeader) {
00040 typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
00041 FilterIter fi = sample->get_filter_per_link().find(itr->first);
00042 GUIDSeq* guids = 0;
00043 if (fi != sample->get_filter_per_link().end()) {
00044 guids = fi->second.ptr();
00045 }
00046
00047 VDBG_LVL((LM_DEBUG,
00048 "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
00049 itr->second.in(), guids ? guids->length() : 0), 5);
00050
00051 ACE_Message_Block* mb = sample->get_sample()->duplicate();
00052
00053 DataSampleHeader::add_cfentries(guids, mb);
00054
00055 TransportCustomizedElement* tce =
00056 TransportCustomizedElement::alloc(send_element, false,
00057 sample->get_transport_customized_element_allocator());
00058 tce->set_msg(mb);
00059
00060 itr->second->send(tce);
00061
00062 } else {
00063 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00064
00065
00066 itr->second->send(send_element);
00067
00068 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00069 }
00070 #endif
00071 }
00072 }
00073
00074 ACE_INLINE void
00075 OpenDDS::DCPS::DataLinkSet::send_control(DataSampleElement* sample)
00076 {
00077 DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
00078 VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
00079 GuardType guard(this->lock_);
00080 TransportSendControlElement* send_element =
00081 TransportSendControlElement::alloc(static_cast<int>(map_.size()), sample,
00082 &send_control_element_allocator_);
00083 if (!send_element) {
00084 ACE_ERROR((LM_ERROR, "(%P|%t) DataLinkSet::send_control allocation of "
00085 "TransportSendControlElement failed\n"));
00086 return;
00087 }
00088 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00089 itr->second->send(send_element);
00090 }
00091 }
00092
00093 ACE_INLINE OpenDDS::DCPS::SendControlStatus
00094 OpenDDS::DCPS::DataLinkSet::send_control(RepoId pub_id,
00095 TransportSendListener* listener,
00096 const DataSampleHeader& header,
00097 ACE_Message_Block* msg,
00098 TransportSendControlElementAllocator* allocator)
00099 {
00100 DBG_ENTRY_LVL("DataLinkSet","send_control",6);
00101
00102
00103 MapType dup_map;
00104 copy_map_to(dup_map);
00105
00106 if (dup_map.empty()) {
00107
00108 if (DCPS_debug_level > 4) {
00109 const GuidConverter converter(pub_id);
00110 ACE_DEBUG((LM_DEBUG,
00111 ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
00112 ACE_TEXT("no links for publication %C, ")
00113 ACE_TEXT("not sending control message.\n"),
00114 OPENDDS_STRING(converter).c_str()));
00115 }
00116 listener->control_delivered(msg);
00117 return SEND_CONTROL_OK;
00118 }
00119
00120 TransportSendControlElementAllocator& use_alloc =
00121 allocator ? *allocator : send_control_element_allocator_;
00122
00123 TransportSendControlElement* const send_element =
00124 TransportSendControlElement::alloc(static_cast<int>(dup_map.size()), pub_id,
00125 listener, header, msg, &use_alloc);
00126 if (!send_element) return SEND_CONTROL_ERROR;
00127
00128 for (MapType::iterator itr = dup_map.begin();
00129 itr != dup_map.end();
00130 ++itr) {
00131 itr->second->send_start();
00132 itr->second->send(send_element);
00133 itr->second->send_stop(pub_id);
00134 }
00135
00136 return SEND_CONTROL_OK;
00137 }
00138
00139 ACE_INLINE void
00140 OpenDDS::DCPS::DataLinkSet::send_response(
00141 RepoId pub_id,
00142 const DataSampleHeader& header,
00143 ACE_Message_Block* response)
00144 {
00145 DBG_ENTRY_LVL("DataLinkSet","send_response",6);
00146 GuardType guard(this->lock_);
00147
00148 TransportSendControlElement* const send_element =
00149 TransportSendControlElement::alloc(static_cast<int>(map_.size()), pub_id,
00150 &send_response_listener_, header,
00151 response, &send_control_element_allocator_);
00152 if (!send_element) return;
00153 send_response_listener_.track_message();
00154
00155 for (MapType::iterator itr = map_.begin();
00156 itr != map_.end();
00157 ++itr) {
00158 itr->second->send_start();
00159 itr->second->send(send_element);
00160 itr->second->send_stop(pub_id);
00161 }
00162 }
00163
00164 ACE_INLINE bool
00165 OpenDDS::DCPS::DataLinkSet::remove_sample(const DataSampleElement* sample)
00166 {
00167 DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
00168 GuardType guard(this->lock_);
00169 const MapType::iterator end = this->map_.end();
00170 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00171
00172 if (itr->second->remove_sample(sample) == REMOVE_RELEASED) {
00173 return true;
00174 }
00175 }
00176
00177 return false;
00178 }
00179
00180 ACE_INLINE bool
00181 OpenDDS::DCPS::DataLinkSet::remove_all_msgs(RepoId pub_id)
00182 {
00183 DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
00184 GuardType guard(this->lock_);
00185 const MapType::iterator end = this->map_.end();
00186 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) {
00187 itr->second->remove_all_msgs(pub_id);
00188 }
00189
00190 return true;
00191 }
00192
00193 ACE_INLINE void
00194 OpenDDS::DCPS::DataLinkSet::send_start(DataLinkSet* link_set)
00195 {
00196 DBG_ENTRY_LVL("DataLinkSet","send_start",6);
00197 GuardType guard1(this->lock_);
00198 GuardType guard2(link_set->lock_);
00199 for (MapType::iterator itr = link_set->map_.begin();
00200 itr != link_set->map_.end();
00201 ++itr) {
00202
00203 int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
00204
00205 if (result == 0) {
00206
00207
00208
00209 itr->second->send_start();
00210
00211 } else if (result == -1) {
00212 ACE_ERROR((LM_ERROR,
00213 "(%P|%t) ERROR: Failed to bind data link into set.\n"));
00214 }
00215
00216
00217
00218
00219 }
00220 }
00221
00222 ACE_INLINE void
00223 OpenDDS::DCPS::DataLinkSet::send_stop(RepoId repoId)
00224 {
00225 DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
00226
00227 GuardType guard(this->lock_);
00228 for (MapType::iterator itr = map_.begin();
00229 itr != map_.end();
00230 ++itr) {
00231 itr->second->send_stop(repoId);
00232 }
00233
00234 map_.clear();
00235 }
00236
00237 ACE_INLINE void
00238 OpenDDS::DCPS::DataLinkSet::copy_map_to(MapType& target)
00239 {
00240 target.clear();
00241
00242
00243 GuardType guard(this->lock_);
00244
00245
00246 for (MapType::iterator itr = map_.begin();
00247 itr != map_.end();
00248 ++itr) {
00249 target.insert(*itr);
00250 }
00251 }
00252
00253 ACE_INLINE void
00254 OpenDDS::DCPS::DataLinkSet::send_final_acks(const RepoId& readerid)
00255 {
00256 GuardType guard(this->lock_);
00257 for (MapType::iterator itr = map_.begin();
00258 itr != map_.end();
00259 ++itr) {
00260 itr->second->send_final_acks(readerid);
00261 }
00262
00263 map_.clear();
00264 }