OpenDDS  Snapshot(2023/04/28-20:55)
TcpDataLink.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "TcpDataLink.h"
9 #include "TcpReceiveStrategy.h"
10 #include "TcpInst.h"
11 #include "TcpSendStrategy.h"
15 #include "dds/DCPS/GuidConverter.h"
16 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
17 #include "ace/Log_Msg.h"
18 
19 #if !defined (__ACE_INLINE__)
20 #include "TcpDataLink.inl"
21 #endif /* __ACE_INLINE__ */
22 
23 namespace {
25  const Encoding encoding_unaligned_native(Encoding::KIND_UNALIGNED_CDR);
26 }
27 
29 
31  const OpenDDS::DCPS::TcpTransport_rch& transport_impl,
32  const ACE_INET_Addr& remote_address,
33  Priority priority,
34  bool is_loopback,
35  bool is_active)
36  : DataLink(transport_impl, priority, is_loopback, is_active)
37  , remote_address_(remote_address)
38  , graceful_disconnect_sent_(false)
39  , release_is_pending_(false)
40 {
41  DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6);
42 }
43 
45 {
46  DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6);
47 }
48 
49 /// Called when the DataLink has been "stopped" for some reason. It could
50 /// be called from the DataLink::transport_shutdown() method (when the
51 /// TransportImpl is handling a shutdown() call). Or, it could be called
52 /// from the DataLink::release_reservations() method, when it discovers that
53 /// it has just released the last remaining reservations from the DataLink,
54 /// and the DataLink is in the process of "releasing" itself.
55 void
57 {
58  DBG_ENTRY_LVL("TcpDataLink","stop_i",6);
59 
60  TcpConnection_rch connection(this->connection_.lock());
61  if (connection) {
62  // Tell the connection object to disconnect.
63  connection->disconnect();
64  }
65 }
66 
67 void
69 {
71  if (stopped_clients_.count(element->publication_id())) {
72  element->data_dropped(true);
73  } else {
74  DCPS::DataLink::send_i(element, relink);
75  }
76 }
77 
78 void
80 {
82  if (!stopped_clients_.count(repoId)) {
84  }
85 }
86 
87 bool
89 {
91  return stopped_clients_.count(local_id) == 0;
92 }
93 
94 void
96 {
98  stopped_clients_.insert(local_id);
99 
100  TcpSendStrategy_rch strategy = send_strategy();
101  if (strategy) {
102  strategy->remove_all_msgs(local_id);
103  }
104 }
105 
106 void
108 {
109  DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6);
110 
112 
114 
115  TcpConnection_rch connection(connection_.lock());
116 
117  if (rs) {
118  // If we received the GRACEFUL_DISCONNECT message from peer before we
119  // initiate the disconnecting of the datalink, then we will not send
120  // the GRACEFUL_DISCONNECT message to the peer.
121  bool disconnected = rs->gracefully_disconnected();
122 
123  if (connection && !graceful_disconnect_sent_ && !disconnected) {
126  }
127  }
128 
129  if (connection) {
130  connection->shutdown();
131  }
132 }
133 
134 /// The TcpTransport calls this method when it has an established
135 /// connection object for us. This call puts this TcpDataLink into
136 /// the "connected" state.
137 int
139  const TcpConnection_rch& connection,
142 {
143  DBG_ENTRY_LVL("TcpDataLink","connect",6);
144 
145  {
146  GuardType guard(strategy_lock_);
147  this->connection_ = connection;
148  }
149 
150  if (connection->peer().enable(ACE_NONBLOCK) == -1) {
152  "(%P|%t) ERROR: TcpDataLink::connect failed to set "
153  "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1);
154  }
155 
156  // Let connection know the datalink for callbacks upon reconnect failure.
157  connection->set_datalink(rchandle_from(this));
158 
159  // And lastly, inform our base class (DataLink) that we are now "connected",
160  // and it should start the strategy objects.
161  if (this->start(send_strategy, receive_strategy, false) != 0) {
162  // Our base (DataLink) class failed to start the strategy objects.
163  // We need to "undo" some things here before we return -1 to indicate
164  // that an error has taken place.
165 
166  // Drop our reference to the connection object.
167  this->connection_.reset();
168 
169  return -1;
170  }
171 
173  return 0;
174 }
175 
176 //Allows the passive side to detect that the active side is connecting again
177 //prior to discovery identifying the released datalink from the active side.
178 //The passive side still believes it has a connection to the remote, however,
179 //the connect has created a new link/connection, thus the passive side can try
180 //to reuse the existing structures but reset it to associate the datalink with
181 //this new connection.
182 int
184 {
185  DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6);
186 
187  if (this->is_active_) {
188  return -1;
189  }
190  //Need to check if connection is nil. If connection is not nil, then connection
191  //has previously gone through connection phase so this is a reuse of the connection
192  //proceed to determine if we can reuse/reset existing mechanisms or need to start from
193  //scratch.
194 
195  TcpConnection_rch old_connection(this->connection_.lock());
196 
197  if (old_connection) {
198  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - "
199  "trying to reuse existing connection\n"), 0);
200  old_connection->transfer(connection.in());
201 
202  //Connection already exists.
205 
206  if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
207  return -1;
208  } else {
209  brs = this->receive_strategy_;
210  bss = this->send_strategy_;
211 
212  this->connection_ = connection;
213 
214  TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
215 
216  TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
217 
218  // Associate the new connection object with the receiving strategy and disassociate
219  // the old connection object with the receiving strategy.
220  int rs_result = rs->reset(0, connection.in());
221 
222  // Associate the new connection object with the sending strategy and disassociate
223  // the old connection object with the sending strategy.
224  int ss_result = ss->reset(true);
225 
226  if (rs_result == 0 && ss_result == 0) {
228  return 0;
229  }
230  }
231  }
232  return -1;
233 }
234 
235 /// Associate the new connection object with this datalink object.
236 /// The states of the "old" connection object are copied to the new
237 /// connection object and the "old" connection object is replaced by
238 /// the new connection object.
239 int
241 {
242  DBG_ENTRY_LVL("TcpDataLink","reconnect",6);
243 
244  TcpConnection_rch existing_connection(connection_.lock());
245  // Sanity check - the connection should exist already since we are reconnecting.
246  if (!existing_connection) {
248  "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
249  , 1);
250  return -1;
251  }
252 
253  existing_connection->transfer(connection.in());
254 
255  bool released = false;
258 
259  {
260  GuardType strategy_guard(strategy_lock_);
261 
264 
265  if (!trs || !tss) {
266  // if either are invalid, both should be
269  released = true;
270  }
271  }
272 
273  if (released) {
275  if (transport) {
276  const int result = transport->connect_tcp_datalink(*this, connection);
277  if (result == 0) {
279  }
280  return result;
281  }
282  return -1;
283  }
284 
285  connection_ = connection;
286 
287  // Associate the new connection object with the receiveing strategy and disassociate
288  // the old connection object with the receiveing strategy.
289  int rs_result = trs->reset(existing_connection.in(), connection.in());
290 
291  // Associate the new connection object with the sending strategy and disassociate
292  // the old connection object with the sending strategy.
293  int ss_result = tss->reset();
294 
295  if (rs_result == 0 && ss_result == 0) {
297  return 0;
298  }
299 
300  return -1;
301 }
302 
303 void
305 {
306  DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6);
307 
308  // Will clear all queued messages but still let the disconnect message
309  // sent.
310  this->send_strategy_->terminate_send(true);
311 
312  DataSampleHeader header_data;
313  // The message_id_ is the most important value for the DataSampleHeader.
314  header_data.message_id_ = GRACEFUL_DISCONNECT;
315 
316  // Other data in the DataSampleHeader are not necessary set. The bogus values
317  // can be used.
318 
319  //header_data.byte_order_
320  // = this->transport_->config()->swap_bytes() ? !TAO_ENCAP_BYTE_ORDER : TAO_ENCAP_BYTE_ORDER;
321  //header_data.message_length_ = 0;
322  //header_data.sequence_ = 0;
323  //DDS::Time_t source_timestamp
324  // = SystemTimePoint::now().to_dds_time();
325  //header_data.source_timestamp_sec_ = source_timestamp.sec;
326  //header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
327  //header_data.coherency_group_ = 0;
328  //header_data.publication_id_ = 0;
329 
330  // TODO:
331  // It seems a bug in the transport implementation that the receiving side can
332  // not receive the message when the message has no sample data and is sent
333  // in a single packet.
334 
335  // To work around this problem, I have to add bogus data to chain with the
336  // DataSampleHeader to make the receiving work.
337 
338  Message_Block_Ptr data(
339  new ACE_Message_Block(20,
341  0, //cont
342  0, //data
343  0, //allocator_strategy
344  0, //locking_strategy
348  0,
349  0));
350  data->wr_ptr(20);
351 
352  header_data.message_length_ = static_cast<ACE_UINT32>(data->length());
353 
354  Message_Block_Ptr message(
355  new ACE_Message_Block(header_data.get_max_serialized_size(),
357  data.release(), //cont
358  0, //data
359  0, //allocator_strategy
360  0, //locking_strategy
364  0,
365  0));
366 
367  *message << header_data;
368 
369  TransportControlElement* send_element = new TransportControlElement(move(message));
370 
371  // I don't want to rebuild a connection in order to send
372  // a graceful disconnect message.
373  this->send_i(send_element, false);
374 }
375 
376 void
378 {
379  this->release_is_pending_ = flag;
380 }
381 
382 bool
384 {
385  return release_is_pending_;
386 }
387 
388 bool
390 {
391  if (Transport_debug_level >= 1) {
392  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::handle_send_request_ack(%@) sequence number %q, publication_id=%C\n"),
393  element, element->sequence().getValue(), LogGuid(element->publication_id()).c_str()));
394  }
395  bool result = false;
396  TcpConnection_rch connection(connection_.lock());
397  if (connection) {
399  pending_request_acks_.push_back(element);
400  } else {
401  element->data_dropped(true);
402  result = true;
403  }
404  return result;
405 }
406 
407 void
409 {
410  const SequenceNumber sequence = sample.header_.sequence_;
411 
412  if (sequence == -1) {
413  return;
414  }
415 
416  if (Transport_debug_level >= 1) {
417  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received sequence number %q, publiction_id=%C\n"),
418  sequence.getValue(), LogGuid(sample.header_.publication_id_).c_str()));
419  }
420 
421  TransportQueueElement* elem=0;
422  {
423  // find the pending request with the same sequence number.
425  PendingRequestAcks::iterator it;
426  for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it) {
427  if ((*it)->sequence() == sequence && (*it)->publication_id() == sample.header_.publication_id_) {
428  elem = *it;
429  pending_request_acks_.erase(it);
430  break;
431  }
432  }
433  }
434 
435  if (elem) {
436  if (Transport_debug_level >= 1) {
437  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() found matching element %@\n"),
438  elem));
439  }
440  send_strategy()->deliver_ack_request(elem);
441  }
442  else {
443  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received unknown sequence number %q\n"),
444  sequence.getValue()));
445  }
446 }
447 
448 void
450 {
451  if (sample.header_.sequence_ == -1 && sample.header_.message_length_ == guid_cdr_size) {
452  GUID_t local;
453  Message_Block_Ptr payload(receive_strategy()->to_msgblock(sample));
454  Serializer ser(payload.get(), encoding_unaligned_native);
455  if (ser >> local) {
457  }
458  return;
459  }
460 
461  DataSampleHeader header_data;
462  // The message_id_ is the most important value for the DataSampleHeader.
463  header_data.message_id_ = SAMPLE_ACK;
464 
465  // Other data in the DataSampleHeader are not necessary set. The bogus values
466  // can be used.
467 
468  header_data.byte_order_ = ACE_CDR_BYTE_ORDER;
469  header_data.message_length_ = 0;
470  header_data.sequence_ = sample.header_.sequence_;
471  header_data.publication_id_ = sample.header_.publication_id_;
472  header_data.publisher_id_ = sample.header_.publisher_id_;
473 
474  Message_Block_Ptr message(
475  new ACE_Message_Block(header_data.get_max_serialized_size(),
477  0, //cont
478  0, //data
479  0, //allocator_strategy
480  0, //locking_strategy
484  0,
485  0));
486 
487  *message << header_data;
488 
489  TransportControlElement* send_element = new TransportControlElement(move(message));
490 
491 
492  // I don't want to rebuild a connection in order to send
493  // a sample ack message
494  this->send_i(send_element, false);
495 }
496 
497 void
499 {
500  // We have a connection.
501  // Invoke callbacks for readers so we can receive messages and let writers know we are ready.
502  typedef std::vector<std::pair<GUID_t, GUID_t> > PairVec;
503  PairVec to_call_and_send;
504 
505  {
506  GuardType guard(strategy_lock_);
507 
508  if (!connection_ || !send_strategy_) {
509  return;
510  }
511 
512  for (OnStartCallbackMap::const_iterator it = on_start_callbacks_.begin(); it != on_start_callbacks_.end(); ++it) {
513  for (RepoToClientMap::const_iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) {
514  if (GuidConverter(it2->first).isReader()) {
515  to_call_and_send.push_back(std::make_pair(it2->first, it->first));
516  }
517  }
518  }
519  }
520 
522 
523  for (PairVec::const_iterator it = to_call_and_send.begin(); it != to_call_and_send.end(); ++it) {
524  invoke_on_start_callbacks(it->first, it->second, true);
525  send_association_msg(it->first, it->second);
526  }
527 }
528 
529 void
531 {
532  DataSampleHeader header_data;
533  header_data.message_id_ = REQUEST_ACK;
534  header_data.byte_order_ = ACE_CDR_BYTE_ORDER;
535  header_data.message_length_ = guid_cdr_size;
536  header_data.sequence_ = -1;
537  header_data.publication_id_ = local;
538  header_data.publisher_id_ = remote;
539 
540  Message_Block_Ptr message(
541  new ACE_Message_Block(header_data.get_max_serialized_size(),
543  0, //cont
544  0, //data
545  0, //allocator_strategy
546  0, //locking_strategy
550  0,
551  0));
552 
553  *message << header_data;
554  Serializer ser(message.get(), encoding_unaligned_native);
555  ser << remote;
556 
557  TransportControlElement* send_element = new TransportControlElement(move(message), local);
558 
559  this->send_i(send_element, false);
560 }
561 
562 void
564 {
566  PendingRequestAcks::iterator it;
567  for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it) {
568  (*it)->data_dropped(true);
569  }
570  pending_request_acks_.clear();
571 }
572 
575 {
576  GuardType guard(strategy_lock_);
578 }
579 
582 {
583  GuardType guard(strategy_lock_);
585 }
586 
587 int
589  const GUID_t& local_publication_id,
590  const TransportSendListener_wrch& send_listener,
591  bool reliable)
592 {
593  {
595  stopped_clients_.erase(local_publication_id);
596  }
597  const int result = DataLink::make_reservation(remote_subscription_id, local_publication_id, send_listener, reliable);
598  send_association_msg(local_publication_id, remote_subscription_id);
599  return result;
600 }
601 
602 int
604  const GUID_t& local_subscription_id,
605  const TransportReceiveListener_wrch& receive_listener,
606  bool reliable)
607 {
608  {
610  stopped_clients_.erase(local_subscription_id);
611  }
612  const int result = DataLink::make_reservation(remote_publication_id, local_subscription_id, receive_listener, reliable);
613  send_association_msg(local_subscription_id, remote_publication_id);
614 
615  return result;
616 }
617 
DataSampleHeader header_
The demarshalled sample header.
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
virtual SequenceNumber sequence() const
bool handle_send_request_ack(TransportQueueElement *element)
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
static const ACE_Time_Value max_time
char message_id_
The enum MessageId.
void request_ack_received(const ReceivedDataSample &sample)
size_t length(void) const
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
bool is_active_
Is pub or sub ?
Definition: DataLink.h:463
bool data_dropped(bool dropped_by_transport=false)
bool isReader() const
Returns true if the GUID represents a reader entity.
const size_t guid_cdr_size
Definition: GuidUtils.h:115
int reset(bool reset_mode=false)
PendingRequestAcks pending_request_acks_
Definition: TcpDataLink.h:101
const char * c_str() const
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: TcpDataLink.cpp:68
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_CDR_BYTE_ORDER
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
void ack_received(const ReceivedDataSample &sample)
TcpDataLink(const TcpTransport_rch &transport_impl, const ACE_INET_Addr &remote_address, Priority priority, bool is_loopback, bool is_active)
Definition: TcpDataLink.cpp:30
void send_association_msg(const GUID_t &local, const GUID_t &remote)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
LM_DEBUG
virtual void send_stop_i(GUID_t repoId)
Definition: TcpDataLink.cpp:79
int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
#define ACE_NONBLOCK
int connect(const TcpConnection_rch &connection, const RcHandle< TcpSendStrategy > &send_strategy, const RcHandle< TcpReceiveStrategy > &receive_strategy)
int reconnect(const TcpConnection_rch &connection)
TcpSendStrategy_rch send_strategy()
Holds a data sample received by the transport.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
AtomicBool release_is_pending_
Definition: TcpDataLink.h:98
TcpReceiveStrategy_rch receive_strategy()
bool check_active_client(const GUID_t &local_id)
Definition: TcpDataLink.cpp:88
void send_stop_i(GUID_t repoId)
Definition: DataLink.inl:147
char * wr_ptr(void) const
void set_release_pending(bool flag)
Set release pending flag.
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103
ACE_TEXT("TCP_Factory")
void terminate_send(bool graceful_disconnecting=false)
Remove all samples in the backpressure queue and packet queue.
ACE_SYNCH_MUTEX pending_request_acks_lock_
Definition: TcpDataLink.h:100
void client_stop(const GUID_t &local_id)
Definition: TcpDataLink.cpp:95
ACE_CDR::Long Priority
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Sequence number abstraction. Only allows positive 64 bit values.
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447
static const ACE_Time_Value zero
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual void pre_stop_i()
Definition: DataLink.cpp:993
virtual GUID_t publication_id() const =0
Accessor for the publication id that sent the sample.
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
bool is_release_pending() const
Get release pending flag.
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
int reuse_existing_connection(const TcpConnection_rch &connection)
LM_ERROR
int reset(TcpConnection *old_connection, TcpConnection *new_connection)
Base wrapper class around a data/control sample to be sent.
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194