OpenDDS  Snapshot(2023/04/28-20:55)
TransportQueueElement.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
10 #include "EntryExit.h"
13 
14 #if !defined (__ACE_INLINE__)
15 # include "TransportQueueElement.inl"
16 #endif /* !__ACE_INLINE__ */
17 
19 
20 namespace OpenDDS {
21 namespace DCPS {
22 
24 
26 {
27  DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6);
28 }
29 
30 bool
32 {
33  DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6);
34  return false;
35 }
36 
37 bool
39 {
40  DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6);
41  return false;
42 }
43 
45 {
46  Message_Block_Ptr head;
47  Message_Block_Ptr tail;
48  DataSampleHeader::split(*msg(), size, head, tail);
49 
51  frag->set_fragment(this);
52  frag->set_msg(move(head));
53 
55  rest->set_fragment(this);
56  rest->set_msg(move(tail));
57 
58  return TqePair(frag, rest);
59 }
60 
63  MessageBlockAllocator* mb_allocator,
64  DataBlockAllocator* db_allocator)
65 {
66  ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg);
67  ACE_Message_Block* head_copy = 0;
68  ACE_Message_Block* cur_copy = 0;
69  ACE_Message_Block* prev_copy = 0;
70  // deep copy sample data
71  while (cur_block != 0) {
72  ACE_NEW_MALLOC_RETURN(cur_copy,
73  static_cast<ACE_Message_Block*>(
74  mb_allocator->malloc(sizeof(ACE_Message_Block))),
75  ACE_Message_Block(cur_block->capacity(),
77  0, //cont
78  0, //data
79  0, //alloc_strategy
80  0, //locking_strategy
84  db_allocator,
85  mb_allocator),
86  0);
87 
88  cur_copy->copy(cur_block->base(), cur_block->size());
89  cur_copy->rd_ptr(cur_copy->base() +
90  (cur_block->rd_ptr() - cur_block->base()));
91  cur_copy->wr_ptr(cur_copy->base() +
92  (cur_block->wr_ptr() - cur_block->base()));
93 
94  if (head_copy == 0) {
95  head_copy = cur_copy;
96  } else {
97  prev_copy->cont(cur_copy);
98  }
99 
100  prev_copy = cur_copy;
101 
102  cur_block = cur_block->cont();
103  }
104 
105  return head_copy;
106 }
107 
109 {
110 }
111 
113 {
114 }
115 
117 {
118 }
119 
121 {
122 }
123 
124 }
125 }
126 
static const ACE_Time_Value max_time
std::pair< TransportQueueElement *, TransportQueueElement * > TqePair
static void split(const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
char * rd_ptr(void) const
int copy(const char *buf, size_t n)
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
virtual bool requires_exclusive_packet() const
Does the sample require an exclusive transport packet?
size_t size(void) const
ACE_Message_Block * cont(void) const
const TqePair null_tqe_pair
size_t capacity(void) const
char * wr_ptr(void) const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
static const ACE_Time_Value zero
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static ACE_Message_Block * clone_mb(const ACE_Message_Block *msg, MessageBlockAllocator *mb_allocator, DataBlockAllocator *db_allocator)
virtual bool is_control(GUID_t pub_id) const
Is the element a "control" sample from the specified pub_id?
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
char * base(void) const
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual const ACE_Message_Block * msg() const =0
The marshalled sample (sample header + sample data)