Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : #include "TransportQueueElement.h" 10 : #include "EntryExit.h" 11 : #include "TransportCustomizedElement.h" 12 : #include "dds/DCPS/DataSampleHeader.h" 13 : 14 : #if !defined (__ACE_INLINE__) 15 : # include "TransportQueueElement.inl" 16 : #endif /* !__ACE_INLINE__ */ 17 : 18 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 19 : 20 : namespace OpenDDS { 21 : namespace DCPS { 22 : 23 : const TqePair null_tqe_pair; 24 : 25 4 : TransportQueueElement::~TransportQueueElement() 26 : { 27 : DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6); 28 4 : } 29 : 30 : bool 31 0 : TransportQueueElement::requires_exclusive_packet() const 32 : { 33 : DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6); 34 0 : return false; 35 : } 36 : 37 : bool 38 0 : TransportQueueElement::is_control(GUID_t /*pub_id*/) const 39 : { 40 : DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6); 41 0 : return false; 42 : } 43 : 44 0 : TqePair TransportQueueElement::fragment(size_t size) 45 : { 46 0 : Message_Block_Ptr head; 47 0 : Message_Block_Ptr tail; 48 0 : DataSampleHeader::split(*msg(), size, head, tail); 49 : 50 0 : TransportCustomizedElement* frag = new TransportCustomizedElement(0); 51 0 : frag->set_fragment(this); 52 0 : frag->set_msg(move(head)); 53 : 54 0 : TransportCustomizedElement* rest = new TransportCustomizedElement(this); 55 0 : rest->set_fragment(this); 56 0 : rest->set_msg(move(tail)); 57 : 58 0 : return TqePair(frag, rest); 59 0 : } 60 : 61 : ACE_Message_Block* 62 1 : TransportQueueElement::clone_mb(const ACE_Message_Block* msg, 63 : MessageBlockAllocator* mb_allocator, 64 : DataBlockAllocator* db_allocator) 65 : { 66 1 : ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg); 67 1 : ACE_Message_Block* head_copy = 0; 68 1 : ACE_Message_Block* cur_copy = 0; 69 1 : ACE_Message_Block* prev_copy = 0; 70 : // deep copy sample data 71 4 : while (cur_block != 0) { 72 3 : ACE_NEW_MALLOC_RETURN(cur_copy, 73 : static_cast<ACE_Message_Block*>( 74 : mb_allocator->malloc(sizeof(ACE_Message_Block))), 75 : ACE_Message_Block(cur_block->capacity(), 76 : ACE_Message_Block::MB_DATA, 77 : 0, //cont 78 : 0, //data 79 : 0, //alloc_strategy 80 : 0, //locking_strategy 81 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 82 : ACE_Time_Value::zero, 83 : ACE_Time_Value::max_time, 84 : db_allocator, 85 : mb_allocator), 86 : 0); 87 : 88 3 : cur_copy->copy(cur_block->base(), cur_block->size()); 89 3 : cur_copy->rd_ptr(cur_copy->base() + 90 3 : (cur_block->rd_ptr() - cur_block->base())); 91 3 : cur_copy->wr_ptr(cur_copy->base() + 92 3 : (cur_block->wr_ptr() - cur_block->base())); 93 : 94 3 : if (head_copy == 0) { 95 1 : head_copy = cur_copy; 96 : } else { 97 2 : prev_copy->cont(cur_copy); 98 : } 99 : 100 3 : prev_copy = cur_copy; 101 : 102 3 : cur_block = cur_block->cont(); 103 : } 104 : 105 1 : return head_copy; 106 : } 107 : 108 0 : TransportQueueElement::MatchCriteria::~MatchCriteria() 109 : { 110 0 : } 111 : 112 0 : TransportQueueElement::MatchOnPubId::~MatchOnPubId() 113 : { 114 0 : } 115 : 116 0 : TransportQueueElement::MatchOnDataPayload::~MatchOnDataPayload() 117 : { 118 0 : } 119 : 120 0 : TransportQueueElement::MatchOnElement::~MatchOnElement() 121 : { 122 0 : } 123 : 124 : } 125 : } 126 : 127 : OPENDDS_END_VERSIONED_NAMESPACE_DECL