Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #include "TransportSendStrategy.h"
11 :
12 : #include "RemoveAllVisitor.h"
13 : #include "TransportInst.h"
14 : #include "ThreadSynchStrategy.h"
15 : #include "ThreadSynchResource.h"
16 : #include "TransportQueueElement.h"
17 : #include "TransportSendElement.h"
18 : #include "TransportSendBuffer.h"
19 : #include "BuildChainVisitor.h"
20 : #include "QueueRemoveVisitor.h"
21 : #include "PacketRemoveVisitor.h"
22 : #include "TransportDefs.h"
23 : #include "DirectPriorityMapper.h"
24 : #include "EntryExit.h"
25 :
26 : #include <dds/DCPS/DataSampleHeader.h>
27 : #include <dds/DCPS/DataSampleElement.h>
28 : #include <dds/DCPS/Service_Participant.h>
29 :
30 : #include <ace/Reverse_Lock_T.h>
31 :
32 : #if !defined (__ACE_INLINE__)
33 : #include "TransportSendStrategy.inl"
34 : #endif /* __ACE_INLINE__ */
35 :
36 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
37 :
38 : namespace OpenDDS {
39 : namespace DCPS {
40 :
41 : //TBD: The number of chunks for the replaced element allocators
42 : // is hard coded for now. This will be configurable when
43 : // we implement the dds configurations. This value should
44 : // be the number of marshalled DataSampleHeaders that a
45 : // packet could contain.
// The constructor in this file sizes both replaced_element_mb_allocator_
// and replaced_element_db_allocator_ with twice this value.
46 : #define NUM_REPLACED_ELEMENT_CHUNKS 20
47 :
48 : namespace {
49 : /// Arbitrary small constant that represents the minimum
50 : /// amount of payload data we'll have in one fragment.
51 : /// In this case "payload data" includes the content-filtering
52 : /// GUID sequence, so this is chosen to be 4 + (16 * N); 68 is N == 4.
53 : static const size_t MIN_FRAG = 68;
54 : }
55 :
56 : // I think 2 chunks for the header message block is enough
57 : // - one for the original copy and one for duplicate which
58 : // occurs every packet and is released after packet is sent.
59 : // The data block only needs 1 chunk since the duplicate()
60 : // just increases the ref count.
/// Construct a send strategy for the given transport.
///
/// Members start from the DEFAULT_CONFIG_* packet limits with mode_ set to
/// MODE_DIRECT. The packet limits are then overwritten from the transport's
/// configuration when transport->config() returns a non-nil reference.
///
/// @param id                   forwarded to the ThreadSynchWorker base.
/// @param transport            transport this strategy belongs to; it is
///                             dereferenced here without a nil check.
/// @param synch_resource       passed through to create_synch_object().
/// @param priority             fed to DirectPriorityMapper; the mapped thread
///                             priority is not used when ACE_WIN32 is defined.
/// @param thread_sync_strategy creates the synch_ object; it is dereferenced
///                             here without a nil check.
61 0 : TransportSendStrategy::TransportSendStrategy(
62 : std::size_t id,
63 : const TransportImpl_rch& transport,
64 : ThreadSynchResource* synch_resource,
65 : Priority priority,
66 0 : const ThreadSynchStrategy_rch& thread_sync_strategy)
67 : : ThreadSynchWorker(id),
68 0 : max_samples_(DEFAULT_CONFIG_MAX_SAMPLES_PER_PACKET),
69 0 : optimum_size_(DEFAULT_CONFIG_OPTIMUM_PACKET_SIZE),
70 0 : max_size_(DEFAULT_CONFIG_MAX_PACKET_SIZE),
71 0 : max_header_size_(0),
72 0 : header_block_(0),
73 0 : pkt_chain_(0),
74 0 : header_complete_(false),
75 0 : start_counter_(0),
76 0 : mode_(MODE_DIRECT),
77 0 : mode_before_suspend_(MODE_NOT_SET),
78 0 : lock_(),
79 0 : replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
80 0 : replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
81 0 : transport_(transport),
82 0 : graceful_disconnecting_(false),
83 0 : link_released_(true),
84 0 : send_buffer_(0)
85 : {
86 : DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
87 :
// Prefer the configured packet limits over the compiled-in defaults.
88 0 : TransportInst_rch cfg = transport->config();
89 0 : if (cfg) {
90 0 : max_samples_ = cfg->max_samples_per_packet_;
91 0 : optimum_size_ = cfg->optimum_packet_size_;
92 0 : max_size_ = cfg->max_packet_size_;
93 : }
94 :
95 : // Create a ThreadSynch object just for us.
96 0 : DirectPriorityMapper mapper(priority);
97 0 : synch_.reset(thread_sync_strategy->create_synch_object(
98 : synch_resource,
99 : #ifdef ACE_WIN32
100 : ACE_DEFAULT_THREAD_PRIORITY,
101 : #else
102 0 : mapper.thread_priority(),
103 : #endif
104 0 : TheServiceParticipant->scheduler()));
105 :
106 : // We cache this value in a data member since it doesn't change, and we
107 : // don't want to keep asking for it over and over.
108 0 : max_header_size_ = TransportHeader::get_max_serialized_size();
109 :
// Reserve room up front using the (possibly configured) max_samples_.
110 0 : delayed_delivered_notification_queue_.reserve(max_samples_);
111 0 : }
112 :
/// Destructor. Empties delayed_delivered_notification_queue_; no
/// data_delivered()/data_dropped() call is made here for entries that are
/// still queued.
113 0 : TransportSendStrategy::~TransportSendStrategy()
114 : {
115 : DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
116 :
117 :
118 0 : delayed_delivered_notification_queue_.clear();
119 0 : }
120 :
/// Set the send buffer used by this strategy and, when it is non-null,
/// bind it back to this strategy via TransportSendBuffer::bind().
///
/// @param send_buffer buffer to store in send_buffer_; may be null, in which
///        case only the member is updated.
121 : void
122 0 : TransportSendStrategy::send_buffer(TransportSendBuffer* send_buffer)
123 : {
124 0 : send_buffer_ = send_buffer;
125 :
126 0 : if (send_buffer_ != 0) {
127 0 : send_buffer_->bind(this);
128 : }
129 0 : }
130 :
/// Service the queue_ and/or a partially sent current packet.
///
/// Under lock_ this builds a packet from queue_ when no packet is pending
/// (header_.length_ == 0), calls send_packet(), and flips mode_ back to
/// MODE_DIRECT once a complete send leaves queue_ empty. After the lock is
/// released it calls send_delayed_notifications() and, on OUTCOME_PEER_LOST
/// or OUTCOME_SEND_ERROR, relink().
///
/// @return WORK_OUTCOME_BROKEN_RESOURCE when mode_ is MODE_TERMINATED on
///         entry or after relink(); WORK_OUTCOME_CLOGGED_RESOURCE when
///         send_packet() reported OUTCOME_BACKPRESSURE;
///         WORK_OUTCOME_NO_MORE_TO_DO when mode_ is neither MODE_QUEUE nor
///         MODE_SUSPEND, when there is nothing to send, when a complete send
///         emptied queue_, or when mode_ is still MODE_SUSPEND after
///         relink(); WORK_OUTCOME_MORE_TO_DO in every remaining case.
131 : ThreadSynchWorker::WorkOutcome
132 0 : TransportSendStrategy::perform_work()
133 : {
134 : DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
135 :
// outcome is assigned by send_packet() below; every return that precedes
// that assignment leaves the function without reading it.
136 : SendPacketOutcome outcome;
137 0 : bool no_more_work = false;
138 :
139 : { // scope for the guard(lock_);
140 0 : GuardType guard(lock_);
141 :
142 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(mode_)), 5);
143 :
144 0 : if (mode_ == MODE_TERMINATED) {
145 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
146 : "Entered perform_work() and mode_ is MODE_TERMINATED - "
147 : "we lost connection and could not reconnect, just return "
148 : "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
149 0 : return WORK_OUTCOME_BROKEN_RESOURCE;
150 : }
151 :
152 : // The perform_work() is called by our synch_ object using
153 : // a thread designated to call this method when it thinks
154 : // we need to be called in order to "service" the queue_ and/or
155 : // deal with a partially sent current packet.
156 : //
157 : // We return WORK_OUTCOME_NO_MORE_TO_DO if we don't see a need to have
158 : // our perform_work() called again, and WORK_OUTCOME_MORE_TO_DO if we do
159 : // see the need to have our perform_work() method called again.
160 :
161 : // First, make sure that the mode_ indicates that we are, indeed, in
162 : // MODE_QUEUE or MODE_SUSPEND. If we are in any other mode (for example
163 : // MODE_DIRECT), then it means we didn't need to have this perform_work()
164 : // method called - in this case, do nothing other than return
165 : // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
166 : // see a need for it to call our perform_work() again (at least not
167 : // right now).
168 0 : if (mode_ != MODE_QUEUE && mode_ != MODE_SUSPEND) {
169 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
170 : "Entered perform_work() and mode_ is %C - just return "
171 : "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(mode_)), 5);
172 0 : return WORK_OUTCOME_NO_MORE_TO_DO;
173 : }
174 :
175 : // Check the "state" of the current packet. We will either find that the
176 : // current packet is in a state of being "partially sent", or we will find
177 : // it in a state of being "empty". When the current packet is "empty", it
178 : // means that it is time to build up the current packet using elements
179 : // extracted from the queue_, and then we will attempt to send the
180 : // packet. When we find the current packet in the "partially sent" state,
181 : // we will not touch the queue_ - we will just try to send the unsent
182 : // bytes in the current (partially sent) packet.
183 0 : const size_t header_length = header_.length_;
184 :
185 0 : if (header_length == 0) {
186 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
187 : "The current packet doesn't have any unsent bytes - we "
188 : "need to 'populate' the current packet with elems from "
189 : "the queue.\n"), 5);
190 :
191 : // The current packet is "empty". Build up the current packet using
192 : // elements from the queue_, and prepare the current packet to be sent.
193 :
194 : // Before we build the packet from the queue_, let's make sure that
195 : // there is actually something on the queue_ to build from.
196 0 : if (queue_.size() == 0) {
197 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
198 : "But the queue is empty. We have cleared the "
199 : "backpressure situation.\n"),5);
200 : // We are here because the queue_ is empty, and there aren't
201 : // any "partial packet" bytes left to send. We have overcome
202 : // the backpressure situation and don't have anything to do
203 : // right now.
204 :
205 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
206 : "Flip mode to MODE_DIRECT, and return "
207 : "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
208 :
209 : // Flip the mode back to MODE_DIRECT.
210 0 : mode_ = MODE_DIRECT;
211 :
212 : // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
213 : // perform_work() doesn't need to be called again (at this time).
214 0 : return WORK_OUTCOME_NO_MORE_TO_DO;
215 : }
216 :
217 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
218 : "There is at least one elem in the queue - get the packet "
219 : "elems from the queue.\n"), 5);
220 :
221 : // There is stuff in the queue_ if we get to this point in the logic.
222 : // Build-up the current packet using element(s) from the queue_.
223 0 : get_packet_elems_from_queue();
224 :
225 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
226 : "Prepare the packet from the packet elems_.\n"), 5);
227 :
228 : // Now we can prepare the new packet to be sent.
229 0 : prepare_packet();
230 :
231 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
232 : "Packet has been prepared from packet elems_.\n"), 5);
233 :
234 : } else {
235 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
236 : "We have a current packet that still has unsent bytes.\n"), 5);
237 : }
238 :
239 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
240 : "Attempt to send the current packet.\n"), 5);
241 :
242 : // Now we can attempt to send the current packet - whether it is
243 : // a "partially sent" packet or one that we just built-up using elements
244 : // from the queue_ (and subsequently prepared for sending) - it doesn't
245 : // matter. Just attempt to send as many of the "unsent" bytes in the
246 : // packet as possible.
247 0 : outcome = send_packet();
248 :
249 : // If we sent the whole packet (i.e. OUTCOME_COMPLETE_SEND), and the queue_
250 : // is now empty, then we've cleared the backpressure situation.
251 0 : if ((outcome == OUTCOME_COMPLETE_SEND) && (queue_.size() == 0)) {
252 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
253 : "Flip the mode to MODE_DIRECT, and then return "
254 : "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
255 :
256 : // Revert back to MODE_DIRECT mode.
257 0 : mode_ = MODE_DIRECT;
258 0 : no_more_work = true;
259 : }
260 0 : } // End of scope for guard(lock_);
261 :
262 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
263 : "The outcome of the send_packet() was %d.\n", outcome), 5);
264 :
// Called after the guard above has gone out of scope;
// send_delayed_notifications() acquires lock_ itself.
265 0 : send_delayed_notifications();
266 :
267 : // no_more_work was set under the lock above when the whole packet was
268 : // sent and the queue_ was empty, i.e. the backpressure has cleared.
269 0 : if (no_more_work) {
270 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
271 : "We sent the whole packet, and there is nothing left on "
272 : "the queue now.\n"), 5);
273 :
274 : // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
275 : // don't desire another call to this perform_work() method.
276 0 : return WORK_OUTCOME_NO_MORE_TO_DO;
277 : }
278 :
279 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
280 : "We still have unsent bytes in the current packet AND/OR there "
281 : "are still elements in the queue.\n"), 5);
282 :
283 0 : if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
284 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
285 : "We lost our connection, or had some fatal connection "
286 : "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
287 :
288 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
289 : "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
290 :
291 0 : bool do_suspend = true;
292 0 : relink(do_suspend);
293 :
// mode_ is examined here after the guard(lock_) scope above has ended.
294 0 : if (mode_ == MODE_SUSPEND) {
295 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
296 : "The reconnect has not done yet and we are still in MODE_SUSPEND. "
297 : "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
298 : // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
299 : // don't desire another call to this perform_work() method.
// NOTE(review): the log text above names WORK_OUTCOME_CLOGGED_RESOURCE but
// the value returned is WORK_OUTCOME_NO_MORE_TO_DO - confirm which is meant.
300 0 : return WORK_OUTCOME_NO_MORE_TO_DO;
301 :
302 0 : } else if (mode_ == MODE_TERMINATED) {
303 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
304 : "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
305 0 : return WORK_OUTCOME_BROKEN_RESOURCE;
306 :
307 : } else {
308 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
309 : "Reconnect succeeded, Notify synch thread of work "
310 : "availability.\n"), 5);
311 : // If the datalink is re-established then notify the synch
312 : // thread to perform work. We do not hold the object lock at
313 : // this point.
314 0 : synch_->work_available();
315 : }
316 : }
317 :
318 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
319 : "We still have an 'unbroken' connection.\n"), 5);
320 :
321 0 : if (outcome == OUTCOME_BACKPRESSURE) {
322 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
323 : "We experienced backpressure on our attempt to send the "
324 : "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
325 : // We have a "clogged resource".
326 0 : return WORK_OUTCOME_CLOGGED_RESOURCE;
327 : }
328 :
329 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
330 : "We may have sent the whole current packet, but still have "
331 : "elements on the queue.\n"), 5);
332 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
333 : "Or, we may have only partially sent the current packet.\n"), 5);
334 :
335 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
336 : "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
337 :
338 : // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
339 : // in the queue_ to be sent. *OR* we may have had an OUTCOME_PARTIAL_SEND,
340 : // which equates to the same thing - we still have work to do.
341 :
342 : // We are still in MODE_QUEUE mode, thus there is still work to be
343 : // done to service the queue_ and/or a partially sent current packet.
344 : // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
345 : // want it to call this perform_work() method.
346 0 : return WORK_OUTCOME_MORE_TO_DO;
347 : }
348 :
349 : // Now we need to "peel off" those message blocks that were fully
350 : // sent, adjust the first message block with an unsent byte to have
351 : // its rd_ptr() pointing to that first unsent byte, and set the
352 : // pkt_chain_ to that first message block with an unsent byte.
353 : // As we "peel off" fully sent message blocks, we need to also deal with
354 : // fully sent elements by removing them from the elems_ and
355 : // calling their data_delivered() method. In addition, as we peel off
356 : // the message blocks that are fully sent, we need to untie them from
357 : // the chain and release them.
358 : // And finally, don't forget to adjust the header_.length_ to
359 : // account for the num_bytes_sent (beware that some of the num_bytes_sent
360 : // may be packet header bytes and shouldn't affect the header_.length_,
361 : // which doesn't include the packet header bytes).
/// Walk pkt_chain_ after num_bytes_sent bytes of the current packet were
/// sent: release every block that was fully sent, advance rd_ptr() of a
/// partially sent block, pass each fully sent element to
/// add_delayed_notification(), and subtract the non-header bytes sent from
/// header_.length_.
///
/// If elems_.peek() returns NULL on entry the chain is not walked at all, so
/// pkt_chain_ and header_.length_ are left as they were.
///
/// @param num_bytes_sent number of bytes of the current packet that were sent.
/// @return 0 if header_.length_ is 0 afterwards (entire packet sent),
///         1 otherwise.
362 : int
363 0 : TransportSendStrategy::adjust_packet_after_send(ssize_t num_bytes_sent)
364 : {
365 : DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
366 :
367 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
368 : "Adjusting the current packet because %d bytes of the packet "
369 : "have been sent.\n", num_bytes_sent));
370 :
371 0 : ssize_t num_bytes_left = num_bytes_sent;
372 0 : ssize_t num_non_header_bytes_sent = 0;
373 :
374 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
375 : "Set num_bytes_left to %d.\n", num_bytes_left));
376 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
377 : "Set num_non_header_bytes_sent to %d.\n",
378 : num_non_header_bytes_sent));
379 :
380 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
381 : "Peek at the element at the front of the packet elems_.\n"));
382 :
383 : // This is the element currently at the front of elems_.
384 0 : TransportQueueElement* element = elems_.peek();
385 :
386 0 : if(!element){
387 0 : ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
388 : } else {
389 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
390 : "Use the element's msg() to find the last block in "
391 : "the msg() chain.\n"));
392 :
393 : // Get a pointer to the last message block in the element.
394 0 : const ACE_Message_Block* elem_tail_block = element->msg();
395 :
396 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
397 : "Start with tail block == element->msg().\n"));
398 :
399 0 : while (elem_tail_block->cont() != 0) {
400 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
401 : "Set tail block to its cont() block (next in chain).\n"));
402 0 : elem_tail_block = elem_tail_block->cont();
403 : }
404 :
405 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
406 : "Tail block now set (because tail block's cont() is 0).\n"));
407 :
408 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
409 : "Start the 'while (num_bytes_left > 0)' loop.\n"));
410 :
411 0 : while (num_bytes_left > 0) {
412 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
413 : "At top of 'num bytes left' loop. num_bytes_left == [%d].\n",
414 : num_bytes_left));
415 :
// NOTE(review): pkt_chain_ is dereferenced without a null check; presumably
// callers never report more bytes than pkt_chain_->total_length() - confirm.
416 0 : const int block_length = static_cast<int>(pkt_chain_->length());
417 :
418 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
419 : "Length of block at front of pkt_chain_ is [%d].\n",
420 : block_length));
421 :
422 0 : if (block_length <= num_bytes_left) {
423 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
424 : "The whole block at the front of pkt_chain_ was sent.\n"));
425 :
426 : // The entire message block at the front of the chain has been sent.
427 : // Detach the head message block from the chain and adjust
428 : // the pkt_chain_ to point to the next block (if any) in
429 : // the chain.
430 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
431 : "Extract the fully sent block from the pkt_chain_.\n"));
432 :
433 0 : ACE_Message_Block* fully_sent_block = pkt_chain_;
434 :
435 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
436 : "Set pkt_chain_ to pkt_chain_->cont().\n"));
437 :
438 0 : pkt_chain_ = pkt_chain_->cont();
439 :
440 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
441 : "Set the fully sent block's cont() to 0.\n"));
442 :
// Unlink first so that the release() calls below act on this block only.
443 0 : fully_sent_block->cont(0);
444 :
445 : // Update the num_bytes_left to indicate that we have
446 : // processed the entire length of the block.
447 0 : num_bytes_left -= block_length;
448 :
449 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
450 : "Updated num_bytes_left to account for fully sent "
451 : "block (block_length == [%d]).\n", block_length));
452 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
453 : "Now, num_bytes_left == [%d].\n", num_bytes_left));
454 :
455 0 : if (!header_complete_) {
456 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
457 : "Since the header_complete_ flag is false, it means "
458 : "that the packet header block was still in the "
459 : "pkt_chain_.\n"));
460 :
461 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
462 : "Not anymore... Set the header_complete_ flag "
463 : "to true.\n"));
464 :
465 : // That was the packet header block. And now we know that it
466 : // has been completely sent. Its bytes are deliberately not
// added to num_non_header_bytes_sent.
467 0 : header_complete_ = true;
468 :
469 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
470 : "Release the fully sent block.\n"));
471 :
472 : // Release the fully_sent_block
473 0 : fully_sent_block->release();
474 :
475 : } else {
476 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
477 : "Since the header_complete_ flag is true, it means "
478 : "that the packet header block was not in the "
479 : "pkt_chain_.\n"));
480 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
481 : "So, the fully sent block was part of an element.\n"));
482 :
483 : // That wasn't the packet header block. It was from the
484 : // element currently at the front of the elems_
485 : // collection. If it was the last block from the
486 : // element, then we need to extract the element from the
487 : // elems_ collection and queue its notification.
488 0 : num_non_header_bytes_sent += block_length;
489 :
490 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
491 : "Updated num_non_header_bytes_sent to account for "
492 : "fully sent block (block_length == [%d]).\n",
493 : block_length));
494 :
495 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
496 : "Now, num_non_header_bytes_sent == [%d].\n",
497 : num_non_header_bytes_sent));
498 :
// The two blocks are matched by their base() address, not by comparing the
// ACE_Message_Block pointers themselves.
499 0 : if (fully_sent_block->base() == elem_tail_block->base()) {
500 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
501 : "Ok. The fully sent block was a duplicate of "
502 : "the tail block of the element that is at the "
503 : "front of the packet elems_.\n"));
504 :
505 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
506 : "This means that we have completely sent the "
507 : "element at the front of the packet elems_.\n"));
508 :
509 : // This means that we have completely sent the element
510 : // that is currently at the front of the elems_ collection.
511 :
512 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
513 : "We can release the fully sent block now.\n"));
514 :
515 : // Release the fully_sent_block
516 0 : fully_sent_block->release();
517 :
518 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
519 : "We can extract the element from the front of "
520 : "the packet elems_ (we were just peeking).\n"));
521 :
522 : // Extract the element from the elems_ collection
523 0 : element = elems_.get();
524 :
525 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
526 : "Tell the element that a decision has been made "
527 : "regarding its fate - data_delivered().\n"));
528 :
529 : // Hand the element to add_delayed_notification(); it is not notified inline here.
530 0 : add_delayed_notification(element);
531 :
532 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
533 : "Peek at the next element in the packet "
534 : "elems_.\n"));
535 :
536 : // Set up for the next element in elems_ by peek()'ing.
537 0 : element = elems_.peek();
538 :
// NOTE(review): when peek() returns 0 here, elem_tail_block keeps pointing
// at the tail of the element that was just extracted - confirm that no
// further non-header block can follow in that case.
539 0 : if (element != 0) {
540 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
541 : "The is an element still in the packet "
542 : "elems_ (we are peeking at it now).\n"));
543 :
544 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
545 : "We are going to find the tail block for the "
546 : "current element (we are peeking at).\n"));
547 :
548 : // There was a "next element". Determine the
549 : // elem_tail_block for it.
550 0 : elem_tail_block = element->msg();
551 :
552 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
553 : "Start w/tail block == element->msg().\n"));
554 :
555 0 : while (elem_tail_block->cont() != 0) {
556 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
557 : "Set tail block to next in chain.\n"));
558 0 : elem_tail_block = elem_tail_block->cont();
559 : }
560 :
561 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
562 : "Done finding tail block.\n"));
563 : }
564 :
565 : } else {
566 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
567 : "Ok. The fully sent block is *not* a "
568 : "duplicate of the tail block of the element "
569 : "at the front of the packet elems_.\n"));
570 :
571 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
572 : "Thus, we have not completely sent the "
573 : "element yet.\n"));
574 :
575 : // We didn't completely send the element - it has more
576 : // message blocks that haven't been sent (that we know of).
577 :
578 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
579 : "We can release the fully_sent_block now.\n"));
580 :
581 : // Release the fully_sent_block
582 0 : fully_sent_block->release();
583 : }
584 : }
585 :
586 : } else {
587 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
588 : "Only part of the block at the front of pkt_chain_ "
589 : "was sent.\n"));
590 :
591 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
592 : "Advance the rd_ptr() of the front block (of pkt_chain_) "
593 : "by the num_bytes_left (%d).\n", num_bytes_left));
594 :
595 : // Only part of the current block was sent; skip past the sent bytes.
596 0 : pkt_chain_->rd_ptr(num_bytes_left);
597 :
598 0 : if (header_complete_) {
599 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
600 : "And since the packet header block has already been "
601 : "completely sent, add num_bytes_left to the "
602 : "num_non_header_bytes_sent.\n"));
603 :
604 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
605 : "Before, num_non_header_bytes_sent == %d.\n",
606 : num_non_header_bytes_sent));
607 :
608 : // We know that the current block isn't the packet header
609 : // block because the packet header block has already been
610 : // completely sent. We need to count these bytes in the
611 : // num_non_header_bytes_sent.
612 0 : num_non_header_bytes_sent += num_bytes_left;
613 :
614 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
615 : "After, num_non_header_bytes_sent == %d.\n",
616 : num_non_header_bytes_sent));
617 : }
618 :
619 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
620 : "Set the num_bytes_left to 0 now.\n"));
621 :
// Setting this to 0 ends the while loop above.
622 0 : num_bytes_left = 0;
623 : }
624 : }
625 : }
626 :
627 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
628 : "The 'num_bytes_left' loop has completed.\n"));
629 :
630 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
631 : "Adjust the header_.length_ to account for the "
632 : "num_non_header_bytes_sent.\n"));
633 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
634 : "Before, header_.length_ == %d.\n",
635 : header_.length_));
636 :
637 : // Adjust the packet header_.length_ to indicate how many non header
638 : // bytes are left to send.
639 0 : header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
640 :
641 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
642 : "After, header_.length_ == %d.\n",
643 : header_.length_));
644 :
645 : // Returns 0 if the entire packet was sent, and returns 1 otherwise.
646 0 : int rc = (header_.length_ == 0) ? 0 : 1;
647 :
648 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
649 : "Adjustments all done. Returning [%d]. 0 means entire packet "
650 : "has been sent. 1 means otherwise.\n",
651 : rc));
652 :
653 0 : return rc;
654 : }
655 :
/// Issue the notifications held in delayed_delivered_notification_queue_.
///
/// Entries are taken out of the queue while lock_ is held; the
/// data_dropped()/data_delivered() calls are made after the guard has been
/// released. An entry recorded with MODE_TERMINATED gets data_dropped(true),
/// any other entry gets data_delivered(). When the transport is shut down
/// (or transport_.lock() yields nothing), only samples that report
/// owned_by_transport() are notified.
///
/// @param match optional filter; when null every queued entry is taken,
///        otherwise only entries for which match->matches() returns true.
/// @return true if at least one entry was taken from the queue, false if the
///         queue was empty or nothing matched.
656 : bool
657 0 : TransportSendStrategy::send_delayed_notifications(const TransportQueueElement::MatchCriteria* match)
658 : {
659 : DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
660 0 : TransportQueueElement* sample = 0;
661 0 : SendMode mode = MODE_NOT_SET;
662 :
// Holds the entries taken from the queue when it had more than one entry.
663 0 : OPENDDS_VECTOR(TQESendModePair) samples;
664 :
665 0 : size_t num_delayed_notifications = 0;
666 0 : bool found_element = false;
667 :
668 : {
669 0 : GuardType guard(lock_);
670 :
// Snapshot of the queue size; it also selects the notification path used
// after the lock is released.
671 0 : num_delayed_notifications = delayed_delivered_notification_queue_.size();
672 :
673 0 : if (num_delayed_notifications == 0) {
674 0 : return false;
675 :
676 0 : } else if (num_delayed_notifications == 1) {
677 : // Optimization for the most common case (doesn't need vectors)
678 :
679 0 : if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
680 0 : found_element = true;
681 0 : sample = delayed_delivered_notification_queue_[0].first;
682 0 : mode = delayed_delivered_notification_queue_[0].second;
683 :
684 0 : delayed_delivered_notification_queue_.clear();
685 : }
686 :
687 : } else {
// Move every matching entry into samples; non-matching entries stay queued.
688 0 : OPENDDS_VECTOR(TQESendModePair)::iterator iter;
689 0 : for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
690 0 : sample = iter->first;
691 0 : mode = iter->second;
692 0 : if (!match || match->matches(*sample)) {
693 0 : found_element = true;
694 0 : samples.push_back(*iter);
695 0 : iter = delayed_delivered_notification_queue_.erase(iter);
696 : } else {
697 0 : ++iter;
698 : }
699 : }
700 : }
701 0 : }
702 :
703 0 : if (!found_element) {
704 0 : return false;
705 : }
706 :
// Treat the transport as shut down unless a live reference says otherwise.
707 0 : bool transport_shutdown = true;
708 0 : TransportImpl_rch transport = transport_.lock();
709 0 : if (transport) {
710 0 : transport_shutdown = transport->is_shut_down();
711 : }
712 :
713 0 : if (num_delayed_notifications == 1) {
714 : // optimization for the common case
715 0 : if (mode == MODE_TERMINATED) {
716 0 : if (!transport_shutdown || sample->owned_by_transport()) {
717 0 : sample->data_dropped(true);
718 : }
719 : } else {
720 0 : if (!transport_shutdown || sample->owned_by_transport()) {
721 0 : sample->data_delivered();
722 : }
723 : }
724 :
725 : } else {
726 0 : for (size_t i = 0; i < samples.size(); ++i) {
727 0 : if (samples[i].second == MODE_TERMINATED) {
728 0 : if (!transport_shutdown || samples[i].first->owned_by_transport()) {
729 0 : samples[i].first->data_dropped(true);
730 : }
731 : } else {
732 0 : if (!transport_shutdown || samples[i].first->owned_by_transport()) {
733 0 : samples[i].first->data_delivered();
734 : }
735 : }
736 : }
737 : }
738 0 : return true;
739 0 : }
740 :
741 : /// Remove all samples in the backpressure queue and packet queue.
/// The removal itself is delegated to clear(MODE_TERMINATED).
///
/// @param graceful_disconnecting value stored in graceful_disconnecting_
///        after the clear, unless mode_ was already MODE_TERMINATED or
///        MODE_SUSPEND on entry while graceful_disconnecting_ was false.
742 : void
743 0 : TransportSendStrategy::terminate_send(bool graceful_disconnecting)
744 : {
745 : DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
746 :
747 0 : bool reset_flag = true;
748 :
749 : {
750 0 : GuardType guard(lock_);
751 :
752 : // If terminate_send is called due to a non-graceful disconnection before
753 : // a datalink shutdown then we will not try to send the graceful disconnect
754 : // message.
755 0 : if ((mode_ == MODE_TERMINATED || mode_ == MODE_SUSPEND)
756 0 : && !graceful_disconnecting_) {
757 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
758 : "It was already terminated non gracefully, will not set to graceful disconnecting\n"));
759 0 : reset_flag = false;
760 : }
761 0 : }
762 :
763 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: Now flip to MODE_TERMINATED\n"));
764 :
// clear() is called with lock_ released; it takes the lock itself.
765 0 : clear(MODE_TERMINATED);
766 :
767 0 : if (reset_flag) {
768 0 : GuardType guard(lock_);
769 0 : graceful_disconnecting_ = graceful_disconnecting;
770 0 : }
771 0 : }
772 :
/// Intentionally empty: this implementation performs no action.
773 : void
774 0 : TransportSendStrategy::terminate_send_if_suspended()
775 : {
776 0 : }
777 :
/// Drop everything that is pending and switch to new_mode.
///
/// Delayed notifications are flushed first. Then, under lock_, a partially
/// sent packet is walked with adjust_packet_after_send(), elems_ and queue_
/// are swapped into locals, and the packet state (header_.length_,
/// pkt_chain_, header_complete_, start_counter_, mode_before_suspend_) is
/// reset. The swapped-out elements are removed with a RemoveAllVisitor after
/// the lock has been released.
///
/// @param new_mode value assigned to mode_.
/// @param old_mode when not MODE_NOT_SET, nothing is cleared unless mode_
///        currently equals old_mode. Note that send_delayed_notifications()
///        has already run by the time this check is made.
778 : void
779 0 : TransportSendStrategy::clear(SendMode new_mode, SendMode old_mode)
780 : {
781 : DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
782 :
783 0 : send_delayed_notifications();
784 0 : QueueType elems;
785 0 : QueueType queue;
786 : {
787 0 : GuardType guard(lock_);
788 :
789 0 : if (old_mode != MODE_NOT_SET && mode_ != old_mode)
790 0 : return;
791 :
792 0 : if (header_.length_ > 0 && pkt_chain_) {
793 : // Clear the messages in the pkt_chain_ that is partially sent.
794 : // We reuse the normal partial-send adjustment, without actually sending,
// by reporting the whole remaining chain as sent.
795 0 : int num_bytes_left = static_cast<int>(pkt_chain_->total_length());
796 0 : int result = adjust_packet_after_send(num_bytes_left);
797 :
798 0 : if (result == 0) {
799 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
800 : "The adjustment logic says that the packet is cleared.\n"));
801 :
802 : } else {
803 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
804 : "The adjustment returned partial sent.\n"));
805 : }
806 : }
807 :
// Take the contents out under the lock; they are processed below, unlocked.
808 0 : elems.swap(elems_);
809 0 : queue.swap(queue_);
810 :
811 0 : header_.length_ = 0;
812 0 : pkt_chain_ = 0;
813 0 : header_complete_ = false;
814 0 : start_counter_ = 0;
815 0 : mode_ = new_mode;
816 0 : mode_before_suspend_ = MODE_NOT_SET;
817 0 : }
818 :
819 : // We need to remove the queued elements outside the lock,
820 : // otherwise we have a deadlock situation when the remove visitor
821 : // calls data_dropped on each dropped element.
822 :
823 : // Clear all samples in queue.
824 0 : RemoveAllVisitor remove_all_visitor;
825 :
826 0 : elems.accept_remove_visitor(remove_all_visitor);
827 0 : queue.accept_remove_visitor(remove_all_visitor);
828 0 : }
829 :
/// Start the strategy: run start_i() under lock_, create the header
/// allocators, and register this object as a worker with synch_.
///
/// @return 0 on success; -1 if start_i() fails or if
///         synch_->register_worker() returns -1.
830 : int
831 0 : TransportSendStrategy::start()
832 : {
833 : DBG_ENTRY_LVL("TransportSendStrategy","start",6);
834 :
835 : {
836 0 : GuardType guard(lock_);
837 :
838 0 : if (!start_i()) {
839 0 : return -1;
840 : }
841 0 : }
842 :
// One chunk to begin with; the branch below adds either the bound send
// buffer's capacity() or one more chunk.
843 0 : size_t header_chunks(1);
844 :
845 : // If a secondary send buffer is bound, sent headers should
846 : // be cached to properly maintain the buffer:
847 0 : if (send_buffer_ != 0) {
848 0 : header_chunks += send_buffer_->capacity();
849 :
850 : } else {
851 0 : header_chunks += 1;
852 : }
853 :
854 0 : header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks));
855 0 : header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks));
856 0 : header_db_lock_pool_.reset(new DataBlockLockPool(static_cast<unsigned long>(TheServiceParticipant->n_chunks())));
857 0 : header_data_allocator_.reset(new DataAllocator(TheServiceParticipant->association_chunk_multiplier(), max_header_size_));
858 :
859 : // Since we (the TransportSendStrategy object) are a reference-counted
860 : // object, but the synch_ object doesn't necessarily know this, we need
861 : // to give a "copy" of a reference to ourselves to the synch_ object here.
862 : // We will do the reverse when we unregister ourselves (as a worker) from
863 : // the synch_ object.
864 :
865 0 : if (synch_->register_worker(*this) == -1) {
866 :
867 0 : ACE_ERROR_RETURN((LM_ERROR,
868 : "(%P|%t) ERROR: TransportSendStrategy failed to register "
869 : "as a worker with the ThreadSynch object.\n"),
870 : -1);
871 : }
872 :
873 0 : return 0;
874 : }
875 :
876 : void
877 0 : TransportSendStrategy::stop()
878 : {
879 : DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
880 :
881 0 : if (header_block_ != 0) {
882 0 : header_block_->release ();
883 0 : header_block_ = 0;
884 : }
885 :
886 0 : synch_->unregister_worker();
887 :
888 0 : QueueType elems;
889 0 : QueueType queue;
890 : {
891 0 : GuardType guard(lock_);
892 :
893 0 : if (pkt_chain_ != 0) {
894 0 : size_t size = pkt_chain_->total_length();
895 0 : if (size > 0) {
896 0 : pkt_chain_->release();
897 0 : ACE_DEBUG((LM_WARNING,
898 : ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
899 : ACE_TEXT("terminating with %d unsent bytes.\n"),
900 : size));
901 : }
902 0 : pkt_chain_ = 0;
903 : }
904 :
905 0 : if (elems_.size()) {
906 0 : elems_.swap(elems);
907 0 : ACE_DEBUG((LM_WARNING,
908 : ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
909 : ACE_TEXT("terminating with %d unsent elements.\n"),
910 : elems_.size()));
911 : }
912 :
913 0 : if (queue_.size()) {
914 0 : queue_.swap(queue);
915 0 : ACE_DEBUG((LM_WARNING,
916 : ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
917 : ACE_TEXT("terminating with %d queued elements.\n"),
918 : queue_.size()));
919 : }
920 0 : }
921 :
922 0 : RemoveAllVisitor remove_all_visitor;
923 :
924 0 : elems.accept_remove_visitor(remove_all_visitor);
925 0 : queue.accept_remove_visitor(remove_all_visitor);
926 :
927 : {
928 0 : GuardType guard(lock_);
929 :
930 0 : stop_i();
931 0 : }
932 0 : }
933 :
934 : void
935 0 : TransportSendStrategy::send(TransportQueueElement* element, bool relink)
936 : {
937 0 : if (Transport_debug_level > 9) {
938 0 : ACE_DEBUG((LM_DEBUG,
939 : ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
940 : ACE_TEXT("sending data at 0x%x.\n"),
941 : id(), element));
942 : }
943 :
944 : DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
945 :
946 : {
947 0 : GuardType guard(lock_);
948 :
949 0 : if (link_released_) {
950 0 : add_delayed_notification(element);
951 :
952 : } else {
953 0 : if (mode_ == MODE_TERMINATED && !graceful_disconnecting_) {
954 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
955 : "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
956 : "graceful disconnecting, so discard message.\n"));
957 0 : guard.release();
958 0 : element->data_dropped(true);
959 0 : return;
960 : }
961 :
962 0 : size_t element_length = element->msg()->total_length();
963 :
964 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
965 : "Send element msg() has total_length() == [%d].\n",
966 : element_length));
967 :
968 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
969 : "max_header_size_ == [%d].\n",
970 : max_header_size_));
971 :
972 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
973 : "max_size_ == [%d].\n",
974 : max_size_));
975 :
976 0 : const size_t max_msg_size = max_message_size();
977 :
978 : // Really an assert. We can't accept any element that wouldn't fit into
979 : // a transport packet by itself (ie, it would be the only element in the
980 : // packet). This max_size_ is the user-configurable maximum, not based
981 : // on the transport's inherent maximum message size. If max_msg_size
982 : // is non-zero, we will fragment so max_size_ doesn't apply per-element.
983 0 : if (max_msg_size == 0 &&
984 0 : max_header_size_ + element_length > max_size_) {
985 0 : ACE_ERROR((LM_ERROR,
986 : "(%P|%t) ERROR: Element too large (%Q) "
987 : "- won't fit into packet.\n", ACE_UINT64(element_length)));
988 0 : return;
989 : }
990 :
991 : // Check the mode_ to see if we simply put the element on the queue.
992 0 : if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
993 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
994 : "mode_ == %C, so queue elem and leave.\n",
995 : mode_as_str(mode_)), 5);
996 :
997 0 : queue_.put(element);
998 :
999 0 : if (mode_ != MODE_SUSPEND) {
1000 0 : synch_->work_available();
1001 : }
1002 :
1003 0 : return;
1004 : }
1005 :
1006 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1007 : "mode_ == MODE_DIRECT.\n"));
1008 :
1009 : // We are in the MODE_DIRECT send mode. When in this mode, the send()
1010 : // calls will "build up" the transport packet to be sent directly when it
1011 : // reaches the optimal size, contains the maximum number of samples, etc.
1012 :
1013 : // We need to check if the current element (the arg passed-in to this
1014 : // send() method) should be appended to the transport packet, or if the
1015 : // transport packet should be sent (directly) first, dealing with the
1016 : // current element afterwards.
1017 :
1018 : // We will decide to send the packet as it is now, under two circumstances:
1019 : //
1020 : // Either:
1021 : //
1022 : // (1) The current element won't fit into the current packet since it
1023 : // would violate the max_packet_size_.
1024 : //
1025 : // -OR-
1026 : //
1027 : // (2) There is at least one element already in the current packet,
1028 : // and the current element says that it must be sent in an
1029 : // exclusive packet (ie, in a packet all by itself).
1030 : //
1031 0 : const bool exclusive = element->requires_exclusive_packet();
1032 :
1033 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1034 : "The element %C require an exclusive packet.\n",
1035 : (exclusive ? "DOES" : "does NOT")
1036 : ));
1037 :
1038 : const size_t space_needed =
1039 : (max_msg_size > 0)
1040 0 : ? /* fragmenting */ DataSampleHeader::get_max_serialized_size() + MIN_FRAG
1041 0 : : /* not fragmenting */ element_length;
1042 :
1043 0 : if ((exclusive && (elems_.size() != 0))
1044 0 : || (current_space_available() < space_needed)) {
1045 :
1046 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1047 : "Element won't fit in current packet or requires exclusive"
1048 : " - send current packet (directly) now.\n"));
1049 :
1050 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1051 : "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
1052 : , max_header_size_, header_.length_, element_length));
1053 :
1054 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1055 : "Tot possible length: %d, max_len: %d\n"
1056 : , max_header_size_ + header_.length_ + element_length
1057 : , max_size_));
1058 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1059 : "current elem size: %d\n"
1060 : , elems_.size()));
1061 :
1062 : // Send the current packet, and deal with the current element
1063 : // afterwards.
1064 : // The invocation's relink status should dictate the direct_send's
1065 : // do_relink. We don't want a (relink == false) invocation to end up
1066 : // doing a relink. Think of (relink == false) as a non-blocking call.
1067 0 : direct_send(relink);
1068 :
1069 : // Now check to see if we flipped into MODE_QUEUE, which would mean
1070 : // that the direct_send() experienced backpressure, and the
1071 : // packet was only partially sent. If this has happened, we deal with
1072 : // the current element by placing it on the queue (and then we are done).
1073 : //
1074 : // Otherwise, if the mode_ is still MODE_DIRECT, we can just
1075 : // "drop" through to the next step in the logic where we append the
1076 : // current element to the current packet.
1077 0 : if (mode_ == MODE_QUEUE) {
1078 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1079 : "We experienced backpressure on that direct send, as "
1080 : "the mode_ is now MODE_QUEUE or MODE_SUSPEND. "
1081 : "Queue elem and leave.\n"), 5);
1082 0 : queue_.put(element);
1083 0 : synch_->work_available();
1084 :
1085 0 : return;
1086 : }
1087 : }
1088 :
1089 : // Loop for sending 'element', in fragments if needed
1090 0 : bool first_pkt = true; // enter the loop 1st time through unconditionally
1091 0 : for (TransportQueueElement* next_fragment = 0;
1092 0 : (first_pkt || next_fragment)
1093 0 : && (mode_ == MODE_DIRECT || mode_ == MODE_TERMINATED);) {
1094 : // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
1095 :
1096 0 : if (next_fragment) {
1097 0 : element = next_fragment;
1098 0 : element_length = next_fragment->msg()->total_length();
1099 0 : header_.first_fragment_ = false;
1100 : }
1101 :
1102 0 : header_.last_fragment_ = false;
1103 0 : if (max_msg_size) { // fragmentation enabled
1104 0 : const size_t avail = current_space_available();
1105 0 : if (element_length > avail) {
1106 0 : VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting %B > %B\n", element_length, avail), 0);
1107 0 : const TqePair ep = element->fragment(avail);
1108 0 : if (ep == null_tqe_pair) {
1109 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::send: "
1110 : "Element Fragmentation Failed\n"));
1111 0 : return;
1112 : }
1113 0 : element = ep.first;
1114 0 : element_length = element->msg()->total_length();
1115 0 : next_fragment = ep.second;
1116 0 : header_.first_fragment_ = first_pkt;
1117 0 : } else if (next_fragment) {
1118 : // We are sending the "tail" element of a previous fragment()
1119 : // operation, and this element didn't itself require fragmentation
1120 0 : header_.last_fragment_ = true;
1121 0 : next_fragment = 0;
1122 : }
1123 : }
1124 0 : first_pkt = false;
1125 :
1126 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1127 : "Start the 'append elem' to current packet logic.\n"));
1128 :
1129 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1130 : "Put element into current packet elems_.\n"));
1131 :
1132 : // Now that we know the current element should go into the current
1133 : // packet, we can just go ahead and "append" the current element to
1134 : // the current packet.
1135 :
1136 : // Add the current element to the collection of packet elements.
1137 0 : elems_.put(element);
1138 :
1139 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1140 : "Before, the header_.length_ == [%d].\n",
1141 : header_.length_));
1142 :
1143 : // Adjust the header_.length_ to account for the length of the element.
1144 0 : header_.length_ += static_cast<ACE_UINT32>(element_length);
1145 0 : const size_t message_length = header_.length_;
1146 :
1147 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1148 : "After adding element's length, the header_.length_ == [%d].\n",
1149 : message_length));
1150 :
1151 : // The current packet now contains the current element. We need to
1152 : // check to see if the conditions are such that we should go ahead and
1153 : // attempt to send the packet "directly" now, or if we can just leave
1154 : // and send the current packet later (in another send() call or in a
1155 : // send_stop() call).
1156 :
1157 : // There a few conditions that will cause us to attempt to send the
1158 : // packet (directly) right now:
1159 : // - Fragmentation was needed
1160 : // - The current packet has the maximum number of samples per packet.
1161 : // - The current packet's total length exceeds the optimum packet size.
1162 : // - The current element (currently part of the packet elems_)
1163 : // requires an exclusive packet.
1164 : //
1165 0 : if (next_fragment || (elems_.size() >= max_samples_)
1166 0 : || (max_header_size_ + message_length > optimum_size_)
1167 0 : || exclusive) {
1168 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1169 : "Now the current packet looks full - send it (directly).\n"));
1170 :
1171 0 : direct_send(relink);
1172 :
1173 0 : if (next_fragment && mode_ != MODE_DIRECT) {
1174 0 : if (mode_ == MODE_QUEUE) {
1175 0 : queue_.put(next_fragment);
1176 0 : synch_->work_available();
1177 :
1178 : } else {
1179 0 : next_fragment->data_dropped(true /* dropped by transport */);
1180 : }
1181 0 : } else if (mode_ == MODE_QUEUE) {
1182 : // Background thread handles packets in progress
1183 0 : synch_->work_available();
1184 : }
1185 :
1186 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1187 : "Back from the direct_send() attempt.\n"));
1188 :
1189 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1190 : "And we %C as a result of the direct_send() call.\n",
1191 : ((mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
1192 : : "stayed in MODE_DIRECT")));
1193 :
1194 : } else {
1195 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1196 : "Packet not sent. Send conditions weren't satisfied.\n"));
1197 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1198 : "elems_.size(): %d, max_samples_: %d\n",
1199 : int(elems_.size()), int(max_samples_)));
1200 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1201 : "header_size_: %d, optimum_size_: %d\n",
1202 : int(max_header_size_ + message_length),
1203 : int(optimum_size_)));
1204 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1205 : "element_requires_exclusive_packet: %d\n", int(exclusive)));
1206 :
1207 0 : if (mode_ == MODE_QUEUE) {
1208 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1209 : "We flipped into MODE_QUEUE.\n"));
1210 :
1211 : } else {
1212 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1213 : "We stayed in MODE_DIRECT.\n"));
1214 : }
1215 : }
1216 : }
1217 : }
1218 0 : }
1219 :
1220 0 : send_delayed_notifications();
1221 : }
1222 :
1223 : void
1224 0 : TransportSendStrategy::send_stop(GUID_t /*repoId*/)
1225 : {
1226 : DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
1227 : {
1228 0 : GuardType guard(lock_);
1229 :
1230 0 : if (link_released_)
1231 0 : return;
1232 :
1233 0 : if (start_counter_ == 0) {
1234 : // This is an indication of a logic error. This is more of an assert.
1235 0 : VDBG_LVL((LM_ERROR,
1236 : "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
1237 0 : return;
1238 : }
1239 :
1240 0 : --start_counter_;
1241 :
1242 0 : if (start_counter_ != 0) {
1243 : // This wasn't the last send_stop() that we are expecting. We only
1244 : // really honor the first send_start() and the last send_stop().
1245 : // We can return without doing anything else in this case.
1246 0 : return;
1247 : }
1248 :
1249 0 : if (mode_ == MODE_TERMINATED && !graceful_disconnecting_) {
1250 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1251 : "TransportSendStrategy::send_stop: dont try to send current packet "
1252 : "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
1253 0 : return;
1254 : }
1255 :
1256 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1257 : "This is an 'important' send_stop() event since our "
1258 : "start_counter_ is 0.\n"));
1259 :
1260 : // We just caused the start_counter_ to become zero. This
1261 : // means that we aren't expecting another send() or send_stop() at any
1262 : // time in the near future (ie, it isn't imminent).
1263 :
1264 : // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
1265 : // anything to do here because samples have already been going to the
1266 : // queue.
1267 :
1268 : // We only need to do something if the mode_ is
1269 : // MODE_DIRECT. It means that we may have some sample(s) in the
1270 : // current packet that have never been sent. This is our
1271 : // opportunity to send the current packet directly if this is the case.
1272 0 : if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
1273 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1274 : "But since we are in %C, we don't have to do "
1275 : "anything more in this important send_stop().\n",
1276 : mode_as_str(mode_)));
1277 : // We don't do anything if we are in MODE_QUEUE. Just leave.
1278 0 : return;
1279 : }
1280 :
1281 0 : size_t header_length = header_.length_;
1282 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1283 : "We are in MODE_DIRECT in an important send_stop() - "
1284 : "header_.length_ == [%d].\n", header_length));
1285 :
1286 : // Only attempt to send the current packet (directly) if the current
1287 : // packet actually contains something (it could be empty).
1288 0 : if ((header_length > 0) &&
1289 : //(elems_.size ()+not_yet_pac_q_->size() > 0))
1290 0 : (elems_.size() > 0)) {
1291 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1292 : "There is something in the current packet - attempt to send "
1293 : "it (directly) now.\n"));
1294 : // If a relink needs to be done for this packet to be sent, do it.
1295 0 : direct_send(true);
1296 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1297 : "Back from the attempt to send leftover packet directly.\n"));
1298 :
1299 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1300 : "But we %C as a result.\n",
1301 : ((mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
1302 : "stayed in MODE_DIRECT" )));
1303 0 : if (mode_ == MODE_QUEUE && mode_ != MODE_SUSPEND) {
1304 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1305 : "Notify Synch thread of work availability\n"));
1306 0 : synch_->work_available();
1307 : }
1308 : }
1309 0 : }
1310 :
1311 0 : send_delayed_notifications();
1312 : }
1313 :
1314 : void
1315 0 : TransportSendStrategy::remove_all_msgs(const GUID_t& pub_id)
1316 : {
1317 : DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
1318 :
1319 0 : const TransportQueueElement::MatchOnPubId match(pub_id);
1320 0 : send_delayed_notifications(&match);
1321 :
1322 0 : GuardType guard(lock_);
1323 :
1324 0 : if (send_buffer_ != 0) {
1325 : // If a secondary send buffer is bound, removed samples must
1326 : // be retained in order to properly maintain the buffer:
1327 0 : send_buffer_->retain_all(pub_id);
1328 : }
1329 :
1330 0 : do_remove_sample(pub_id, match, true);
1331 0 : }
1332 :
1333 : RemoveResult
1334 0 : TransportSendStrategy::remove_sample(const DataSampleElement* sample)
1335 : {
1336 : DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
1337 :
1338 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) Removing sample: %@\n", sample->get_sample()), 5);
1339 :
1340 : // The sample to remove is either in temporary delayed notification list or
1341 : // internal list (elems_ or queue_). If it's going to be removed from temporary delayed
1342 : // notification list by transport thread, it needs acquire WriterDataContainer lock for
1343 : // data_dropped/data_delivered callback, then it needs wait for this remove_sample() call
1344 : // complete as this call already hold the WriterContainer's lock. So this call is safe to
1345 : // access the sample to remove. If it's going to be removed by this remove_sample() calling
1346 : // thread, it will be removed either from delayed notification list or from internal list
1347 : // in which case the element carry the info if the sample is released so the datalinkset
1348 : // can stop calling rest datalinks to remove this sample if it's already released..
1349 :
1350 0 : const char* const payload = sample->get_sample()->cont()->rd_ptr();
1351 0 : GUID_t pub_id = sample->get_pub_id();
1352 0 : const TransportQueueElement::MatchOnDataPayload modp(payload);
1353 0 : if (send_delayed_notifications(&modp)) {
1354 0 : return REMOVE_RELEASED;
1355 : }
1356 :
1357 0 : GuardType guard(lock_);
1358 0 : return do_remove_sample(pub_id, modp);
1359 0 : }
1360 :
1361 : RemoveResult
1362 0 : TransportSendStrategy::do_remove_sample(const GUID_t&,
1363 : const TransportQueueElement::MatchCriteria& criteria, bool remove_all)
1364 : {
1365 : DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
1366 :
1367 : //ciju: Tim had the idea that we could do the following check
1368 : // if ((mode_ == MODE_DIRECT) ||
1369 : // ((pkt_chain_ == 0) && (queue_ == empty)))
1370 : // then we can assume that the sample can be safely removed (no need for
1371 : // replacement) from the elems_ queue.
1372 0 : if ((mode_ == MODE_DIRECT)
1373 0 : || ((pkt_chain_ == 0) && (queue_.size() == 0))) {
1374 : //ciju: I believe this is the only mode where a safe
1375 : // assumption can be made that the samples
1376 : // in the elems_ queue aren't part of a packet.
1377 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1378 : "The mode is MODE_DIRECT, or the queue is empty and no "
1379 : "transport packet is in progress.\n"));
1380 :
1381 0 : QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
1382 0 : elems_.accept_remove_visitor(simple_rem_vis);
1383 :
1384 0 : const RemoveResult status = simple_rem_vis.status();
1385 :
1386 0 : if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
1387 0 : header_.length_ -= simple_rem_vis.removed_bytes();
1388 :
1389 0 : } else if (status == REMOVE_NOT_FOUND) {
1390 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1391 : "Failed to find the sample to remove.\n"));
1392 : }
1393 :
1394 0 : if (criteria.unique() || !remove_all) return status;
1395 0 : }
1396 :
1397 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1398 : "Visit the queue_ with the RemoveElementVisitor.\n"));
1399 :
1400 0 : QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
1401 0 : queue_.accept_remove_visitor(simple_rem_vis);
1402 :
1403 0 : RemoveResult status = simple_rem_vis.status();
1404 :
1405 0 : if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
1406 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1407 : "The sample was removed from the queue_.\n"));
1408 : // This means that the visitor did not encounter any fatal error
1409 : // along the way, *AND* the sample was found in the queue_,
1410 : // and has now been removed. We are done.
1411 0 : if (criteria.unique() || !remove_all) return status;
1412 : }
1413 :
1414 0 : if (status == REMOVE_ERROR) {
1415 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1416 : "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
1417 : // This means that the visitor encountered some fatal error along
1418 : // the way (and it already reported something to the log).
1419 : // Return our failure code.
1420 0 : return status;
1421 : }
1422 :
1423 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1424 : "The RemoveElementVisitor did not find the sample in queue_.\n"));
1425 :
1426 : // We get here if the visitor did not encounter any fatal error, but it
1427 : // also didn't find the sample - and hence it didn't perform any
1428 : // "remove sample" logic.
1429 :
1430 : // Now we need to turn our attention to the current transport packet,
1431 : // since the packet is likely in a "partially sent" state, and the
1432 : // sample may still be contributing unsent bytes in the pkt_chain_.
1433 :
1434 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1435 : "Visit our elems_ with the PacketRemoveVisitor.\n"));
1436 :
1437 : PacketRemoveVisitor pac_rem_vis(criteria,
1438 0 : pkt_chain_,
1439 : header_block_,
1440 0 : replaced_element_mb_allocator_,
1441 0 : replaced_element_db_allocator_,
1442 0 : remove_all);
1443 :
1444 0 : elems_.accept_replace_visitor(pac_rem_vis);
1445 :
1446 0 : status = pac_rem_vis.status();
1447 :
1448 0 : if (status == REMOVE_ERROR) {
1449 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1450 : "The PacketRemoveVisitor encountered a fatal error.\n"));
1451 :
1452 0 : } else if (status == REMOVE_NOT_FOUND) {
1453 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1454 : "The PacketRemoveVisitor didn't find the sample.\n"));
1455 :
1456 : } else {
1457 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1458 : "The PacketRemoveVisitor found the sample and removed it.\n"));
1459 : }
1460 :
1461 0 : return status;
1462 0 : }
1463 :
1464 : void
1465 0 : TransportSendStrategy::direct_send(bool do_relink)
1466 : {
1467 : DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
1468 :
1469 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1470 : "Prepare the current packet for a direct send attempt.\n"));
1471 :
1472 : // Prepare the packet for sending.
1473 0 : prepare_packet();
1474 :
1475 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1476 : "Now attempt to send the packet.\n"));
1477 :
1478 : // We will try resend the packet if the send() fails and then connection
1479 : // is re-established. Only loops if the "continue" line is hit.
1480 : while (true) {
1481 : // Attempt to send the packet
1482 0 : const SendPacketOutcome outcome = send_packet();
1483 :
1484 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1485 : "The outcome of the send_packet() was %d.\n", outcome));
1486 :
1487 0 : if ((outcome == OUTCOME_BACKPRESSURE) ||
1488 : (outcome == OUTCOME_PARTIAL_SEND)) {
1489 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1490 : "The outcome of the send_packet() was either "
1491 : "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
1492 :
1493 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1494 : "Flip into the MODE_QUEUE mode_.\n"), 5);
1495 :
1496 : // We encountered backpressure, or only sent part of the packet.
1497 0 : mode_ = MODE_QUEUE;
1498 :
1499 0 : } else if ((outcome == OUTCOME_PEER_LOST) ||
1500 : (outcome == OUTCOME_SEND_ERROR)) {
1501 0 : if (outcome == OUTCOME_SEND_ERROR) {
1502 0 : VDBG_LVL((LM_WARNING,
1503 : "(%P|%t) WARNING: Problem detected in "
1504 : "send buffer management: %p.\n",
1505 : "send_bytes"), 1);
1506 :
1507 0 : if (Transport_debug_level > 0) {
1508 0 : TransportImpl_rch transport = transport_.lock();
1509 0 : if (transport) {
1510 0 : transport->dump();
1511 : }
1512 0 : }
1513 : } else {
1514 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1515 : "The outcome of the send_packet() was "
1516 : "OUTCOME_PEER_LOST.\n"));
1517 : }
1518 :
1519 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1520 : "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
1521 :
1522 0 : if (mode_ != MODE_SUSPEND) {
1523 0 : mode_before_suspend_ = mode_;
1524 0 : mode_ = MODE_SUSPEND;
1525 : }
1526 :
1527 0 : if (do_relink) {
1528 0 : bool do_suspend = false;
1529 0 : relink(do_suspend);
1530 :
1531 0 : if (mode_ == MODE_SUSPEND) {
1532 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1533 : "The reconnect has not done yet and we are "
1534 : "still in MODE_SUSPEND.\n"), 5);
1535 :
1536 0 : } else if (mode_ == MODE_TERMINATED) {
1537 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1538 : "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
1539 :
1540 : } else {
1541 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1542 : "Try send the packet again since the connection "
1543 : "is re-established.\n"), 5);
1544 :
1545 : // Try send the packet again since the connection is re-established.
1546 0 : continue;
1547 0 : }
1548 : }
1549 :
1550 0 : } else {
1551 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1552 : "The outcome of the send_packet() must have been "
1553 : "OUTCOME_COMPLETE_SEND.\n"));
1554 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1555 : "So, we will just stay in MODE_DIRECT.\n"));
1556 : }
1557 :
1558 0 : break;
1559 0 : }
1560 :
1561 : // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
1562 0 : }
1563 :
1564 : void
1565 0 : TransportSendStrategy::get_packet_elems_from_queue()
1566 : {
1567 : DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
1568 :
1569 0 : for (TransportQueueElement* element = queue_.peek(); element != 0;
1570 0 : element = queue_.peek()) {
1571 :
1572 : // Total number of bytes in the current element's message block chain.
1573 0 : size_t element_length = element->msg()->total_length();
1574 :
1575 : // Flag used to determine if the element requires a packet all to itself.
1576 0 : const bool exclusive_packet = element->requires_exclusive_packet();
1577 :
1578 0 : const size_t avail = current_space_available();
1579 :
1580 0 : bool frag = false;
1581 0 : if (element_length > avail) {
1582 : // The current element won't fit into the current packet
1583 0 : if (max_message_size()) { // fragmentation enabled
1584 0 : header_.first_fragment_ = !element->is_fragment();
1585 0 : VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting from queue\n"), 0);
1586 0 : const TqePair ep = element->fragment(avail);
1587 0 : if (ep == null_tqe_pair) {
1588 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::get_packet_elems_from_queue: "
1589 : "Element Fragmentation Failed\n"));
1590 0 : return;
1591 : }
1592 0 : element = ep.first;
1593 0 : element_length = element->msg()->total_length();
1594 0 : queue_.replace_head(ep.second);
1595 0 : frag = true; // queue_ is already taken care of, don't get() later
1596 : } else {
1597 0 : break;
1598 : }
1599 : }
1600 :
1601 : // If exclusive and the current packet is empty, we won't violate the
1602 : // exclusive_packet requirement by put()'ing the element
1603 : // into the elems_ collection.
1604 0 : if ((exclusive_packet && elems_.size() == 0)
1605 0 : || !exclusive_packet) {
1606 : // At this point, we have passed all of the pre-conditions and we can
1607 : // now extract the current element from the queue_, put it into the
1608 : // packet elems_, and adjust the packet header_.length_.
1609 0 : elems_.put(frag ? element : queue_.get());
1610 0 : if (header_.length_ == 0) {
1611 0 : header_.last_fragment_ = !frag && element->is_fragment();
1612 : }
1613 0 : header_.length_ += static_cast<ACE_UINT32>(element_length);
1614 0 : VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Packetizing from queue\n"), 0);
1615 : }
1616 :
1617 : // With exclusive and (elems_.size() != 0), we don't use the current
1618 : // element as part of the packet. We know that there is already
1619 : // at least one element in the packet, and the current element
1620 : // is going to need its own (exclusive) packet. We will just
1621 : // use the packet elems_ as it is now. Always break once
1622 : // we've encountered and dealt with the exclusive_packet case.
1623 : // Also break if fragmentation was required.
1624 0 : if (exclusive_packet || frag
1625 : // If the current number of packet elems_ has reached the maximum
1626 : // number of samples per packet, then we are done.
1627 0 : || elems_.size() == max_samples_
1628 : // If the current value of the header_.length_ exceeds (or equals)
1629 : // the optimum_size_ for a packet, then we are done.
1630 0 : || header_.length_ >= optimum_size_) {
1631 0 : break;
1632 : }
1633 : }
1634 : }
1635 :
1636 : void
1637 0 : TransportSendStrategy::prepare_header()
1638 : {
1639 : DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
1640 :
1641 : // Increment header sequence for packet:
1642 0 : header_.sequence_ = ++header_sequence_;
1643 :
1644 : // Allow the specific implementation the opportunity to set
1645 : // values in the packet header.
1646 0 : prepare_header_i();
1647 0 : }
1648 :
1649 : void
1650 0 : TransportSendStrategy::prepare_header_i()
1651 : {
1652 : DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
1653 :
1654 : // Default implementation does nothing.
1655 0 : }
1656 :
1657 : void
1658 0 : TransportSendStrategy::prepare_packet()
1659 : {
1660 : DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
1661 :
1662 : // Prepare the header for sending.
1663 0 : prepare_header();
1664 :
1665 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1666 : "Marshal the packet header.\n"));
1667 :
1668 0 : if (header_block_ != 0) {
1669 0 : header_block_->release();
1670 : }
1671 :
1672 0 : ACE_NEW_MALLOC(header_block_,
1673 : static_cast<ACE_Message_Block*>(header_mb_allocator_->malloc()),
1674 : ACE_Message_Block(max_header_size_,
1675 : ACE_Message_Block::MB_DATA,
1676 : 0, // cont
1677 : 0, // data
1678 : header_data_allocator_.get(),
1679 : header_db_lock_pool_->get_lock(),
1680 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
1681 : ACE_Time_Value::zero,
1682 : ACE_Time_Value::max_time,
1683 : header_db_allocator_.get(),
1684 : header_mb_allocator_.get()));
1685 :
1686 0 : marshal_transport_header(header_block_);
1687 :
1688 0 : pkt_chain_ = header_block_->duplicate();
1689 :
1690 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1691 : "Use a BuildChainVisitor to visit the packet elems_.\n"));
1692 :
1693 : // Build up a chain of blocks by duplicating the message block chain
1694 : // held by each element (in elems_), and then chaining the new duplicate
1695 : // blocks together to form one long chain.
1696 0 : BuildChainVisitor visitor;
1697 0 : elems_.accept_visitor(visitor);
1698 :
1699 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1700 : "Attach the visitor's chain of blocks to the lone (packet "
1701 : "header) block currently in the pkt_chain_.\n"));
1702 :
1703 : // Attach the visitor's chain of blocks to the packet header block.
1704 0 : pkt_chain_->cont(visitor.chain());
1705 :
1706 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1707 : "Increment header sequence for next packet.\n"));
1708 :
1709 : // Allow the specific implementation the opportunity to process the
1710 : // newly prepared packet.
1711 0 : prepare_packet_i();
1712 :
1713 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1714 : "Set the header_complete_ flag to false.\n"));
1715 :
1716 : // Set the header_complete_ to false to indicate
1717 : // that the first block in the pkt_chain_ is the packet header block
1718 : // (actually a duplicate() of the packet header_block_).
1719 0 : header_complete_ = false;
1720 0 : }
1721 :
1722 : bool
1723 0 : TransportSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
1724 : {
1725 0 : return *mb << header_;
1726 : }
1727 :
1728 : void
1729 0 : TransportSendStrategy::prepare_packet_i()
1730 : {
1731 : DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
1732 :
1733 : // Default implementation does nothing.
1734 0 : }
1735 :
1736 : void
1737 0 : TransportSendStrategy::set_graceful_disconnecting(bool flag)
1738 : {
1739 0 : graceful_disconnecting_ = flag;
1740 0 : }
1741 :
1742 : ssize_t
1743 0 : TransportSendStrategy::do_send_packet(const ACE_Message_Block* packet, int& bp)
1744 : {
1745 0 : if (Transport_debug_level > 9) {
1746 0 : ACE_DEBUG((LM_DEBUG,
1747 : ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
1748 : ACE_TEXT("sending data at 0x%x.\n"),
1749 : id(), packet));
1750 : }
1751 : DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
1752 :
1753 : #ifdef OPENDDS_SECURITY
1754 0 : Message_Block_Ptr substitute;
1755 0 : if (security_config()) {
1756 0 : const DDS::Security::CryptoTransform_var crypto = security_config()->get_crypto_transform();
1757 : // pre_send_packet may provide different data that takes the place of the
1758 : // original "packet" (used for security encryption/authentication)
1759 0 : if (crypto) {
1760 0 : substitute.reset(pre_send_packet(packet));
1761 0 : if (!substitute) {
1762 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: pre_send_packet returned NULL, dropping.\n"));
1763 0 : return packet->total_length();
1764 : }
1765 : }
1766 0 : }
1767 : #endif
1768 :
1769 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1770 : "Populate the iovec array using the packet.\n"), 5);
1771 :
1772 : iovec iov[MAX_SEND_BLOCKS];
1773 :
1774 : #ifdef OPENDDS_SECURITY
1775 0 : const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov);
1776 : #else
1777 : const int num_blocks = mb_to_iov(*packet, iov);
1778 : #endif
1779 :
1780 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1781 : "There are [%d] number of entries in the iovec array.\n",
1782 : num_blocks), 5);
1783 :
1784 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1785 : "Attempt to send_bytes() now.\n"), 5);
1786 :
1787 0 : const ssize_t num_bytes_sent = send_bytes(iov, num_blocks, bp);
1788 :
1789 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1790 : "The send_bytes() said that num_bytes_sent == [%d].\n",
1791 : num_bytes_sent), 5);
1792 :
1793 : #ifdef OPENDDS_SECURITY
1794 0 : if (num_bytes_sent > 0 && substitute && packet->data_block() != substitute->data_block()) {
1795 : // Although the "substitute" data took the place of "packet", the rest
1796 : // of the framework needs to account for the bytes in "packet" being taken
1797 : // care of, as if they were actually sent.
1798 : // Since this is done with datagram sockets, partial sends aren't possible.
1799 0 : return packet->total_length();
1800 : }
1801 : #endif
1802 :
1803 0 : return num_bytes_sent;
1804 0 : }
1805 :
1806 : TransportSendStrategy::SendPacketOutcome
1807 0 : TransportSendStrategy::send_packet()
1808 : {
1809 : DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
1810 :
1811 0 : int bp_flag = 0;
1812 : const ssize_t num_bytes_sent =
1813 0 : do_send_packet(pkt_chain_, bp_flag);
1814 :
1815 0 : if (num_bytes_sent == 0) {
1816 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1817 : "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
1818 : // This means that the peer has disconnected.
1819 0 : return OUTCOME_PEER_LOST;
1820 : }
1821 :
1822 0 : if (num_bytes_sent < 0) {
1823 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1824 : "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
1825 :
1826 : // Check for backpressure...
1827 0 : if (bp_flag == 1) {
1828 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1829 : "Since backpressure flag is true, return "
1830 : "OUTCOME_BACKPRESSURE.\n"), 5);
1831 : // Ok. Not really an error - just backpressure.
1832 0 : return OUTCOME_BACKPRESSURE;
1833 : }
1834 :
1835 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1836 : "Since backpressure flag is false, return "
1837 : "OUTCOME_SEND_ERROR.\n"), 5);
1838 :
1839 : // Not backpressure - it's a real error.
1840 : // Note: moved this to send_bytes so the errno msg could be written.
1841 : //ACE_ERROR((LM_ERROR,
1842 : // "(%P|%t) ERROR: Call to peer().send() failed with negative "
1843 : // "return code.\n"));
1844 :
1845 0 : return OUTCOME_SEND_ERROR;
1846 : }
1847 :
1848 0 : if (send_buffer_ != 0) {
1849 : // If a secondary send buffer is bound, sent samples must
1850 : // be inserted in order to properly maintain the buffer:
1851 0 : send_buffer_->insert(header_.sequence_,
1852 : &elems_, pkt_chain_);
1853 : }
1854 :
1855 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1856 : "Since num_bytes_sent > 0, adjust the packet to account for "
1857 : "the bytes that did get sent.\n"),5);
1858 :
1859 : // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
1860 : // to account for the bytes that have been sent.
1861 : const int result =
1862 0 : adjust_packet_after_send(num_bytes_sent);
1863 :
1864 0 : if (result == 0) {
1865 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1866 : "The adjustment logic says that the complete packet was "
1867 : "sent. Return OUTCOME_COMPLETE_SEND.\n"));
1868 0 : return OUTCOME_COMPLETE_SEND;
1869 : }
1870 :
1871 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
1872 : "The adjustment logic says that only a part of the packet was "
1873 : "sent. Return OUTCOME_PARTIAL_SEND.\n"));
1874 :
1875 0 : return OUTCOME_PARTIAL_SEND;
1876 : }
1877 :
1878 : ssize_t
1879 0 : TransportSendStrategy::non_blocking_send(const iovec iov[], int n, int& bp)
1880 : {
1881 0 : int val = 0;
1882 0 : ACE_HANDLE handle = get_handle();
1883 :
1884 0 : if (handle == ACE_INVALID_HANDLE)
1885 0 : return -1;
1886 :
1887 0 : ACE::record_and_set_non_blocking_mode(handle, val);
1888 :
1889 : // Set the back-pressure flag to false.
1890 0 : bp = 0;
1891 :
1892 : // Clear errno
1893 0 : errno = 0;
1894 :
1895 0 : ssize_t result = send_bytes_i(iov, n);
1896 :
1897 0 : if (result == -1) {
1898 0 : if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
1899 0 : VDBG((LM_DEBUG,"(%P|%t) DBG: "
1900 : "Backpressure encountered.\n"));
1901 : // Set the back-pressure flag to true
1902 0 : bp = 1;
1903 :
1904 0 : } else {
1905 0 : VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
1906 : ACE_TEXT("sendv"), n),1);
1907 :
1908 : // try to get the application to core when "Bad Address" is returned
1909 : // by looking at the iovec
1910 0 : for (int ii = 0; ii < n; ii++) {
1911 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
1912 : ii, iov[ii].iov_len, iov[ii].iov_base));
1913 : }
1914 : }
1915 : }
1916 :
1917 0 : VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
1918 : "The sendv() returned [%d].\n", result), 5);
1919 :
1920 0 : ACE::restore_non_blocking_mode(handle, val);
1921 :
1922 0 : return result;
1923 : }
1924 :
1925 : void
1926 0 : TransportSendStrategy::add_delayed_notification(TransportQueueElement* element)
1927 : {
1928 0 : if (Transport_debug_level) {
1929 0 : size_t size = delayed_delivered_notification_queue_.size();
1930 0 : if ((size > 0) && (size % max_samples_ == 0)) {
1931 0 : ACE_DEBUG((LM_DEBUG,
1932 : "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
1933 : size));
1934 : }
1935 : }
1936 :
1937 0 : delayed_delivered_notification_queue_.push_back(std::make_pair(element, mode_.load()));
1938 0 : }
1939 :
1940 0 : void TransportSendStrategy::deliver_ack_request(TransportQueueElement* element)
1941 : {
1942 0 : const TransportQueueElement::MatchOnElement moe(element);
1943 : {
1944 0 : GuardType guard(lock_);
1945 0 : do_remove_sample(GUID_UNKNOWN, moe);
1946 0 : }
1947 :
1948 0 : element->data_delivered();
1949 0 : }
1950 :
1951 0 : size_t TransportSendStrategy::space_available(size_t already_used) const
1952 : {
1953 0 : const size_t used = max_header_size_ + already_used;
1954 0 : const size_t max_msg = max_message_size();
1955 0 : if (max_msg) {
1956 0 : return std::min(static_cast<size_t>(max_size_), max_msg) - used;
1957 : }
1958 0 : return max_size_ - used;
1959 : }
1960 :
1961 0 : size_t TransportSendStrategy::current_space_available() const
1962 : {
1963 0 : return space_available(header_.length_);
1964 : }
1965 :
1966 : int
1967 0 : TransportSendStrategy::mb_to_iov(const ACE_Message_Block& msg, iovec* iov)
1968 : {
1969 0 : int num_blocks = 0;
1970 : #ifdef _MSC_VER
1971 : #pragma warning(push)
1972 : // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
1973 : // since on other platforms iov_len is 64-bit
1974 : #pragma warning(disable : 4267)
1975 : #endif
1976 0 : for (const ACE_Message_Block* block = &msg;
1977 0 : block && num_blocks < MAX_SEND_BLOCKS;
1978 0 : block = block->cont()) {
1979 0 : iov[num_blocks].iov_len = block->length();
1980 0 : iov[num_blocks++].iov_base = block->rd_ptr();
1981 : }
1982 : #ifdef _MSC_VER
1983 : #pragma warning(pop)
1984 : #endif
1985 0 : return num_blocks;
1986 : }
1987 :
1988 0 : bool TransportSendStrategy::fragmentation_helper(
1989 : TransportQueueElement* original_element, TqeVector& elements_to_send)
1990 : {
1991 0 : original_element->increment_loan();
1992 0 : const size_t space = space_available();
1993 0 : for (TransportQueueElement* e = original_element; e;) {
1994 0 : const size_t esize = e->msg()->total_length();
1995 0 : if (esize > space) {
1996 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportSendStrategy::fragmentation_helper: "
1997 : "message size %B > space %B: Fragmenting\n", esize, space), 0);
1998 0 : const TqePair pair = e->fragment(space);
1999 0 : if (pair == null_tqe_pair) {
2000 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::fragmentation_helper: "
2001 : "Element Fragmentation Failed\n"));
2002 0 : return false;
2003 : }
2004 0 : elements_to_send.push_back(pair.first);
2005 0 : e = pair.second;
2006 : } else {
2007 0 : elements_to_send.push_back(e);
2008 0 : e = 0;
2009 : }
2010 : }
2011 0 : return true;
2012 : }
2013 :
2014 : } // namespace DCPS
2015 : } // namespace OpenDDS
2016 :
2017 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|