OpenDDS  Snapshot(2023/04/28-20:55)
DataLink.inl
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
9 #include "TransportStrategy.h"
11 #include "EntryExit.h"
12 #include "dds/DCPS/GuidConverter.h"
13 
15 
16 namespace OpenDDS {
17 namespace DCPS {
18 
20 Priority&
22 {
23  return this->transport_priority_;
24 }
25 
29 {
30  return this->transport_priority_;
31 }
32 
33 
36 {
37  return this->is_loopback_;
38 }
39 
40 
43 {
44  return this->is_loopback_;
45 }
46 
47 
50 {
51  return this->is_active_;
52 }
53 
54 
56 bool DataLink::is_active() const
57 {
58  return this->is_active_;
59 }
60 
63 {
64  return this->datalink_release_delay_;
65 }
66 
67 ACE_INLINE void
69 {
70  DBG_ENTRY_LVL("DataLink","send_start",6);
71 
72  if (this->thr_per_con_send_task_ != 0) {
73  this->thr_per_con_send_task_->add_request(SEND_START);
74 
75  } else
76  this->send_start_i();
77 }
78 
79 ACE_INLINE void
81 {
82  DBG_ENTRY_LVL("DataLink","send_start_i",6);
83  // This one is easy. Simply delegate to our TransportSendStrategy
84  // data member.
85 
87 
88  if (!strategy.is_nil()) {
89  strategy->send_start();
90  }
91 }
92 
93 ACE_INLINE void
95 {
96  DBG_ENTRY_LVL("DataLink","send",6);
97 
98  if (element->is_request_ack() && handle_send_request_ack(element)) {
99  return;
100  }
101 
102  element = this->customize_queue_element(element);
103  if (!element) {
104  return;
105  }
106 
107  if (this->thr_per_con_send_task_ != 0) {
108  if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) {
109  element->data_dropped(true);
110  }
111 
112  } else {
113  this->send_i(element);
114 
115  }
116 }
117 
118 ACE_INLINE void
120 {
121  DBG_ENTRY_LVL("DataLink","send_i",6);
122  // This one is easy. Simply delegate to our TransportSendStrategy
123  // data member.
124 
126 
127  if (strategy) {
128  strategy->send(element, relink);
129  } else {
130  element->data_dropped(true);
131  }
132 }
133 
134 ACE_INLINE void
136 {
137  DBG_ENTRY_LVL("DataLink","send_stop",6);
138 
139  if (this->thr_per_con_send_task_ != 0) {
140  this->thr_per_con_send_task_->add_request(SEND_STOP);
141 
142  } else
143  this->send_stop_i(repoId);
144 }
145 
146 ACE_INLINE void
148 {
149  DBG_ENTRY_LVL("DataLink","send_stop_i",6);
150  // This one is easy. Simply delegate to our TransportSendStrategy
151  // data member.
152 
154 
155  if (!strategy.is_nil()) {
156  strategy->send_stop(repoId);
157  }
158 }
159 
161 void DataLink::set_scheduling_release(bool scheduling_release)
162 {
163  this->scheduling_release_ = scheduling_release;
164 }
165 
168 {
169  DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
170 
171  if (this->thr_per_con_send_task_ != 0) {
172  const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
173  if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
174  VDBG((LM_DEBUG, "(%P|%t) DBG: "
175  "Removed sample from ThreadPerConnection queue.\n"));
176  return rr;
177  }
178  }
179 
181 
182  if (!strategy.is_nil()) {
183  return strategy->remove_sample(sample);
184  }
185 
186  return REMOVE_NOT_FOUND;
187 }
188 
189 ACE_INLINE void
191 {
192  DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
193 
194  // This one is easy. Simply delegate to our TransportSendStrategy
195  // data member.
196 
198 
199  if (!strategy.is_nil()) {
200  strategy->remove_all_msgs(pub_id);
201  }
202 }
203 
206 {
207  DBG_ENTRY_LVL("DataLink","id",6);
208  return id_;
209 }
210 
211 ACE_INLINE int
213  const TransportStrategy_rch& receive_strategy, bool invoke_all)
214 {
215  DBG_ENTRY_LVL("DataLink","start",6);
216 
217  // We assume that the send_strategy is not NULL, but the receive_strategy
218  // is allowed to be NULL.
219 
220  // Attempt to start the strategies, and if there is a start() failure,
221  // make sure to stop() any strategy that was already start()'ed.
222  if (send_strategy->start() != 0) {
223  // Failed to start the TransportSendStrategy.
225  return -1;
226  }
227 
228  if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
229  // Failed to start the TransportReceiveStrategy.
230 
231  // Remember to stop() the TransportSendStrategy since we did start it,
232  // and now need to "undo" that action.
233  send_strategy->stop();
235  return -1;
236  }
237 
238  // We started both strategy objects. Save them to data members since
239  // we will now take ownership of them.
240  {
241  GuardType guard(this->strategy_lock_);
242 
243  this->send_strategy_ = send_strategy;
244  this->receive_strategy_ = receive_strategy;
245  }
246  if (invoke_all) {
248  }
249  {
250  //catch any associations added during initial invoke_on_start_callbacks
251  //only after first use_datalink has resolved does datalink's state truly
252  //change to started, thus can't let pending associations proceed normally yet
253  GuardType guard(this->strategy_lock_);
254  this->started_ = true;
255  }
256  //Now state transitioned to started so no new on_start_callbacks will be added
257  //so resolve any added during transition to started.
258  if (invoke_all) {
260  }
261  return 0;
262 }
263 
264 
266 const char*
268 {
269  static const char* NoticeStr[] = { "DISCONNECTED",
270  "RECONNECTED",
271  "LOST"
272  };
273 
274  return NoticeStr [notice];
275 }
276 
278 void
280 {
282  if (strategy) {
283  strategy->terminate_send(false);
284  }
285 }
286 
288 void
290 {
292  {
293  IdToSendListenerMap::iterator pos = send_listeners_.find(local_id);
294  if (pos != send_listeners_.end()) {
295  send_listeners_.erase(pos);
296  if (Transport_debug_level > 5) {
297  LogGuid logger(local_id);
299  ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
300  ACE_TEXT("removed %C from send_listeners\n"),
301  logger.c_str()));
302  }
303  return;
304  }
305  }
306  {
307  IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id);
308  if (pos != recv_listeners_.end()) {
309  recv_listeners_.erase(pos);
310  if (Transport_debug_level > 5) {
311  LogGuid logger(local_id);
313  ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
314  ACE_TEXT("removed %C from recv_listeners\n"),
315  logger.c_str()));
316  }
317  return;
318  }
319  }
320 }
321 
325 {
326  // pub_map_ (and send_listeners_) are already locked when entering this
327  // private method.
328  IdToSendListenerMap::const_iterator found =
329  this->send_listeners_.find(pub_id);
330  if (found == this->send_listeners_.end()) {
331  return TransportSendListener_rch();
332  }
333  return found->second.lock();
334 }
335 
339 {
340  // sub_map_ (and recv_listeners_) are already locked when entering this
341  // private method.
342  IdToRecvListenerMap::const_iterator found =
343  this->recv_listeners_.find(sub_id);
344  if (found == this->recv_listeners_.end()) {
346  }
347  return found->second.lock();
348 }
349 
351 void
353 {
354  GuardType guard(this->pub_sub_maps_lock_);
355  this->default_listener_ = trl;
356 }
357 
361 {
362  GuardType guard(this->pub_sub_maps_lock_);
363  return this->default_listener_;
364 }
365 
367 void
368 DataLink::send_final_acks (const GUID_t& /*readerid*/)
369 {
370 }
371 
375 {
376  GuardType guard(strategy_lock_);
377  return send_strategy_;
378 }
379 
380 }
381 }
382 
#define ACE_DEBUG(X)
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392
virtual TransportQueueElement * customize_queue_element(TransportQueueElement *element)
Definition: DataLink.h:360
const TimeDuration & datalink_release_delay() const
Definition: DataLink.inl:62
bool is_active_
Is pub or sub ?
Definition: DataLink.h:463
bool data_dropped(bool dropped_by_transport=false)
RcHandle< TransportSendListener > TransportSendListener_rch
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
virtual void remove_all_msgs(const GUID_t &pub_id)
Definition: DataLink.inl:190
void set_scheduling_release(bool scheduling_release)
Definition: DataLink.inl:161
const char * c_str() const
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
TransportSendListener_rch send_listener_for(const GUID_t &pub_id) const
Definition: DataLink.inl:324
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
void send_stop(GUID_t repoId)
Definition: DataLink.inl:135
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_UINT64 DataLinkIdType
Identifier type for DataLink objects.
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
RemoveResult remove_sample(const DataSampleElement *sample)
LM_DEBUG
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
void send(TransportQueueElement *element, bool relink=true)
void send(TransportQueueElement *element)
Definition: DataLink.inl:94
#define VDBG(DBG_ARGS)
Priority & transport_priority()
Definition: DataLink.inl:21
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
TransportReceiveListener_wrch default_listener_
Definition: DataLink.h:401
void send_stop_i(GUID_t repoId)
Definition: DataLink.inl:147
DataLinkIdType id() const
Obtain a unique identifier for this DataLink object.
Definition: DataLink.inl:205
TimeDuration datalink_release_delay_
Definition: DataLink.h:453
short Priority
ACE_TEXT("TCP_Factory")
void terminate_send(bool graceful_disconnecting=false)
Remove all samples in the backpressure queue and packet queue.
void remove_listener(const GUID_t &local_id)
Definition: DataLink.inl:289
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
ACE_CDR::Long Priority
const char * connection_notice_as_str(ConnectionNotice notice)
Helper function to output the enum as a string to help debugging.
Definition: DataLink.inl:267
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
TransportReceiveListener_wrch default_listener() const
Definition: DataLink.inl:360
TransportReceiveListener_rch recv_listener_for(const GUID_t &sub_id) const
Definition: DataLink.inl:338
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396
bool is_loopback_
Is remote attached to same transport ?
Definition: DataLink.h:461
ACE_UINT64 id_
The id for this DataLink.
Definition: DataLink.h:420
virtual RemoveResult remove_sample(const DataSampleElement *sample)
Definition: DataLink.inl:167
#define ACE_INLINE
Priority transport_priority_
TRANSPORT_PRIORITY value associated with the link.
Definition: DataLink.h:431
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void send_final_acks(const GUID_t &readerid)
Definition: DataLink.inl:368
Base wrapper class around a data/control sample to be sent.
virtual bool handle_send_request_ack(TransportQueueElement *element)
Definition: DataLink.cpp:1191
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194