Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "EntryExit.h"
9 : #include "DataLink.h"
10 : #include "TransportSendElement.h"
11 : #include "dds/DCPS/DataSampleHeader.h"
12 : #include "dds/DCPS/Util.h"
13 : #include "dds/DCPS/Definitions.h"
14 : #include "dds/DCPS/GuidConverter.h"
15 :
16 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
17 : #include "dds/DdsDcpsGuidTypeSupportImpl.h"
18 : #include "TransportCustomizedElement.h"
19 : #endif
20 :
21 : ACE_INLINE void
22 0 : OpenDDS::DCPS::DataLinkSet::send(DataSampleElement* sample)
23 : {
24 : DBG_ENTRY_LVL("DataLinkSet", "send", 6);
25 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
26 : sample), 5);
27 :
28 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
29 : const bool customHeader =
30 0 : DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
31 : #endif
32 :
33 0 : MapType map_copy;
34 : {
35 0 : GuardType guard(lock_);
36 0 : map_copy = map_;
37 0 : }
38 :
39 0 : if (map_copy.size()) {
40 0 : TransportSendElement* send_element = new TransportSendElement(static_cast<int>(map_copy.size()), sample);
41 0 : for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
42 :
43 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
44 0 : if (customHeader) {
45 : typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
46 0 : FilterIter fi = sample->get_filter_per_link().find(itr->first);
47 0 : GUIDSeq* guids = 0;
48 0 : if (fi != sample->get_filter_per_link().end()) {
49 0 : guids = fi->second.ptr();
50 : }
51 :
52 0 : VDBG_LVL((LM_DEBUG,
53 : "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
54 : itr->second.in(), guids ? guids->length() : 0), 5);
55 :
56 0 : Message_Block_Ptr mb (send_element->msg()->duplicate());
57 :
58 0 : DataSampleHeader::add_cfentries(guids, mb.get());
59 :
60 0 : TransportCustomizedElement* tce = new TransportCustomizedElement(send_element);
61 0 : tce->set_msg(move(mb)); // tce now owns ACE_Message_Block chain
62 :
63 0 : itr->second->send(tce);
64 :
65 0 : } else {
66 : #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
67 :
68 : // Tell the DataLink to send it.
69 0 : itr->second->send(send_element);
70 :
71 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
72 : }
73 : #endif
74 : }
75 0 : } else if (sample->get_send_listener()) {
76 0 : sample->get_send_listener()->data_dropped(sample, true);
77 : }
78 0 : }
79 :
80 : ACE_INLINE void
81 0 : OpenDDS::DCPS::DataLinkSet::send_control(DataSampleElement* sample)
82 : {
83 : DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
84 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
85 0 : MapType map_copy;
86 : {
87 0 : GuardType guard(lock_);
88 0 : map_copy = map_;
89 0 : }
90 :
91 : TransportSendControlElement* send_element =
92 0 : new TransportSendControlElement(static_cast<int>(map_copy.size()), sample);
93 :
94 0 : for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
95 0 : itr->second->send(send_element);
96 : }
97 0 : }
98 :
99 : ACE_INLINE OpenDDS::DCPS::SendControlStatus
100 0 : OpenDDS::DCPS::DataLinkSet::send_control(GUID_t pub_id,
101 : const TransportSendListener_rch& listener,
102 : const DataSampleHeader& header,
103 : Message_Block_Ptr msg)
104 : {
105 : DBG_ENTRY_LVL("DataLinkSet","send_control",6);
106 : //Optimized - use cached allocator.
107 :
108 0 : MapType dup_map;
109 0 : copy_map_to(dup_map);
110 :
111 0 : if (dup_map.empty()) {
112 : // similar to the "no links" case in TransportClient::send()
113 0 : if (DCPS_debug_level > 4) {
114 0 : const LogGuid logger(pub_id);
115 0 : ACE_DEBUG((LM_DEBUG,
116 : ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
117 : ACE_TEXT("no links for publication %C, ")
118 : ACE_TEXT("not sending control message.\n"),
119 : logger.c_str()));
120 0 : }
121 0 : listener->control_delivered(msg);
122 0 : return SEND_CONTROL_OK;
123 : }
124 :
125 : TransportSendControlElement* const send_element =
126 0 : new TransportSendControlElement(static_cast<int>(dup_map.size()), pub_id,
127 0 : listener.in(), header, move(msg));
128 :
129 0 : for (MapType::iterator itr = dup_map.begin();
130 0 : itr != dup_map.end();
131 0 : ++itr) {
132 0 : itr->second->send_start();
133 0 : itr->second->send(send_element);
134 0 : itr->second->send_stop(pub_id);
135 : }
136 :
137 0 : return SEND_CONTROL_OK;
138 0 : }
139 :
140 : ACE_INLINE void
141 0 : OpenDDS::DCPS::DataLinkSet::send_response(
142 : GUID_t pub_id,
143 : const DataSampleHeader& header,
144 : Message_Block_Ptr response)
145 : {
146 : DBG_ENTRY_LVL("DataLinkSet","send_response",6);
147 0 : GuardType guard(this->lock_);
148 :
149 : TransportSendControlElement* const send_element =
150 0 : new TransportSendControlElement(static_cast<int>(map_.size()), pub_id,
151 : &send_response_listener_, header,
152 0 : move(response));
153 0 : if (!send_element) return;
154 0 : send_response_listener_.track_message();
155 :
156 0 : for (MapType::iterator itr = map_.begin();
157 0 : itr != map_.end();
158 0 : ++itr) {
159 0 : itr->second->send_start();
160 0 : itr->second->send(send_element);
161 0 : itr->second->send_stop(pub_id);
162 : }
163 0 : }
164 :
165 : ACE_INLINE bool
166 0 : OpenDDS::DCPS::DataLinkSet::remove_sample(const DataSampleElement* sample)
167 : {
168 : DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
169 0 : MapType map_copy;
170 : {
171 0 : GuardType guard(lock_);
172 0 : map_copy = map_;
173 0 : }
174 0 : const MapType::iterator end = map_copy.end();
175 0 : for (MapType::iterator itr = map_copy.begin(); itr != end; ++itr) {
176 0 : if (itr->second->remove_sample(sample) == REMOVE_RELEASED) {
177 0 : return true;
178 : }
179 : }
180 :
181 0 : return false;
182 0 : }
183 :
184 : ACE_INLINE bool
185 0 : OpenDDS::DCPS::DataLinkSet::remove_all_msgs(const GUID_t& pub_id)
186 : {
187 : DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
188 0 : MapType map_copy;
189 : {
190 0 : GuardType guard(lock_);
191 0 : map_copy = map_;
192 0 : }
193 0 : const MapType::iterator end = map_copy.end();
194 0 : for (MapType::iterator itr = map_copy.begin(); itr != end; ++itr) {
195 0 : itr->second->remove_all_msgs(pub_id);
196 : }
197 :
198 0 : return true;
199 0 : }
200 :
201 : ACE_INLINE void
202 0 : OpenDDS::DCPS::DataLinkSet::send_start(DataLinkSet* in)
203 : {
204 : DBG_ENTRY_LVL("DataLinkSet","send_start",6);
205 :
206 : typedef OPENDDS_VECTOR(DataLink_rch) SendStartVec;
207 0 : SendStartVec send_start_vec;
208 :
209 : {
210 0 : GuardType guard1(lock_);
211 0 : GuardType guard2(in->lock_);
212 0 : for (MapType::iterator itr = in->map_.begin(); itr != in->map_.end(); ++itr) {
213 : // Attempt to add the current DataLink to this set.
214 0 : int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
215 :
216 0 : if (result == 0) {
217 : // We successfully added the current DataLink to this set,
218 : // meaning that it wasn't already a member. We should tell
219 : // the DataLink about the send_start() event.
220 0 : send_start_vec.push_back(itr->second);
221 :
222 0 : } else if (result == -1) {
223 0 : ACE_ERROR((LM_ERROR,
224 : "(%P|%t) ERROR: Failed to bind data link into set.\n"));
225 : }
226 :
227 : // Note that there is a possibility that the result == 1, which
228 : // means that the DataLink already exists in our map_-> We skip
229 : // all of these cases.
230 : }
231 0 : }
232 :
233 0 : for (SendStartVec::iterator it = send_start_vec.begin(); it != send_start_vec.end(); ++it) {
234 0 : (*it)->send_start();
235 : }
236 0 : }
237 :
238 : ACE_INLINE void
239 0 : OpenDDS::DCPS::DataLinkSet::send_stop(GUID_t repoId)
240 : {
241 : DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
242 : // Iterate over our map_ and tell each DataLink about the send_stop() event.
243 0 : MapType map_copy;
244 : {
245 0 : GuardType guard(lock_);
246 0 : map_copy = map_;
247 0 : map_.clear();
248 0 : }
249 :
250 0 : for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
251 0 : itr->second->send_stop(repoId);
252 : }
253 0 : }
254 :
255 : ACE_INLINE void
256 0 : OpenDDS::DCPS::DataLinkSet::copy_map_to(MapType& target)
257 : {
258 0 : target.clear();
259 :
260 : // Lock the existing map
261 0 : GuardType guard(this->lock_);
262 :
263 :
264 : // Copy to target
265 0 : for (MapType::iterator itr = map_.begin();
266 0 : itr != map_.end();
267 0 : ++itr) {
268 0 : target.insert(*itr);
269 : }
270 0 : }
271 :
272 : ACE_INLINE void
273 0 : OpenDDS::DCPS::DataLinkSet::send_final_acks(const GUID_t& readerid)
274 : {
275 0 : MapType map_copy;
276 : {
277 0 : GuardType guard(lock_);
278 0 : map_copy = map_;
279 0 : map_.clear();
280 0 : }
281 :
282 0 : for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
283 0 : itr->second->send_final_acks(readerid);
284 : }
285 0 : }
|