OpenDDS  Snapshot(2023/04/28-20:55)
TransportSendStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "TransportSendStrategy.h"
11 
12 #include "RemoveAllVisitor.h"
13 #include "TransportInst.h"
14 #include "ThreadSynchStrategy.h"
15 #include "ThreadSynchResource.h"
16 #include "TransportQueueElement.h"
17 #include "TransportSendElement.h"
18 #include "TransportSendBuffer.h"
19 #include "BuildChainVisitor.h"
20 #include "QueueRemoveVisitor.h"
21 #include "PacketRemoveVisitor.h"
22 #include "TransportDefs.h"
23 #include "DirectPriorityMapper.h"
24 #include "EntryExit.h"
25 
29 
30 #include <ace/Reverse_Lock_T.h>
31 
32 #if !defined (__ACE_INLINE__)
34 #endif /* __ACE_INLINE__ */
35 
37 
38 namespace OpenDDS {
39 namespace DCPS {
40 
41 // TBD: The number of chunks of the replaced element allocators
42 // is hard-coded for now. This will be configurable once
43 // we implement the DDS configuration. This value should
44 // be the number of marshalled DataSampleHeaders that a
45 // packet could contain.
46 #define NUM_REPLACED_ELEMENT_CHUNKS 20
47 
48 namespace {
49  /// Arbitrary small constant that represents the minimum
50  /// amount of payload data we'll have in one fragment.
51  /// In this case "payload data" includes the content-filtering
52  /// GUID sequence, so this is chosen to be 4 + (16 * N); 68 is N == 4.
53  static const size_t MIN_FRAG = 68;
54 }
55 
56 // I think 2 chunks for the header message block is enough
57 // - one for the original copy and one for duplicate which
58 // occurs every packet and is released after packet is sent.
59 // The data block only needs 1 chunk since the duplicate()
60 // just increases the ref count.
62  std::size_t id,
63  const TransportImpl_rch& transport,
64  ThreadSynchResource* synch_resource,
65  Priority priority,
66  const ThreadSynchStrategy_rch& thread_sync_strategy)
67  : ThreadSynchWorker(id),
71  max_header_size_(0),
72  header_block_(0),
73  pkt_chain_(0),
74  header_complete_(false),
75  start_counter_(0),
76  mode_(MODE_DIRECT),
77  mode_before_suspend_(MODE_NOT_SET),
78  lock_(),
79  replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
80  replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
81  transport_(transport),
82  graceful_disconnecting_(false),
83  link_released_(true),
84  send_buffer_(0)
85 {
86  DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
87 
88  TransportInst_rch cfg = transport->config();
89  if (cfg) {
93  }
94 
95  // Create a ThreadSynch object just for us.
96  DirectPriorityMapper mapper(priority);
97  synch_.reset(thread_sync_strategy->create_synch_object(
98  synch_resource,
99 #ifdef ACE_WIN32
101 #else
102  mapper.thread_priority(),
103 #endif
104  TheServiceParticipant->scheduler()));
105 
106  // We cache this value in data member since it doesn't change, and we
107  // don't want to keep asking for it over and over.
109 
110  delayed_delivered_notification_queue_.reserve(max_samples_);
111 }
112 
114 {
115  DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
116 
117 
118  delayed_delivered_notification_queue_.clear();
119 }
120 
121 void
123 {
125 
126  if (send_buffer_ != 0) {
127  send_buffer_->bind(this);
128  }
129 }
130 
133 {
134  DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
135 
136  SendPacketOutcome outcome;
137  bool no_more_work = false;
138 
139  { // scope for the guard(lock_);
140  GuardType guard(lock_);
141 
142  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(mode_)), 5);
143 
144  if (mode_ == MODE_TERMINATED) {
145  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
146  "Entered perform_work() and mode_ is MODE_TERMINATED - "
147  "we lost connection and could not reconnect, just return "
148  "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
150  }
151 
152  // The perform_work() is called by our synch_ object using
153  // a thread designated to call this method when it thinks
154  // we need to be called in order to "service" the queue_ and/or
155  // deal with a partially sent current packet.
156  //
157  // We will return WORK_OUTCOME_NO_MORE_TO_DO if we don't see a need to have
158  // our perform_work() called again, and WORK_OUTCOME_MORE_TO_DO if we do
159  // see the need to have our perform_work() method called again.
160 
161  // First, make sure that the mode_ indicates that we are, indeed, in
162  // MODE_QUEUE or MODE_SUSPEND. If we are in any other mode (for example
163  // MODE_DIRECT), then it means we didn't need to have this perform_work()
164  // method called - in this case, do nothing other than return
165  // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
166  // see a need for it to call our perform_work() again (at least not
167  // right now).
168  if (mode_ != MODE_QUEUE && mode_ != MODE_SUSPEND) {
169  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
170  "Entered perform_work() and mode_ is %C - just return "
171  "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(mode_)), 5);
173  }
174 
175  // Check the "state" of the current packet. We will either find that the
176  // current packet is in a state of being "partially sent", or we will find
177  // it in a state of being "empty". When the current packet is "empty", it
178  // means that it is time to build up the current packet using elements
179  // extracted from the queue_, and then we will attempt to send the
180  // packet. When we find the current packet in the "partially sent" state,
181  // we will not touch the queue_ - we will just try to send the unsent
182  // bytes in the current (partially sent) packet.
183  const size_t header_length = header_.length_;
184 
185  if (header_length == 0) {
186  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
187  "The current packet doesn't have any unsent bytes - we "
188  "need to 'populate' the current packet with elems from "
189  "the queue.\n"), 5);
190 
191  // The current packet is "empty". Build up the current packet using
192  // elements from the queue_, and prepare the current packet to be sent.
193 
194  // Before we build the packet from the queue_, let's make sure that
195  // there is actually something on the queue_ to build from.
196  if (queue_.size() == 0) {
197  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
198  "But the queue is empty. We have cleared the "
199  "backpressure situation.\n"),5);
200  // We are here because the queue_ is empty, and there isn't
201  // any "partial packet" bytes left to send. We have overcome
202  // the backpressure situation and don't have anything to do
203  // right now.
204 
205  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
206  "Flip mode to MODE_DIRECT, and return "
207  "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
208 
209  // Flip the mode back to MODE_DIRECT.
210  mode_ = MODE_DIRECT;
211 
212  // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
213  // perform_work() doesn't need to be called again (at this time).
215  }
216 
217  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
218  "There is at least one elem in the queue - get the packet "
219  "elems from the queue.\n"), 5);
220 
221  // There is stuff in the queue_ if we get to this point in the logic.
222  // Build-up the current packet using element(s) from the queue_.
224 
225  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
226  "Prepare the packet from the packet elems_.\n"), 5);
227 
228  // Now we can prepare the new packet to be sent.
229  prepare_packet();
230 
231  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
232  "Packet has been prepared from packet elems_.\n"), 5);
233 
234  } else {
235  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
236  "We have a current packet that still has unsent bytes.\n"), 5);
237  }
238 
239  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
240  "Attempt to send the current packet.\n"), 5);
241 
242  // Now we can attempt to send the current packet - whether it is
243  // a "partially sent" packet or one that we just built-up using elements
244  // from the queue_ (and subsequently prepared for sending) - it doesn't
245  // matter. Just attempt to send as many of the "unsent" bytes in the
246  // packet as possible.
247  outcome = send_packet();
248 
249  // If we sent the whole packet (i.e., outcome is OUTCOME_COMPLETE_SEND), and
250  // the queue_ is now empty, then we've cleared the backpressure situation.
251  if ((outcome == OUTCOME_COMPLETE_SEND) && (queue_.size() == 0)) {
252  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
253  "Flip the mode to MODE_DIRECT, and then return "
254  "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
255 
256  // Revert back to MODE_DIRECT mode.
257  mode_ = MODE_DIRECT;
258  no_more_work = true;
259  }
260  } // End of scope for guard(lock_);
261 
262  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
263  "The outcome of the send_packet() was %d.\n", outcome), 5);
264 
266 
267  // If we sent the whole packet and the queue_ was empty afterwards (i.e.,
268  // no_more_work was set), then we've cleared the backpressure situation.
269  if (no_more_work) {
270  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
271  "We sent the whole packet, and there is nothing left on "
272  "the queue now.\n"), 5);
273 
274  // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
275  // don't desire another call to this perform_work() method.
277  }
278 
279  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
280  "We still have unsent bytes in the current packet AND/OR there "
281  "are still elements in the queue.\n"), 5);
282 
283  if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
284  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
285  "We lost our connection, or had some fatal connection "
286  "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
287 
288  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
289  "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
290 
291  bool do_suspend = true;
292  relink(do_suspend);
293 
294  if (mode_ == MODE_SUSPEND) {
295  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
296  "The reconnect has not done yet and we are still in MODE_SUSPEND. "
297  "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
298  // Return WORK_OUTCOME_CLOGGED_RESOURCE to tell our caller that the
299  // reconnect has not finished yet and we are still in MODE_SUSPEND.
301 
302  } else if (mode_ == MODE_TERMINATED) {
303  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
304  "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
306 
307  } else {
308  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
309  "Reconnect succeeded, Notify synch thread of work "
310  "availability.\n"), 5);
311  // If the datalink is re-established then notify the synch
312  // thread to perform work. We do not hold the object lock at
313  // this point.
314  synch_->work_available();
315  }
316  }
317 
318  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
319  "We still have an 'unbroken' connection.\n"), 5);
320 
321  if (outcome == OUTCOME_BACKPRESSURE) {
322  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
323  "We experienced backpressure on our attempt to send the "
324  "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
325  // We have a "clogged resource".
327  }
328 
329  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
330  "We may have sent the whole current packet, but still have "
331  "elements on the queue.\n"), 5);
332  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
333  "Or, we may have only partially sent the current packet.\n"), 5);
334 
335  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
336  "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
337 
338  // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
339  // in the queue_ to be sent. *OR* we may have had an OUTCOME_PARTIAL_SEND,
340  // which equates to the same thing - we still have work to do.
341 
342  // We are still in MODE_QUEUE mode, thus there is still work to be
343  // done to service the queue_ and/or a partially sent current packet.
344  // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
345  // want it to call this perform_work() method.
347 }
348 
349 // Now we need to "peel off" those message blocks that were fully
350 // sent, adjust the first message block with an unsent byte to have
351 // its rd_ptr() pointing to that first unsent byte, and set the
352 // pkt_chain_ to that first message block with an unsent byte.
353 // As we "peel off" fully sent message blocks, we need to also deal with
354 // fully sent elements by removing them from the elems_ and
355 // calling their data_delivered() method. In addition, as we peel off
356 // the message blocks that are fully sent, we need to untie them from
357 // the chain and release them.
358 // And finally, don't forget to adjust the header_.length_ to
359 // account for the num_bytes_sent (beware that some of the num_bytes_sent
360 // may be packet header bytes and shouldn't affect the header_.length_,
361 // which doesn't include the packet header bytes).
362 int
364 {
365  DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
366 
367  VDBG((LM_DEBUG, "(%P|%t) DBG: "
368  "Adjusting the current packet because %d bytes of the packet "
369  "have been sent.\n", num_bytes_sent));
370 
371  ssize_t num_bytes_left = num_bytes_sent;
372  ssize_t num_non_header_bytes_sent = 0;
373 
374  VDBG((LM_DEBUG, "(%P|%t) DBG: "
375  "Set num_bytes_left to %d.\n", num_bytes_left));
376  VDBG((LM_DEBUG, "(%P|%t) DBG: "
377  "Set num_non_header_bytes_sent to %d.\n",
378  num_non_header_bytes_sent));
379 
380  VDBG((LM_DEBUG, "(%P|%t) DBG: "
381  "Peek at the element at the front of the packet elems_.\n"));
382 
383  // This is the element currently at the front of elems_.
384  TransportQueueElement* element = elems_.peek();
385 
386  if(!element){
387  ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
388  } else {
389  VDBG((LM_DEBUG, "(%P|%t) DBG: "
390  "Use the element's msg() to find the last block in "
391  "the msg() chain.\n"));
392 
393  // Get a pointer to the last message block in the element.
394  const ACE_Message_Block* elem_tail_block = element->msg();
395 
396  VDBG((LM_DEBUG, "(%P|%t) DBG: "
397  "Start with tail block == element->msg().\n"));
398 
399  while (elem_tail_block->cont() != 0) {
400  VDBG((LM_DEBUG, "(%P|%t) DBG: "
401  "Set tail block to its cont() block (next in chain).\n"));
402  elem_tail_block = elem_tail_block->cont();
403  }
404 
405  VDBG((LM_DEBUG, "(%P|%t) DBG: "
406  "Tail block now set (because tail block's cont() is 0).\n"));
407 
408  VDBG((LM_DEBUG, "(%P|%t) DBG: "
409  "Start the 'while (num_bytes_left > 0)' loop.\n"));
410 
411  while (num_bytes_left > 0) {
412  VDBG((LM_DEBUG, "(%P|%t) DBG: "
413  "At top of 'num bytes left' loop. num_bytes_left == [%d].\n",
414  num_bytes_left));
415 
416  const int block_length = static_cast<int>(pkt_chain_->length());
417 
418  VDBG((LM_DEBUG, "(%P|%t) DBG: "
419  "Length of block at front of pkt_chain_ is [%d].\n",
420  block_length));
421 
422  if (block_length <= num_bytes_left) {
423  VDBG((LM_DEBUG, "(%P|%t) DBG: "
424  "The whole block at the front of pkt_chain_ was sent.\n"));
425 
426  // The entire message block at the front of the chain has been sent.
427  // Detach the head message block from the chain and adjust
428  // the pkt_chain_ to point to the next block (if any) in
429  // the chain.
430  VDBG((LM_DEBUG, "(%P|%t) DBG: "
431  "Extract the fully sent block from the pkt_chain_.\n"));
432 
433  ACE_Message_Block* fully_sent_block = pkt_chain_;
434 
435  VDBG((LM_DEBUG, "(%P|%t) DBG: "
436  "Set pkt_chain_ to pkt_chain_->cont().\n"));
437 
439 
440  VDBG((LM_DEBUG, "(%P|%t) DBG: "
441  "Set the fully sent block's cont() to 0.\n"));
442 
443  fully_sent_block->cont(0);
444 
445  // Update the num_bytes_left to indicate that we have
446  // processed the entire length of the block.
447  num_bytes_left -= block_length;
448 
449  VDBG((LM_DEBUG, "(%P|%t) DBG: "
450  "Updated num_bytes_left to account for fully sent "
451  "block (block_length == [%d]).\n", block_length));
452  VDBG((LM_DEBUG, "(%P|%t) DBG: "
453  "Now, num_bytes_left == [%d].\n", num_bytes_left));
454 
455  if (!header_complete_) {
456  VDBG((LM_DEBUG, "(%P|%t) DBG: "
457  "Since the header_complete_ flag is false, it means "
458  "that the packet header block was still in the "
459  "pkt_chain_.\n"));
460 
461  VDBG((LM_DEBUG, "(%P|%t) DBG: "
462  "Not anymore... Set the header_complete_ flag "
463  "to true.\n"));
464 
465  // That was the packet header block. And now we know that it
466  // has been completely sent.
467  header_complete_ = true;
468 
469  VDBG((LM_DEBUG, "(%P|%t) DBG: "
470  "Release the fully sent block.\n"));
471 
472  // Release the fully_sent_block
473  fully_sent_block->release();
474 
475  } else {
476  VDBG((LM_DEBUG, "(%P|%t) DBG: "
477  "Since the header_complete_ flag is true, it means "
478  "that the packet header block was not in the "
479  "pkt_chain_.\n"));
480  VDBG((LM_DEBUG, "(%P|%t) DBG: "
481  "So, the fully sent block was part of an element.\n"));
482 
483  // That wasn't the packet header block. It was from the
484  // element currently at the front of the elems_
485  // collection. If it was the last block from the
486  // element, then we need to extract the element from the
487  // elems_ collection and invoke data_delivered() on it.
488  num_non_header_bytes_sent += block_length;
489 
490  VDBG((LM_DEBUG, "(%P|%t) DBG: "
491  "Updated num_non_header_bytes_sent to account for "
492  "fully sent block (block_length == [%d]).\n",
493  block_length));
494 
495  VDBG((LM_DEBUG, "(%P|%t) DBG: "
496  "Now, num_non_header_bytes_sent == [%d].\n",
497  num_non_header_bytes_sent));
498 
499  if (fully_sent_block->base() == elem_tail_block->base()) {
500  VDBG((LM_DEBUG, "(%P|%t) DBG: "
501  "Ok. The fully sent block was a duplicate of "
502  "the tail block of the element that is at the "
503  "front of the packet elems_.\n"));
504 
505  VDBG((LM_DEBUG, "(%P|%t) DBG: "
506  "This means that we have completely sent the "
507  "element at the front of the packet elems_.\n"));
508 
509  // This means that we have completely sent the element
510  // that is currently at the front of the elems_ collection.
511 
512  VDBG((LM_DEBUG, "(%P|%t) DBG: "
513  "We can release the fully sent block now.\n"));
514 
515  // Release the fully_sent_block
516  fully_sent_block->release();
517 
518  VDBG((LM_DEBUG, "(%P|%t) DBG: "
519  "We can extract the element from the front of "
520  "the packet elems_ (we were just peeking).\n"));
521 
522  // Extract the element from the elems_ collection
523  element = elems_.get();
524 
525  VDBG((LM_DEBUG, "(%P|%t) DBG: "
526  "Tell the element that a decision has been made "
527  "regarding its fate - data_delivered().\n"));
528 
529  // Hand the element to add_delayed_notification(); it is not notified directly here.
530  add_delayed_notification(element);
531 
532  VDBG((LM_DEBUG, "(%P|%t) DBG: "
533  "Peek at the next element in the packet "
534  "elems_.\n"));
535 
536  // Set up for the next element in elems_ by peek()'ing.
537  element = elems_.peek();
538 
539  if (element != 0) {
540  VDBG((LM_DEBUG, "(%P|%t) DBG: "
541  "The is an element still in the packet "
542  "elems_ (we are peeking at it now).\n"));
543 
544  VDBG((LM_DEBUG, "(%P|%t) DBG: "
545  "We are going to find the tail block for the "
546  "current element (we are peeking at).\n"));
547 
548  // There was a "next element". Determine the
549  // elem_tail_block for it.
550  elem_tail_block = element->msg();
551 
552  VDBG((LM_DEBUG, "(%P|%t) DBG: "
553  "Start w/tail block == element->msg().\n"));
554 
555  while (elem_tail_block->cont() != 0) {
556  VDBG((LM_DEBUG, "(%P|%t) DBG: "
557  "Set tail block to next in chain.\n"));
558  elem_tail_block = elem_tail_block->cont();
559  }
560 
561  VDBG((LM_DEBUG, "(%P|%t) DBG: "
562  "Done finding tail block.\n"));
563  }
564 
565  } else {
566  VDBG((LM_DEBUG, "(%P|%t) DBG: "
567  "Ok. The fully sent block is *not* a "
568  "duplicate of the tail block of the element "
569  "at the front of the packet elems_.\n"));
570 
571  VDBG((LM_DEBUG, "(%P|%t) DBG: "
572  "Thus, we have not completely sent the "
573  "element yet.\n"));
574 
575  // We didn't completely send the element - it has more
576  // message blocks that haven't been sent (that we know of).
577 
578  VDBG((LM_DEBUG, "(%P|%t) DBG: "
579  "We can release the fully_sent_block now.\n"));
580 
581  // Release the fully_sent_block
582  fully_sent_block->release();
583  }
584  }
585 
586  } else {
587  VDBG((LM_DEBUG, "(%P|%t) DBG: "
588  "Only part of the block at the front of pkt_chain_ "
589  "was sent.\n"));
590 
591  VDBG((LM_DEBUG, "(%P|%t) DBG: "
592  "Advance the rd_ptr() of the front block (of pkt_chain_) "
593  "by the num_bytes_left (%d).\n", num_bytes_left));
594 
595  // Only part of the current block was sent.
596  pkt_chain_->rd_ptr(num_bytes_left);
597 
598  if (header_complete_) {
599  VDBG((LM_DEBUG, "(%P|%t) DBG: "
600  "And since the packet header block has already been "
601  "completely sent, add num_bytes_left to the "
602  "num_non_header_bytes_sent.\n"));
603 
604  VDBG((LM_DEBUG, "(%P|%t) DBG: "
605  "Before, num_non_header_bytes_sent == %d.\n",
606  num_non_header_bytes_sent));
607 
608  // We know that the current block isn't the packet header
609  // block because the packet header block has already been
610  // completely sent. We need to count these bytes in the
611  // num_non_header_bytes_sent.
612  num_non_header_bytes_sent += num_bytes_left;
613 
614  VDBG((LM_DEBUG, "(%P|%t) DBG: "
615  "After, num_non_header_bytes_sent == %d.\n",
616  num_non_header_bytes_sent));
617  }
618 
619  VDBG((LM_DEBUG, "(%P|%t) DBG: "
620  "Set the num_bytes_left to 0 now.\n"));
621 
622  num_bytes_left = 0;
623  }
624  }
625  }
626 
627  VDBG((LM_DEBUG, "(%P|%t) DBG: "
628  "The 'num_bytes_left' loop has completed.\n"));
629 
630  VDBG((LM_DEBUG, "(%P|%t) DBG: "
631  "Adjust the header_.length_ to account for the "
632  "num_non_header_bytes_sent.\n"));
633  VDBG((LM_DEBUG, "(%P|%t) DBG: "
634  "Before, header_.length_ == %d.\n",
635  header_.length_));
636 
637  // Adjust the packet header_.length_ to indicate how many non header
638  // bytes are left to send.
639  header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
640 
641  VDBG((LM_DEBUG, "(%P|%t) DBG: "
642  "After, header_.length_ == %d.\n",
643  header_.length_));
644 
645  // Returns 0 if the entire packet was sent, and returns 1 otherwise.
646  int rc = (header_.length_ == 0) ? 0 : 1;
647 
648  VDBG((LM_DEBUG, "(%P|%t) DBG: "
649  "Adjustments all done. Returning [%d]. 0 means entire packet "
650  "has been sent. 1 means otherwise.\n",
651  rc));
652 
653  return rc;
654 }
655 
656 bool
658 {
659  DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
660  TransportQueueElement* sample = 0;
662 
664 
665  size_t num_delayed_notifications = 0;
666  bool found_element = false;
667 
668  {
669  GuardType guard(lock_);
670 
671  num_delayed_notifications = delayed_delivered_notification_queue_.size();
672 
673  if (num_delayed_notifications == 0) {
674  return false;
675 
676  } else if (num_delayed_notifications == 1) {
677  // Optimization for the most common case (doesn't need vectors)
678 
679  if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
680  found_element = true;
681  sample = delayed_delivered_notification_queue_[0].first;
682  mode = delayed_delivered_notification_queue_[0].second;
683 
684  delayed_delivered_notification_queue_.clear();
685  }
686 
687  } else {
688  OPENDDS_VECTOR(TQESendModePair)::iterator iter;
689  for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
690  sample = iter->first;
691  mode = iter->second;
692  if (!match || match->matches(*sample)) {
693  found_element = true;
694  samples.push_back(*iter);
695  iter = delayed_delivered_notification_queue_.erase(iter);
696  } else {
697  ++iter;
698  }
699  }
700  }
701  }
702 
703  if (!found_element) {
704  return false;
705  }
706 
707  bool transport_shutdown = true;
708  TransportImpl_rch transport = transport_.lock();
709  if (transport) {
710  transport_shutdown = transport->is_shut_down();
711  }
712 
713  if (num_delayed_notifications == 1) {
714  // optimization for the common case
715  if (mode == MODE_TERMINATED) {
716  if (!transport_shutdown || sample->owned_by_transport()) {
717  sample->data_dropped(true);
718  }
719  } else {
720  if (!transport_shutdown || sample->owned_by_transport()) {
721  sample->data_delivered();
722  }
723  }
724 
725  } else {
726  for (size_t i = 0; i < samples.size(); ++i) {
727  if (samples[i].second == MODE_TERMINATED) {
728  if (!transport_shutdown || samples[i].first->owned_by_transport()) {
729  samples[i].first->data_dropped(true);
730  }
731  } else {
732  if (!transport_shutdown || samples[i].first->owned_by_transport()) {
733  samples[i].first->data_delivered();
734  }
735  }
736  }
737  }
738  return true;
739 }
740 
741 /// Remove all samples in the backpressure queue and packet queue.
///
/// @param graceful_disconnecting Value copied into graceful_disconnecting_
///        at the end of the call, but only while reset_flag is still true.
///        reset_flag is cleared when mode_ is already MODE_TERMINATED or
///        MODE_SUSPEND on entry.
///        NOTE(review): the tail of that condition (listing line 756) and
///        the statement at listing line 765 are missing from this copy -
///        confirm both against the upstream source.
742 void
743 TransportSendStrategy::terminate_send(bool graceful_disconnecting)
744 {
745  DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
746 
747  bool reset_flag = true;
748 
749  {
750  GuardType guard(lock_);
751 
752  // If terminate_send() is called because of a non-graceful disconnection
753  // before a datalink shutdown, then we will not try to send the graceful
754  // disconnect message.
755  if ((mode_ == MODE_TERMINATED || mode_ == MODE_SUSPEND)
757  VDBG((LM_DEBUG, "(%P|%t) DBG: "
758  "It was already terminated non gracefully, will not set to graceful disconnecting\n"));
759  reset_flag = false;
760  }
761  }
762 
763  VDBG((LM_DEBUG, "(%P|%t) DBG: Now flip to MODE_TERMINATED\n"));
764 
766 
  // Record the caller's flag only when the check above left reset_flag set;
  // lock_ is re-acquired for the write.
767  if (reset_flag) {
768  GuardType guard(lock_);
769  graceful_disconnecting_ = graceful_disconnecting;
770  }
771 }
772 
773 void
775 {
776 }
777 
778 void
780 {
781  DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
782 
784  QueueType elems;
785  QueueType queue;
786  {
787  GuardType guard(lock_);
788 
789  if (old_mode != MODE_NOT_SET && mode_ != old_mode)
790  return;
791 
792  if (header_.length_ > 0 && pkt_chain_) {
793  // Clear the messages in the pkt_chain_ that is partially sent.
794  // We just reuse these functions for normal partial send except actual sending.
795  int num_bytes_left = static_cast<int>(pkt_chain_->total_length());
796  int result = adjust_packet_after_send(num_bytes_left);
797 
798  if (result == 0) {
799  VDBG((LM_DEBUG, "(%P|%t) DBG: "
800  "The adjustment logic says that the packet is cleared.\n"));
801 
802  } else {
803  VDBG((LM_DEBUG, "(%P|%t) DBG: "
804  "The adjustment returned partial sent.\n"));
805  }
806  }
807 
808  elems.swap(elems_);
809  queue.swap(queue_);
810 
811  header_.length_ = 0;
812  pkt_chain_ = 0;
813  header_complete_ = false;
814  start_counter_ = 0;
815  mode_ = new_mode;
817  }
818 
819  // We need to remove the queued elements outside the lock;
820  // otherwise we have a deadlock situation when the remove visitor
821  // calls data_dropped() on each dropped element.
822 
823  // Clear all samples in queue.
824  RemoveAllVisitor remove_all_visitor;
825 
826  elems.accept_remove_visitor(remove_all_visitor);
827  queue.accept_remove_visitor(remove_all_visitor);
828 }
829 
830 int
832 {
833  DBG_ENTRY_LVL("TransportSendStrategy","start",6);
834 
835  {
836  GuardType guard(lock_);
837 
838  if (!start_i()) {
839  return -1;
840  }
841  }
842 
843  size_t header_chunks(1);
844 
845  // If a secondary send buffer is bound, sent headers should
846  // be cached to properly maintain the buffer:
847  if (send_buffer_ != 0) {
848  header_chunks += send_buffer_->capacity();
849 
850  } else {
851  header_chunks += 1;
852  }
853 
854  header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks));
855  header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks));
856  header_db_lock_pool_.reset(new DataBlockLockPool(static_cast<unsigned long>(TheServiceParticipant->n_chunks())));
857  header_data_allocator_.reset(new DataAllocator(TheServiceParticipant->association_chunk_multiplier(), max_header_size_));
858 
859  // Since we (the TransportSendStrategy object) are a reference-counted
860  // object, but the synch_ object doesn't necessarily know this, we need
861  // to give a "copy" of a reference to ourselves to the synch_ object here.
862  // We will do the reverse when we unregister ourselves (as a worker) from
863  // the synch_ object.
864 
865  if (synch_->register_worker(*this) == -1) {
866 
868  "(%P|%t) ERROR: TransportSendStrategy failed to register "
869  "as a worker with the ThreadSynch object.\n"),
870  -1);
871  }
872 
873  return 0;
874 }
875 
876 void
878 {
879  DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
880 
881  if (header_block_ != 0) {
883  header_block_ = 0;
884  }
885 
886  synch_->unregister_worker();
887 
888  QueueType elems;
889  QueueType queue;
890  {
891  GuardType guard(lock_);
892 
893  if (pkt_chain_ != 0) {
894  size_t size = pkt_chain_->total_length();
895  if (size > 0) {
896  pkt_chain_->release();
898  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
899  ACE_TEXT("terminating with %d unsent bytes.\n"),
900  size));
901  }
902  pkt_chain_ = 0;
903  }
904 
905  if (elems_.size()) {
906  elems_.swap(elems);
908  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
909  ACE_TEXT("terminating with %d unsent elements.\n"),
910  elems_.size()));
911  }
912 
913  if (queue_.size()) {
914  queue_.swap(queue);
916  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
917  ACE_TEXT("terminating with %d queued elements.\n"),
918  queue_.size()));
919  }
920  }
921 
922  RemoveAllVisitor remove_all_visitor;
923 
924  elems.accept_remove_visitor(remove_all_visitor);
925  queue.accept_remove_visitor(remove_all_visitor);
926 
927  {
928  GuardType guard(lock_);
929 
930  stop_i();
931  }
932 }
933 
934 void
936 {
937  if (Transport_debug_level > 9) {
939  ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
940  ACE_TEXT("sending data at 0x%x.\n"),
941  id(), element));
942  }
943 
944  DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
945 
946  {
947  GuardType guard(lock_);
948 
949  if (link_released_) {
950  add_delayed_notification(element);
951 
952  } else {
954  VDBG((LM_DEBUG, "(%P|%t) DBG: "
955  "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
956  "graceful disconnecting, so discard message.\n"));
957  guard.release();
958  element->data_dropped(true);
959  return;
960  }
961 
962  size_t element_length = element->msg()->total_length();
963 
964  VDBG((LM_DEBUG, "(%P|%t) DBG: "
965  "Send element msg() has total_length() == [%d].\n",
966  element_length));
967 
968  VDBG((LM_DEBUG, "(%P|%t) DBG: "
969  "max_header_size_ == [%d].\n",
971 
972  VDBG((LM_DEBUG, "(%P|%t) DBG: "
973  "max_size_ == [%d].\n",
974  max_size_));
975 
976  const size_t max_msg_size = max_message_size();
977 
978  // Really an assert. We can't accept any element that wouldn't fit into
979  // a transport packet by itself (ie, it would be the only element in the
980  // packet). This max_size_ is the user-configurable maximum, not based
981  // on the transport's inherent maximum message size. If max_msg_size
982  // is non-zero, we will fragment so max_size_ doesn't apply per-element.
983  if (max_msg_size == 0 &&
984  max_header_size_ + element_length > max_size_) {
986  "(%P|%t) ERROR: Element too large (%Q) "
987  "- won't fit into packet.\n", ACE_UINT64(element_length)));
988  return;
989  }
990 
991  // Check the mode_ to see if we simply put the element on the queue.
992  if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
993  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
994  "mode_ == %C, so queue elem and leave.\n",
995  mode_as_str(mode_)), 5);
996 
997  queue_.put(element);
998 
999  if (mode_ != MODE_SUSPEND) {
1000  synch_->work_available();
1001  }
1002 
1003  return;
1004  }
1005 
1006  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1007  "mode_ == MODE_DIRECT.\n"));
1008 
1009  // We are in the MODE_DIRECT send mode. When in this mode, the send()
1010  // calls will "build up" the transport packet to be sent directly when it
1011  // reaches the optimal size, contains the maximum number of samples, etc.
1012 
1013  // We need to check if the current element (the arg passed-in to this
1014  // send() method) should be appended to the transport packet, or if the
1015  // transport packet should be sent (directly) first, dealing with the
1016  // current element afterwards.
1017 
1018  // We will decide to send the packet as it is now, under two circumstances:
1019  //
1020  // Either:
1021  //
1022  // (1) The current element won't fit into the current packet since it
1023  // would violate the max_packet_size_.
1024  //
1025  // -OR-
1026  //
1027  // (2) There is at least one element already in the current packet,
1028  // and the current element says that it must be sent in an
1029  // exclusive packet (ie, in a packet all by itself).
1030  //
1031  const bool exclusive = element->requires_exclusive_packet();
1032 
1033  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1034  "The element %C require an exclusive packet.\n",
1035  (exclusive ? "DOES" : "does NOT")
1036  ));
1037 
1038  const size_t space_needed =
1039  (max_msg_size > 0)
1040  ? /* fragmenting */ DataSampleHeader::get_max_serialized_size() + MIN_FRAG
1041  : /* not fragmenting */ element_length;
1042 
1043  if ((exclusive && (elems_.size() != 0))
1044  || (current_space_available() < space_needed)) {
1045 
1046  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1047  "Element won't fit in current packet or requires exclusive"
1048  " - send current packet (directly) now.\n"));
1049 
1050  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1051  "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
1052  , max_header_size_, header_.length_, element_length));
1053 
1054  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1055  "Tot possible length: %d, max_len: %d\n"
1056  , max_header_size_ + header_.length_ + element_length
1057  , max_size_));
1058  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1059  "current elem size: %d\n"
1060  , elems_.size()));
1061 
1062  // Send the current packet, and deal with the current element
1063  // afterwards.
1064  // The invocation's relink status should dictate the direct_send's
1065  // do_relink. We don't want a (relink == false) invocation to end up
1066  // doing a relink. Think of (relink == false) as a non-blocking call.
1067  direct_send(relink);
1068 
1069  // Now check to see if we flipped into MODE_QUEUE, which would mean
1070  // that the direct_send() experienced backpressure, and the
1071  // packet was only partially sent. If this has happened, we deal with
1072  // the current element by placing it on the queue (and then we are done).
1073  //
1074  // Otherwise, if the mode_ is still MODE_DIRECT, we can just
1075  // "drop" through to the next step in the logic where we append the
1076  // current element to the current packet.
1077  if (mode_ == MODE_QUEUE) {
1078  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1079  "We experienced backpressure on that direct send, as "
1080  "the mode_ is now MODE_QUEUE or MODE_SUSPEND. "
1081  "Queue elem and leave.\n"), 5);
1082  queue_.put(element);
1083  synch_->work_available();
1084 
1085  return;
1086  }
1087  }
1088 
1089  // Loop for sending 'element', in fragments if needed
1090  bool first_pkt = true; // enter the loop 1st time through unconditionally
1091  for (TransportQueueElement* next_fragment = 0;
1092  (first_pkt || next_fragment)
1093  && (mode_ == MODE_DIRECT || mode_ == MODE_TERMINATED);) {
1094  // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
1095 
1096  if (next_fragment) {
1097  element = next_fragment;
1098  element_length = next_fragment->msg()->total_length();
1099  header_.first_fragment_ = false;
1100  }
1101 
1102  header_.last_fragment_ = false;
1103  if (max_msg_size) { // fragmentation enabled
1104  const size_t avail = current_space_available();
1105  if (element_length > avail) {
1106  VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting %B > %B\n", element_length, avail), 0);
1107  const TqePair ep = element->fragment(avail);
1108  if (ep == null_tqe_pair) {
1109  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::send: "
1110  "Element Fragmentation Failed\n"));
1111  return;
1112  }
1113  element = ep.first;
1114  element_length = element->msg()->total_length();
1115  next_fragment = ep.second;
1116  header_.first_fragment_ = first_pkt;
1117  } else if (next_fragment) {
1118  // We are sending the "tail" element of a previous fragment()
1119  // operation, and this element didn't itself require fragmentation
1120  header_.last_fragment_ = true;
1121  next_fragment = 0;
1122  }
1123  }
1124  first_pkt = false;
1125 
1126  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1127  "Start the 'append elem' to current packet logic.\n"));
1128 
1129  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1130  "Put element into current packet elems_.\n"));
1131 
1132  // Now that we know the current element should go into the current
1133  // packet, we can just go ahead and "append" the current element to
1134  // the current packet.
1135 
1136  // Add the current element to the collection of packet elements.
1137  elems_.put(element);
1138 
1139  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1140  "Before, the header_.length_ == [%d].\n",
1141  header_.length_));
1142 
1143  // Adjust the header_.length_ to account for the length of the element.
1144  header_.length_ += static_cast<ACE_UINT32>(element_length);
1145  const size_t message_length = header_.length_;
1146 
1147  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1148  "After adding element's length, the header_.length_ == [%d].\n",
1149  message_length));
1150 
1151  // The current packet now contains the current element. We need to
1152  // check to see if the conditions are such that we should go ahead and
1153  // attempt to send the packet "directly" now, or if we can just leave
1154  // and send the current packet later (in another send() call or in a
1155  // send_stop() call).
1156 
1157  // There are a few conditions that will cause us to attempt to send the
1158  // packet (directly) right now:
1159  // - Fragmentation was needed
1160  // - The current packet has the maximum number of samples per packet.
1161  // - The current packet's total length exceeds the optimum packet size.
1162  // - The current element (currently part of the packet elems_)
1163  // requires an exclusive packet.
1164  //
1165  if (next_fragment || (elems_.size() >= max_samples_)
1166  || (max_header_size_ + message_length > optimum_size_)
1167  || exclusive) {
1168  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1169  "Now the current packet looks full - send it (directly).\n"));
1170 
1171  direct_send(relink);
1172 
1173  if (next_fragment && mode_ != MODE_DIRECT) {
1174  if (mode_ == MODE_QUEUE) {
1175  queue_.put(next_fragment);
1176  synch_->work_available();
1177 
1178  } else {
1179  next_fragment->data_dropped(true /* dropped by transport */);
1180  }
1181  } else if (mode_ == MODE_QUEUE) {
1182  // Background thread handles packets in progress
1183  synch_->work_available();
1184  }
1185 
1186  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1187  "Back from the direct_send() attempt.\n"));
1188 
1189  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1190  "And we %C as a result of the direct_send() call.\n",
1191  ((mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
1192  : "stayed in MODE_DIRECT")));
1193 
1194  } else {
1195  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1196  "Packet not sent. Send conditions weren't satisfied.\n"));
1197  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1198  "elems_.size(): %d, max_samples_: %d\n",
1199  int(elems_.size()), int(max_samples_)));
1200  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1201  "header_size_: %d, optimum_size_: %d\n",
1202  int(max_header_size_ + message_length),
1203  int(optimum_size_)));
1204  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1205  "element_requires_exclusive_packet: %d\n", int(exclusive)));
1206 
1207  if (mode_ == MODE_QUEUE) {
1208  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1209  "We flipped into MODE_QUEUE.\n"));
1210 
1211  } else {
1212  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1213  "We stayed in MODE_DIRECT.\n"));
1214  }
1215  }
1216  }
1217  }
1218  }
1219 
1221 }
1222 
1223 void
1225 {
1226  DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
1227  {
1228  GuardType guard(lock_);
1229 
1230  if (link_released_)
1231  return;
1232 
1233  if (start_counter_ == 0) {
1234  // This is an indication of a logic error. This is more of an assert.
1235  VDBG_LVL((LM_ERROR,
1236  "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
1237  return;
1238  }
1239 
1240  --start_counter_;
1241 
1242  if (start_counter_ != 0) {
1243  // This wasn't the last send_stop() that we are expecting. We only
1244  // really honor the first send_start() and the last send_stop().
1245  // We can return without doing anything else in this case.
1246  return;
1247  }
1248 
1250  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1251  "TransportSendStrategy::send_stop: dont try to send current packet "
1252  "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
1253  return;
1254  }
1255 
1256  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1257  "This is an 'important' send_stop() event since our "
1258  "start_counter_ is 0.\n"));
1259 
1260  // We just caused the start_counter_ to become zero. This
1261  // means that we aren't expecting another send() or send_stop() at any
1262  // time in the near future (ie, it isn't imminent).
1263 
1264  // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
1265  // anything to do here because samples have already been going to the
1266  // queue.
1267 
1268  // We only need to do something if the mode_ is
1269  // MODE_DIRECT. It means that we may have some sample(s) in the
1270  // current packet that have never been sent. This is our
1271  // opportunity to send the current packet directly if this is the case.
1272  if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
1273  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1274  "But since we are in %C, we don't have to do "
1275  "anything more in this important send_stop().\n",
1276  mode_as_str(mode_)));
1277  // We don't do anything if we are in MODE_QUEUE or MODE_SUSPEND. Just leave.
1278  return;
1279  }
1280 
1281  size_t header_length = header_.length_;
1282  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1283  "We are in MODE_DIRECT in an important send_stop() - "
1284  "header_.length_ == [%d].\n", header_length));
1285 
1286  // Only attempt to send the current packet (directly) if the current
1287  // packet actually contains something (it could be empty).
1288  if ((header_length > 0) &&
1289  //(elems_.size ()+not_yet_pac_q_->size() > 0))
1290  (elems_.size() > 0)) {
1291  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1292  "There is something in the current packet - attempt to send "
1293  "it (directly) now.\n"));
1294  // If a relink needs to be done for this packet to be sent, do it.
1295  direct_send(true);
1296  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1297  "Back from the attempt to send leftover packet directly.\n"));
1298 
1299  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1300  "But we %C as a result.\n",
1301  ((mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
1302  "stayed in MODE_DIRECT" )));
1303  if (mode_ == MODE_QUEUE && mode_ != MODE_SUSPEND) {
1304  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1305  "Notify Synch thread of work availability\n"));
1306  synch_->work_available();
1307  }
1308  }
1309  }
1310 
1312 }
1313 
1314 void
1316 {
1317  DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
1318 
1319  const TransportQueueElement::MatchOnPubId match(pub_id);
1321 
1322  GuardType guard(lock_);
1323 
1324  if (send_buffer_ != 0) {
1325  // If a secondary send buffer is bound, removed samples must
1326  // be retained in order to properly maintain the buffer:
1327  send_buffer_->retain_all(pub_id);
1328  }
1329 
1330  do_remove_sample(pub_id, match, true);
1331 }
1332 
1335 {
1336  DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
1337 
1338  VDBG_LVL((LM_DEBUG, "(%P|%t) Removing sample: %@\n", sample->get_sample()), 5);
1339 
1340  // The sample to remove is either in the temporary delayed notification list or in an
1341  // internal list (elems_ or queue_). If the transport thread is about to remove it from the
1342  // delayed notification list, that thread needs the WriterDataContainer lock for the
1343  // data_dropped/data_delivered callback, so it has to wait for this remove_sample() call to
1344  // complete, because this call already holds the WriterContainer's lock. It is therefore
1345  // safe for this call to access the sample. If instead the thread calling remove_sample()
1346  // removes it, the sample is removed either from the delayed notification list or from an
1347  // internal list, in which case the element carries the info on whether the sample was
1348  // released, so the datalinkset can stop asking the remaining datalinks to remove it.
1349 
1350  const char* const payload = sample->get_sample()->cont()->rd_ptr();
1351  GUID_t pub_id = sample->get_pub_id();
1352  const TransportQueueElement::MatchOnDataPayload modp(payload);
1353  if (send_delayed_notifications(&modp)) {
1354  return REMOVE_RELEASED;
1355  }
1356 
1357  GuardType guard(lock_);
1358  return do_remove_sample(pub_id, modp);
1359 }
1360 
1363  const TransportQueueElement::MatchCriteria& criteria, bool remove_all)
1364 {
1365  DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
1366 
1367  //ciju: Tim had the idea that we could do the following check
1368  // if ((mode_ == MODE_DIRECT) ||
1369  // ((pkt_chain_ == 0) && (queue_ == empty)))
1370  // then we can assume that the sample can be safely removed (no need for
1371  // replacement) from the elems_ queue.
1372  if ((mode_ == MODE_DIRECT)
1373  || ((pkt_chain_ == 0) && (queue_.size() == 0))) {
1374  //ciju: I believe this is the only mode where a safe
1375  // assumption can be made that the samples
1376  // in the elems_ queue aren't part of a packet.
1377  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1378  "The mode is MODE_DIRECT, or the queue is empty and no "
1379  "transport packet is in progress.\n"));
1380 
1381  QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
1382  elems_.accept_remove_visitor(simple_rem_vis);
1383 
1384  const RemoveResult status = simple_rem_vis.status();
1385 
1386  if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
1387  header_.length_ -= simple_rem_vis.removed_bytes();
1388 
1389  } else if (status == REMOVE_NOT_FOUND) {
1390  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1391  "Failed to find the sample to remove.\n"));
1392  }
1393 
1394  if (criteria.unique() || !remove_all) return status;
1395  }
1396 
1397  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1398  "Visit the queue_ with the RemoveElementVisitor.\n"));
1399 
1400  QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
1401  queue_.accept_remove_visitor(simple_rem_vis);
1402 
1403  RemoveResult status = simple_rem_vis.status();
1404 
1405  if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
1406  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1407  "The sample was removed from the queue_.\n"));
1408  // This means that the visitor did not encounter any fatal error
1409  // along the way, *AND* the sample was found in the queue_,
1410  // and has now been removed. We are done.
1411  if (criteria.unique() || !remove_all) return status;
1412  }
1413 
1414  if (status == REMOVE_ERROR) {
1415  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1416  "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
1417  // This means that the visitor encountered some fatal error along
1418  // the way (and it already reported something to the log).
1419  // Return our failure code.
1420  return status;
1421  }
1422 
1423  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1424  "The RemoveElementVisitor did not find the sample in queue_.\n"));
1425 
1426  // We get here if the visitor did not encounter any fatal error, but it
1427  // also didn't find the sample - and hence it didn't perform any
1428  // "remove sample" logic.
1429 
1430  // Now we need to turn our attention to the current transport packet,
1431  // since the packet is likely in a "partially sent" state, and the
1432  // sample may still be contributing unsent bytes in the pkt_chain_.
1433 
1434  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1435  "Visit our elems_ with the PacketRemoveVisitor.\n"));
1436 
1437  PacketRemoveVisitor pac_rem_vis(criteria,
1438  pkt_chain_,
1439  header_block_,
1442  remove_all);
1443 
1444  elems_.accept_replace_visitor(pac_rem_vis);
1445 
1446  status = pac_rem_vis.status();
1447 
1448  if (status == REMOVE_ERROR) {
1449  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1450  "The PacketRemoveVisitor encountered a fatal error.\n"));
1451 
1452  } else if (status == REMOVE_NOT_FOUND) {
1453  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1454  "The PacketRemoveVisitor didn't find the sample.\n"));
1455 
1456  } else {
1457  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1458  "The PacketRemoveVisitor found the sample and removed it.\n"));
1459  }
1460 
1461  return status;
1462 }
1463 
1464 void
1466 {
1467  DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
1468 
1469  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1470  "Prepare the current packet for a direct send attempt.\n"));
1471 
1472  // Prepare the packet for sending.
1473  prepare_packet();
1474 
1475  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1476  "Now attempt to send the packet.\n"));
1477 
1478  // We will try to resend the packet if the send() fails and the connection
1479  // is then re-established. Only loops if the "continue" line is hit.
1480  while (true) {
1481  // Attempt to send the packet
1482  const SendPacketOutcome outcome = send_packet();
1483 
1484  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1485  "The outcome of the send_packet() was %d.\n", outcome));
1486 
1487  if ((outcome == OUTCOME_BACKPRESSURE) ||
1488  (outcome == OUTCOME_PARTIAL_SEND)) {
1489  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1490  "The outcome of the send_packet() was either "
1491  "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
1492 
1493  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1494  "Flip into the MODE_QUEUE mode_.\n"), 5);
1495 
1496  // We encountered backpressure, or only sent part of the packet.
1497  mode_ = MODE_QUEUE;
1498 
1499  } else if ((outcome == OUTCOME_PEER_LOST) ||
1500  (outcome == OUTCOME_SEND_ERROR)) {
1501  if (outcome == OUTCOME_SEND_ERROR) {
1503  "(%P|%t) WARNING: Problem detected in "
1504  "send buffer management: %p.\n",
1505  "send_bytes"), 1);
1506 
1507  if (Transport_debug_level > 0) {
1508  TransportImpl_rch transport = transport_.lock();
1509  if (transport) {
1510  transport->dump();
1511  }
1512  }
1513  } else {
1514  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1515  "The outcome of the send_packet() was "
1516  "OUTCOME_PEER_LOST.\n"));
1517  }
1518 
1519  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1520  "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
1521 
1522  if (mode_ != MODE_SUSPEND) {
1524  mode_ = MODE_SUSPEND;
1525  }
1526 
1527  if (do_relink) {
1528  bool do_suspend = false;
1529  relink(do_suspend);
1530 
1531  if (mode_ == MODE_SUSPEND) {
1532  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1533  "The reconnect has not done yet and we are "
1534  "still in MODE_SUSPEND.\n"), 5);
1535 
1536  } else if (mode_ == MODE_TERMINATED) {
1537  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1538  "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
1539 
1540  } else {
1541  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1542  "Try send the packet again since the connection "
1543  "is re-established.\n"), 5);
1544 
1545  // Try to send the packet again since the connection is re-established.
1546  continue;
1547  }
1548  }
1549 
1550  } else {
1551  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1552  "The outcome of the send_packet() must have been "
1553  "OUTCOME_COMPLETE_SEND.\n"));
1554  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1555  "So, we will just stay in MODE_DIRECT.\n"));
1556  }
1557 
1558  break;
1559  }
1560 
1561  // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
1562 }
1563 
1564 void
1566 {
1567  DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
1568 
1569  for (TransportQueueElement* element = queue_.peek(); element != 0;
1570  element = queue_.peek()) {
1571 
1572  // Total number of bytes in the current element's message block chain.
1573  size_t element_length = element->msg()->total_length();
1574 
1575  // Flag used to determine if the element requires a packet all to itself.
1576  const bool exclusive_packet = element->requires_exclusive_packet();
1577 
1578  const size_t avail = current_space_available();
1579 
1580  bool frag = false;
1581  if (element_length > avail) {
1582  // The current element won't fit into the current packet
1583  if (max_message_size()) { // fragmentation enabled
1584  header_.first_fragment_ = !element->is_fragment();
1585  VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting from queue\n"), 0);
1586  const TqePair ep = element->fragment(avail);
1587  if (ep == null_tqe_pair) {
1588  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::get_packet_elems_from_queue: "
1589  "Element Fragmentation Failed\n"));
1590  return;
1591  }
1592  element = ep.first;
1593  element_length = element->msg()->total_length();
1594  queue_.replace_head(ep.second);
1595  frag = true; // queue_ is already taken care of, don't get() later
1596  } else {
1597  break;
1598  }
1599  }
1600 
1601  // If exclusive and the current packet is empty, we won't violate the
1602  // exclusive_packet requirement by put()'ing the element
1603  // into the elems_ collection.
1604  if ((exclusive_packet && elems_.size() == 0)
1605  || !exclusive_packet) {
1606  // At this point, we have passed all of the pre-conditions and we can
1607  // now extract the current element from the queue_, put it into the
1608  // packet elems_, and adjust the packet header_.length_.
1609  elems_.put(frag ? element : queue_.get());
1610  if (header_.length_ == 0) {
1611  header_.last_fragment_ = !frag && element->is_fragment();
1612  }
1613  header_.length_ += static_cast<ACE_UINT32>(element_length);
1614  VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Packetizing from queue\n"), 0);
1615  }
1616 
1617  // With exclusive and (elems_.size() != 0), we don't use the current
1618  // element as part of the packet. We know that there is already
1619  // at least one element in the packet, and the current element
1620  // is going to need its own (exclusive) packet. We will just
1621  // use the packet elems_ as it is now. Always break once
1622  // we've encountered and dealt with the exclusive_packet case.
1623  // Also break if fragmentation was required.
1624  if (exclusive_packet || frag
1625  // If the current number of packet elems_ has reached the maximum
1626  // number of samples per packet, then we are done.
1627  || elems_.size() == max_samples_
1628  // If the current value of the header_.length_ exceeds (or equals)
1629  // the optimum_size_ for a packet, then we are done.
1630  || header_.length_ >= optimum_size_) {
1631  break;
1632  }
1633  }
1634 }
1635 
1636 void
1638 {
1639  DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
1640 
1641  // Increment header sequence for packet:
1643 
1644  // Allow the specific implementation the opportunity to set
1645  // values in the packet header.
1646  prepare_header_i();
1647 }
1648 
1649 void
1651 {
1652  DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
1653 
1654  // Default implementation does nothing.
1655 }
1656 
1657 void
1659 {
1660  DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
1661 
1662  // Prepare the header for sending.
1663  prepare_header();
1664 
1665  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1666  "Marshal the packet header.\n"));
1667 
1668  if (header_block_ != 0) {
1670  }
1671 
1673  static_cast<ACE_Message_Block*>(header_mb_allocator_->malloc()),
1676  0, // cont
1677  0, // data
1678  header_data_allocator_.get(),
1683  header_db_allocator_.get(),
1684  header_mb_allocator_.get()));
1685 
1687 
1689 
1690  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1691  "Use a BuildChainVisitor to visit the packet elems_.\n"));
1692 
1693  // Build up a chain of blocks by duplicating the message block chain
1694  // held by each element (in elems_), and then chaining the new duplicate
1695  // blocks together to form one long chain.
1696  BuildChainVisitor visitor;
1697  elems_.accept_visitor(visitor);
1698 
1699  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1700  "Attach the visitor's chain of blocks to the lone (packet "
1701  "header) block currently in the pkt_chain_.\n"));
1702 
1703  // Attach the visitor's chain of blocks to the packet header block.
1704  pkt_chain_->cont(visitor.chain());
1705 
1706  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1707  "Increment header sequence for next packet.\n"));
1708 
1709  // Allow the specific implementation the opportunity to process the
1710  // newly prepared packet.
1711  prepare_packet_i();
1712 
1713  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1714  "Set the header_complete_ flag to false.\n"));
1715 
1716  // Set the header_complete_ to false to indicate
1717  // that the first block in the pkt_chain_ is the packet header block
1718  // (actually a duplicate() of the packet header_block_).
1719  header_complete_ = false;
1720 }
1721 
1722 bool
1724 {
1725  return *mb << header_;
1726 }
1727 
1728 void
1730 {
1731  DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
1732 
1733  // Default implementation does nothing.
1734 }
1735 
1736 void
1738 {
1739  graceful_disconnecting_ = flag;
1740 }
1741 
1742 ssize_t
1744 {
1745  if (Transport_debug_level > 9) {
1747  ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
1748  ACE_TEXT("sending data at 0x%x.\n"),
1749  id(), packet));
1750  }
1751  DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
1752 
1753 #ifdef OPENDDS_SECURITY
1754  Message_Block_Ptr substitute;
1755  if (security_config()) {
1756  const DDS::Security::CryptoTransform_var crypto = security_config()->get_crypto_transform();
1757  // pre_send_packet may provide different data that takes the place of the
1758  // original "packet" (used for security encryption/authentication)
1759  if (crypto) {
1760  substitute.reset(pre_send_packet(packet));
1761  if (!substitute) {
1762  VDBG((LM_DEBUG, "(%P|%t) DBG: pre_send_packet returned NULL, dropping.\n"));
1763  return packet->total_length();
1764  }
1765  }
1766  }
1767 #endif
1768 
1769  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1770  "Populate the iovec array using the packet.\n"), 5);
1771 
1772  iovec iov[MAX_SEND_BLOCKS];
1773 
1774 #ifdef OPENDDS_SECURITY
1775  const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov);
1776 #else
1777  const int num_blocks = mb_to_iov(*packet, iov);
1778 #endif
1779 
1780  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1781  "There are [%d] number of entries in the iovec array.\n",
1782  num_blocks), 5);
1783 
1784  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1785  "Attempt to send_bytes() now.\n"), 5);
1786 
1787  const ssize_t num_bytes_sent = send_bytes(iov, num_blocks, bp);
1788 
1789  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1790  "The send_bytes() said that num_bytes_sent == [%d].\n",
1791  num_bytes_sent), 5);
1792 
1793 #ifdef OPENDDS_SECURITY
1794  if (num_bytes_sent > 0 && substitute && packet->data_block() != substitute->data_block()) {
1795  // Although the "substitute" data took the place of "packet", the rest
1796  // of the framework needs to account for the bytes in "packet" being taken
1797  // care of, as if they were actually sent.
1798  // Since this is done with datagram sockets, partial sends aren't possible.
1799  return packet->total_length();
1800  }
1801 #endif
1802 
1803  return num_bytes_sent;
1804 }
1805 
1808 {
1809  DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
1810 
1811  int bp_flag = 0;
1812  const ssize_t num_bytes_sent =
1813  do_send_packet(pkt_chain_, bp_flag);
1814 
1815  if (num_bytes_sent == 0) {
1816  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1817  "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
1818  // This means that the peer has disconnected.
1819  return OUTCOME_PEER_LOST;
1820  }
1821 
1822  if (num_bytes_sent < 0) {
1823  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1824  "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
1825 
1826  // Check for backpressure...
1827  if (bp_flag == 1) {
1828  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1829  "Since backpressure flag is true, return "
1830  "OUTCOME_BACKPRESSURE.\n"), 5);
1831  // Ok. Not really an error - just backpressure.
1832  return OUTCOME_BACKPRESSURE;
1833  }
1834 
1835  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1836  "Since backpressure flag is false, return "
1837  "OUTCOME_SEND_ERROR.\n"), 5);
1838 
1839  // Not backpressure - it's a real error.
1840  // Note: moved this to send_bytes so the errno msg could be written.
1841  //ACE_ERROR((LM_ERROR,
1842  // "(%P|%t) ERROR: Call to peer().send() failed with negative "
1843  // "return code.\n"));
1844 
1845  return OUTCOME_SEND_ERROR;
1846  }
1847 
1848  if (send_buffer_ != 0) {
1849  // If a secondary send buffer is bound, sent samples must
1850  // be inserted in order to properly maintain the buffer:
1852  &elems_, pkt_chain_);
1853  }
1854 
1855  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1856  "Since num_bytes_sent > 0, adjust the packet to account for "
1857  "the bytes that did get sent.\n"),5);
1858 
1859  // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
1860  // to account for the bytes that have been sent.
1861  const int result =
1862  adjust_packet_after_send(num_bytes_sent);
1863 
1864  if (result == 0) {
1865  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1866  "The adjustment logic says that the complete packet was "
1867  "sent. Return OUTCOME_COMPLETE_SEND.\n"));
1868  return OUTCOME_COMPLETE_SEND;
1869  }
1870 
1871  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1872  "The adjustment logic says that only a part of the packet was "
1873  "sent. Return OUTCOME_PARTIAL_SEND.\n"));
1874 
1875  return OUTCOME_PARTIAL_SEND;
1876 }
1877 
/// Send the @a n buffers in @a iov through send_bytes_i() and classify a
/// failure as either backpressure or a real error.
///
/// @param iov  Array of buffers to send.
/// @param n    Number of entries in @a iov.
/// @param bp   Out: set to 1 when send_bytes_i() returned -1 with errno
///             EWOULDBLOCK or ENOBUFS, otherwise 0. Left untouched when
///             get_handle() yields ACE_INVALID_HANDLE.
/// @return -1 if the handle is invalid; otherwise the value returned by
///         send_bytes_i().
1878 ssize_t
1879 TransportSendStrategy::non_blocking_send(const iovec iov[], int n, int& bp)
1880 {
1881  int val = 0;
1882  ACE_HANDLE handle = get_handle();
1883 
      // No usable handle: fail immediately; bp and errno are not modified.
1884  if (handle == ACE_INVALID_HANDLE)
1885  return -1;
1886 
      // NOTE(review): listing line 1887 is not shown here. Presumably it
      // records the handle's current mode in val and switches the handle to
      // non-blocking (val is handed to ACE::restore_non_blocking_mode()
      // below) -- confirm against the real file.
1888 
1889  // Set the back-pressure flag to false.
1890  bp = 0;
1891 
1892  // Clear errno
      // so that the errno tests below only see a value set by this send.
1893  errno = 0;
1894 
1895  ssize_t result = send_bytes_i(iov, n);
1896 
1897  if (result == -1) {
      // EWOULDBLOCK / ENOBUFS are reported to the caller as backpressure
      // through bp; any other errno is logged as an error.
1898  if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
1899  VDBG((LM_DEBUG,"(%P|%t) DBG: "
1900  "Backpressure encountered.\n"));
1901  // Set the back-pressure flag to true
1902  bp = 1;
1903 
1904  } else {
1905  VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
1906  ACE_TEXT("sendv"), n),1);
1907 
1908  // try to get the application to core when "Bad Address" is returned
1909  // by looking at the iovec
1910  for (int ii = 0; ii < n; ii++) {
1911  ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
1912  ii, iov[ii].iov_len, iov[ii].iov_base));
1913  }
1914  }
1915  }
1916 
1917  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
1918  "The sendv() returned [%d].\n", result), 5);
1919 
      // Runs on every path that got past the handle check, success or failure.
1920  ACE::restore_non_blocking_mode(handle, val);
1921 
1922  return result;
1923 }
1924 
1925 void
1927 {
1928  if (Transport_debug_level) {
1929  size_t size = delayed_delivered_notification_queue_.size();
1930  if ((size > 0) && (size % max_samples_ == 0)) {
1932  "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
1933  size));
1934  }
1935  }
1936 
1937  delayed_delivered_notification_queue_.push_back(std::make_pair(element, mode_.load()));
1938 }
1939 
1941 {
1942  const TransportQueueElement::MatchOnElement moe(element);
1943  {
1944  GuardType guard(lock_);
1946  }
1947 
1948  element->data_delivered();
1949 }
1950 
1951 size_t TransportSendStrategy::space_available(size_t already_used) const
1952 {
1953  const size_t used = max_header_size_ + already_used;
1954  const size_t max_msg = max_message_size();
1955  if (max_msg) {
1956  return std::min(static_cast<size_t>(max_size_), max_msg) - used;
1957  }
1958  return max_size_ - used;
1959 }
1960 
1962 {
1964 }
1965 
1966 int
1968 {
1969  int num_blocks = 0;
1970 #ifdef _MSC_VER
1971 #pragma warning(push)
1972 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
1973 // since on other platforms iov_len is 64-bit
1974 #pragma warning(disable : 4267)
1975 #endif
1976  for (const ACE_Message_Block* block = &msg;
1977  block && num_blocks < MAX_SEND_BLOCKS;
1978  block = block->cont()) {
1979  iov[num_blocks].iov_len = block->length();
1980  iov[num_blocks++].iov_base = block->rd_ptr();
1981  }
1982 #ifdef _MSC_VER
1983 #pragma warning(pop)
1984 #endif
1985  return num_blocks;
1986 }
1987 
1989  TransportQueueElement* original_element, TqeVector& elements_to_send)
1990 {
1991  original_element->increment_loan();
1992  const size_t space = space_available();
1993  for (TransportQueueElement* e = original_element; e;) {
1994  const size_t esize = e->msg()->total_length();
1995  if (esize > space) {
1996  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportSendStrategy::fragmentation_helper: "
1997  "message size %B > space %B: Fragmenting\n", esize, space), 0);
1998  const TqePair pair = e->fragment(space);
1999  if (pair == null_tqe_pair) {
2000  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::fragmentation_helper: "
2001  "Element Fragmentation Failed\n"));
2002  return false;
2003  }
2004  elements_to_send.push_back(pair.first);
2005  e = pair.second;
2006  } else {
2007  elements_to_send.push_back(e);
2008  e = 0;
2009  }
2010  }
2011  return true;
2012 }
2013 
2014 } // namespace DCPS
2015 } // namespace OpenDDS
2016 
#define ACE_DEBUG(X)
void set_graceful_disconnecting(bool flag)
Set graceful disconnecting flag.
#define ACE_ERROR(X)
ACE_UINT32 optimum_packet_size_
Optimum size (in bytes) of a packet (packet header + sample(s)).
static const ACE_Time_Value max_time
WeakRcHandle< TransportImpl > transport_
std::pair< TransportQueueElement *, SendMode > TQESendModePair
Used for delayed notifications when performing work.
ACE_Message_Block * header_block_
Current transport packet header, marshalled.
Cached_Allocator_With_Overflow< ACE_Data_Block, RECEIVE_SYNCH > TransportDataBlockAllocator
int adjust_packet_after_send(ssize_t num_bytes_sent)
std::pair< TransportQueueElement *, TransportQueueElement * > TqePair
size_t length(void) const
SequenceNumber header_sequence_
Current transport header sequence number.
LM_INFO
virtual void relink(bool do_suspend=true)
virtual ACE_Message_Block * pre_send_packet(const ACE_Message_Block *m)
SendMode mode() const
Access the current sending mode.
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool data_dropped(bool dropped_by_transport=false)
ssize_t do_send_packet(const ACE_Message_Block *packet, int &bp)
Form an IOV and call the send_bytes() template method.
void swap(BasicQueue &other)
Definition: BasicQueue_T.h:122
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
virtual RemoveResult do_remove_sample(const GUID_t &pub_id, const TransportQueueElement::MatchCriteria &criteria, bool remove_all=false)
Implement framework chain visitations to remove a sample.
DataBlockLock * get_lock()
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
unique_ptr< TransportMessageBlockAllocator > header_mb_allocator_
Allocator for header data block.
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
int ssize_t
int release(void)
ACE_Guard< ACE_Thread_Mutex > lock_
TransportSendStrategy(std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
virtual ssize_t send_bytes_i(const iovec iov[], int n)=0
char * rd_ptr(void) const
virtual short thread_priority() const
Access the mapped thread priority value.
void clear(SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
Atomic< SendMode > mode_
This mode determines how send() calls will be handled.
void record_and_set_non_blocking_mode(ACE_HANDLE handle, int &val)
virtual void prepare_header_i()
Specific implementation processing of prepared packet header.
void bind(TransportSendStrategy *strategy)
virtual void retain_all(const GUID_t &pub_id)
size_t max_samples_per_packet_
Max number of samples that should ever be in a single packet.
Definition: TransportInst.h:99
TransportHeader header_
Current transport packet header.
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
virtual bool matches(const TransportQueueElement &candidate) const =0
RemoveResult remove_sample(const DataSampleElement *sample)
LM_DEBUG
unique_ptr< DataAllocator > header_data_allocator_
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
void send(TransportQueueElement *element, bool relink=true)
size_t max_samples_
Configuration - max number of samples per transport packet.
ACE_UINT32 optimum_size_
Configuration - optimum transport packet size (bytes)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
#define VDBG(DBG_ARGS)
LM_TRACE
virtual bool requires_exclusive_packet() const
Does the sample require an exclusive transport packet?
void dump()
Diagnostic aid.
virtual void insert(SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)=0
virtual ACE_Message_Block * release(void)
ACE_Data_Block * data_block(void) const
virtual void add_delayed_notification(TransportQueueElement *element)
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
size_t size() const
Accessor for the current number of elements in the queue.
Definition: BasicQueue_T.h:65
const TqePair null_tqe_pair
virtual bool owned_by_transport()=0
Is the sample created by the transport?
virtual void prepare_packet_i()
Specific implementation processing of prepared packet.
size_t total_length(void) const
#define NUM_REPLACED_ELEMENT_CHUNKS
LM_WARNING
virtual bool start_i()
Let the subclass start.
ACE_UINT32 max_packet_size_
Max size (in bytes) of a packet (packet header + sample(s))
Definition: TransportInst.h:96
bool send_delayed_notifications(const TransportQueueElement::MatchCriteria *match=0)
virtual ssize_t non_blocking_send(const iovec iov[], int n, int &bp)
ACE_TEXT("TCP_Factory")
void terminate_send(bool graceful_disconnecting=false)
Remove all samples in the backpressure queue and packet queue.
map TRANSPORT_PRIORITY values directly.
unsigned long long ACE_UINT64
unique_ptr< ThreadSynch > synch_
The thread synch object.
virtual Security::SecurityConfig_rch security_config() const
void accept_remove_visitor(VisitorType &visitor)
Definition: BasicQueue_T.h:95
ACE_CDR::Long Priority
size_t max_header_size_
Maximum marshalled size of the transport packet header.
#define ACE_DEFAULT_THREAD_PRIORITY
unique_ptr< DataBlockLockPool > header_db_lock_pool_
DataBlockLockPool.
bool fragmentation_helper(TransportQueueElement *original_element, TqeVector &elements_to_send)
unique_ptr< TransportDataBlockAllocator > header_db_allocator_
Allocator for header message block.
void accept_replace_visitor(VisitorType &visitor)
Definition: BasicQueue_T.h:113
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define EWOULDBLOCK
static const ACE_Time_Value zero
OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_
virtual ssize_t send_bytes(const iovec iov[], int n, int &bp)
ACE_UINT32 max_size_
Configuration - max transport packet size (bytes)
void restore_non_blocking_mode(ACE_HANDLE handle, int val)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void send_buffer(TransportSendBuffer *send_buffer)
Assigns an optional send buffer.
void accept_visitor(VisitorType &visitor) const
Definition: BasicQueue_T.h:74
void replace_head(T *value)
Definition: BasicQueue_T.h:47
#define ACE_ERROR_RETURN(X, Y)
virtual bool marshal_transport_header(ACE_Message_Block *mb)
T load() const
Definition: Atomic.h:33
static const char * mode_as_str(SendMode mode)
Helper function for debugging.
void deliver_ack_request(TransportQueueElement *element)
#define ENOBUFS
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
#define TheServiceParticipant
LM_ERROR
char * base(void) const
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
size_t space_available(size_t already_used=0) const
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
Allocator for data buffers.
MessageBlockAllocator replaced_element_mb_allocator_
Cached allocator for TransportReplaceElement.
int put(T *elem)
Put a pointer to an element (T*) on to the queue.
Definition: BasicQueue_T.h:36
Base wrapper class around a data/control sample to be sent.
Cached_Allocator_With_Overflow< ACE_Message_Block, RECEIVE_SYNCH > TransportMessageBlockAllocator
virtual void stop_i()=0
Let the subclass stop.
virtual const ACE_Message_Block * msg() const =0
The marshalled sample (sample header + sample data)
TransportInst_rch config() const