Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "TransportSendStrategy.h"
9 : #include "TransportStrategy.h"
10 : #include "ThreadPerConnectionSendTask.h"
11 : #include "EntryExit.h"
12 : #include "dds/DCPS/GuidConverter.h"
13 :
14 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
15 :
16 : namespace OpenDDS {
17 : namespace DCPS {
18 :
19 : ACE_INLINE
20 : Priority&
21 0 : DataLink::transport_priority()
22 : {
23 0 : return this->transport_priority_;
24 : }
25 :
26 : ACE_INLINE
27 : Priority
28 : DataLink::transport_priority() const
29 : {
30 : return this->transport_priority_;
31 : }
32 :
33 :
34 : ACE_INLINE
35 : bool& DataLink::is_loopback()
36 : {
37 : return this->is_loopback_;
38 : }
39 :
40 :
41 : ACE_INLINE
42 : bool DataLink::is_loopback() const
43 : {
44 : return this->is_loopback_;
45 : }
46 :
47 :
48 : ACE_INLINE
49 : bool& DataLink::is_active()
50 : {
51 : return this->is_active_;
52 : }
53 :
54 :
55 : ACE_INLINE
56 : bool DataLink::is_active() const
57 : {
58 : return this->is_active_;
59 : }
60 :
61 : ACE_INLINE const TimeDuration&
62 : DataLink::datalink_release_delay() const
63 : {
64 : return this->datalink_release_delay_;
65 : }
66 :
67 : ACE_INLINE void
68 0 : DataLink::send_start()
69 : {
70 : DBG_ENTRY_LVL("DataLink","send_start",6);
71 :
72 0 : if (this->thr_per_con_send_task_ != 0) {
73 0 : this->thr_per_con_send_task_->add_request(SEND_START);
74 :
75 : } else
76 0 : this->send_start_i();
77 0 : }
78 :
79 : ACE_INLINE void
80 0 : DataLink::send_start_i()
81 : {
82 : DBG_ENTRY_LVL("DataLink","send_start_i",6);
83 : // This one is easy. Simply delegate to our TransportSendStrategy
84 : // data member.
85 :
86 0 : TransportSendStrategy_rch strategy = get_send_strategy();
87 :
88 0 : if (!strategy.is_nil()) {
89 0 : strategy->send_start();
90 : }
91 0 : }
92 :
93 : ACE_INLINE void
94 0 : DataLink::send(TransportQueueElement* element)
95 : {
96 : DBG_ENTRY_LVL("DataLink","send",6);
97 :
98 0 : if (element->is_request_ack() && handle_send_request_ack(element)) {
99 0 : return;
100 : }
101 :
102 0 : element = this->customize_queue_element(element);
103 0 : if (!element) {
104 0 : return;
105 : }
106 :
107 0 : if (this->thr_per_con_send_task_ != 0) {
108 0 : if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) {
109 0 : element->data_dropped(true);
110 : }
111 :
112 : } else {
113 0 : this->send_i(element);
114 :
115 : }
116 : }
117 :
118 : ACE_INLINE void
119 0 : DataLink::send_i(TransportQueueElement* element, bool relink)
120 : {
121 : DBG_ENTRY_LVL("DataLink","send_i",6);
122 : // This one is easy. Simply delegate to our TransportSendStrategy
123 : // data member.
124 :
125 0 : TransportSendStrategy_rch strategy = get_send_strategy();
126 :
127 0 : if (strategy) {
128 0 : strategy->send(element, relink);
129 : } else {
130 0 : element->data_dropped(true);
131 : }
132 0 : }
133 :
134 : ACE_INLINE void
135 0 : DataLink::send_stop(GUID_t repoId)
136 : {
137 : DBG_ENTRY_LVL("DataLink","send_stop",6);
138 :
139 0 : if (this->thr_per_con_send_task_ != 0) {
140 0 : this->thr_per_con_send_task_->add_request(SEND_STOP);
141 :
142 : } else
143 0 : this->send_stop_i(repoId);
144 0 : }
145 :
146 : ACE_INLINE void
147 0 : DataLink::send_stop_i(GUID_t repoId)
148 : {
149 : DBG_ENTRY_LVL("DataLink","send_stop_i",6);
150 : // This one is easy. Simply delegate to our TransportSendStrategy
151 : // data member.
152 :
153 0 : TransportSendStrategy_rch strategy = get_send_strategy();
154 :
155 0 : if (!strategy.is_nil()) {
156 0 : strategy->send_stop(repoId);
157 : }
158 0 : }
159 :
160 : ACE_INLINE
161 0 : void DataLink::set_scheduling_release(bool scheduling_release)
162 : {
163 0 : this->scheduling_release_ = scheduling_release;
164 0 : }
165 :
166 : ACE_INLINE RemoveResult
167 0 : DataLink::remove_sample(const DataSampleElement* sample)
168 : {
169 : DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
170 :
171 0 : if (this->thr_per_con_send_task_ != 0) {
172 0 : const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
173 0 : if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
174 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
175 : "Removed sample from ThreadPerConnection queue.\n"));
176 0 : return rr;
177 : }
178 : }
179 :
180 0 : TransportSendStrategy_rch strategy = get_send_strategy();
181 :
182 0 : if (!strategy.is_nil()) {
183 0 : return strategy->remove_sample(sample);
184 : }
185 :
186 0 : return REMOVE_NOT_FOUND;
187 0 : }
188 :
189 : ACE_INLINE void
190 0 : DataLink::remove_all_msgs(const GUID_t& pub_id)
191 : {
192 : DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
193 :
194 : // This one is easy. Simply delegate to our TransportSendStrategy
195 : // data member.
196 :
197 0 : TransportSendStrategy_rch strategy = get_send_strategy();
198 :
199 0 : if (!strategy.is_nil()) {
200 0 : strategy->remove_all_msgs(pub_id);
201 : }
202 0 : }
203 :
204 : ACE_INLINE DataLinkIdType
205 0 : DataLink::id() const
206 : {
207 : DBG_ENTRY_LVL("DataLink","id",6);
208 0 : return id_;
209 : }
210 :
211 : ACE_INLINE int
212 : DataLink::start(const TransportSendStrategy_rch& send_strategy,
213 : const TransportStrategy_rch& receive_strategy, bool invoke_all)
214 : {
215 : DBG_ENTRY_LVL("DataLink","start",6);
216 :
217 : // We assume that the send_strategy is not NULL, but the receive_strategy
218 : // is allowed to be NULL.
219 :
220 : // Attempt to start the strategies, and if there is a start() failure,
221 : // make sure to stop() any strategy that was already start()'ed.
222 : if (send_strategy->start() != 0) {
223 : // Failed to start the TransportSendStrategy.
224 : invoke_on_start_callbacks(false);
225 : return -1;
226 : }
227 :
228 : if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
229 : // Failed to start the TransportReceiveStrategy.
230 :
231 : // Remember to stop() the TransportSendStrategy since we did start it,
232 : // and now need to "undo" that action.
233 : send_strategy->stop();
234 : invoke_on_start_callbacks(false);
235 : return -1;
236 : }
237 :
238 : // We started both strategy objects. Save them to data members since
239 : // we will now take ownership of them.
240 : {
241 : GuardType guard(this->strategy_lock_);
242 :
243 : this->send_strategy_ = send_strategy;
244 : this->receive_strategy_ = receive_strategy;
245 : }
246 : if (invoke_all) {
247 : invoke_on_start_callbacks(true);
248 : }
249 : {
250 : //catch any associations added during initial invoke_on_start_callbacks
251 : //only after first use_datalink has resolved does datalink's state truly
252 : //change to started, thus can't let pending associations proceed normally yet
253 : GuardType guard(this->strategy_lock_);
254 : this->started_ = true;
255 : }
256 : //Now state transitioned to started so no new on_start_callbacks will be added
257 : //so resolve any added during transition to started.
258 : if (invoke_all) {
259 : invoke_on_start_callbacks(true);
260 : }
261 : return 0;
262 : }
263 :
264 :
265 : ACE_INLINE
266 : const char*
267 0 : DataLink::connection_notice_as_str(ConnectionNotice notice)
268 : {
269 : static const char* NoticeStr[] = { "DISCONNECTED",
270 : "RECONNECTED",
271 : "LOST"
272 : };
273 :
274 0 : return NoticeStr [notice];
275 : }
276 :
277 : ACE_INLINE
278 : void
279 : DataLink::terminate_send()
280 : {
281 : TransportSendStrategy_rch strategy = get_send_strategy();
282 : if (strategy) {
283 : strategy->terminate_send(false);
284 : }
285 : }
286 :
287 : ACE_INLINE
288 : void
289 0 : DataLink::remove_listener(const GUID_t& local_id)
290 : {
291 0 : GuardType guard(pub_sub_maps_lock_);
292 : {
293 0 : IdToSendListenerMap::iterator pos = send_listeners_.find(local_id);
294 0 : if (pos != send_listeners_.end()) {
295 0 : send_listeners_.erase(pos);
296 0 : if (Transport_debug_level > 5) {
297 0 : LogGuid logger(local_id);
298 0 : ACE_DEBUG((LM_DEBUG,
299 : ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
300 : ACE_TEXT("removed %C from send_listeners\n"),
301 : logger.c_str()));
302 0 : }
303 0 : return;
304 : }
305 : }
306 : {
307 0 : IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id);
308 0 : if (pos != recv_listeners_.end()) {
309 0 : recv_listeners_.erase(pos);
310 0 : if (Transport_debug_level > 5) {
311 0 : LogGuid logger(local_id);
312 0 : ACE_DEBUG((LM_DEBUG,
313 : ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
314 : ACE_TEXT("removed %C from recv_listeners\n"),
315 : logger.c_str()));
316 0 : }
317 0 : return;
318 : }
319 : }
320 0 : }
321 :
322 : ACE_INLINE
323 : TransportSendListener_rch
324 0 : DataLink::send_listener_for(const GUID_t& pub_id) const
325 : {
326 : // pub_map_ (and send_listeners_) are already locked when entering this
327 : // private method.
328 : IdToSendListenerMap::const_iterator found =
329 0 : this->send_listeners_.find(pub_id);
330 0 : if (found == this->send_listeners_.end()) {
331 0 : return TransportSendListener_rch();
332 : }
333 0 : return found->second.lock();
334 : }
335 :
336 : ACE_INLINE
337 : TransportReceiveListener_rch
338 0 : DataLink::recv_listener_for(const GUID_t& sub_id) const
339 : {
340 : // sub_map_ (and recv_listeners_) are already locked when entering this
341 : // private method.
342 : IdToRecvListenerMap::const_iterator found =
343 0 : this->recv_listeners_.find(sub_id);
344 0 : if (found == this->recv_listeners_.end()) {
345 0 : return TransportReceiveListener_rch();
346 : }
347 0 : return found->second.lock();
348 : }
349 :
350 : ACE_INLINE
351 : void
352 : DataLink::default_listener(const TransportReceiveListener_wrch& trl)
353 : {
354 : GuardType guard(this->pub_sub_maps_lock_);
355 : this->default_listener_ = trl;
356 : }
357 :
358 : ACE_INLINE
359 : TransportReceiveListener_wrch
360 : DataLink::default_listener() const
361 : {
362 : GuardType guard(this->pub_sub_maps_lock_);
363 : return this->default_listener_;
364 : }
365 :
366 : ACE_INLINE
367 : void
368 0 : DataLink::send_final_acks (const GUID_t& /*readerid*/)
369 : {
370 0 : }
371 :
372 : ACE_INLINE
373 : TransportSendStrategy_rch
374 0 : DataLink::get_send_strategy()
375 : {
376 0 : GuardType guard(strategy_lock_);
377 0 : return send_strategy_;
378 0 : }
379 :
380 : }
381 : }
382 :
383 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|