TransportQueueElement.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportQueueElement.h"
00010 #include "EntryExit.h"
00011 #include "TransportCustomizedElement.h"
00012 #include "dds/DCPS/DataSampleHeader.h"
00013
00014 #if !defined (__ACE_INLINE__)
00015 # include "TransportQueueElement.inl"
00016 #endif
00017
00018 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 namespace OpenDDS {
00021 namespace DCPS {
00022
00023 TransportQueueElement::~TransportQueueElement()
00024 {
00025 DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6);
00026 }
00027
00028 bool
00029 TransportQueueElement::requires_exclusive_packet() const
00030 {
00031 DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6);
00032 return false;
00033 }
00034
00035 bool
00036 TransportQueueElement::is_control(RepoId ) const
00037 {
00038 DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6);
00039 return false;
00040 }
00041
00042 ElementPair
00043 TransportQueueElement::fragment(size_t size)
00044 {
00045 Message_Block_Ptr head;
00046 Message_Block_Ptr tail;
00047 DataSampleHeader::split(*msg(), size, head, tail);
00048
00049 TransportCustomizedElement* frag = new TransportCustomizedElement(0, true);
00050 frag->set_publication_id(publication_id());
00051 frag->set_msg(move(head));
00052
00053 TransportCustomizedElement* rest =
00054 new TransportCustomizedElement(this, true);
00055 rest->set_msg(move(tail));
00056
00057 return ElementPair(frag, rest);
00058 }
00059
00060 ACE_Message_Block*
00061 TransportQueueElement::clone_mb(const ACE_Message_Block* msg,
00062 MessageBlockAllocator* mb_allocator,
00063 DataBlockAllocator* db_allocator)
00064 {
00065 ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg);
00066 ACE_Message_Block* head_copy = 0;
00067 ACE_Message_Block* cur_copy = 0;
00068 ACE_Message_Block* prev_copy = 0;
00069
00070 while (cur_block != 0) {
00071 ACE_NEW_MALLOC_RETURN(cur_copy,
00072 static_cast<ACE_Message_Block*>(
00073 mb_allocator->malloc(sizeof(ACE_Message_Block))),
00074 ACE_Message_Block(cur_block->capacity(),
00075 ACE_Message_Block::MB_DATA,
00076 0,
00077 0,
00078 0,
00079 0,
00080 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00081 ACE_Time_Value::zero,
00082 ACE_Time_Value::max_time,
00083 db_allocator,
00084 mb_allocator),
00085 0);
00086
00087 cur_copy->copy(cur_block->base(), cur_block->size());
00088 cur_copy->rd_ptr(cur_copy->base() +
00089 (cur_block->rd_ptr() - cur_block->base()));
00090 cur_copy->wr_ptr(cur_copy->base() +
00091 (cur_block->wr_ptr() - cur_block->base()));
00092
00093 if (head_copy == 0) {
00094 head_copy = cur_copy;
00095 } else {
00096 prev_copy->cont(cur_copy);
00097 }
00098
00099 prev_copy = cur_copy;
00100
00101 cur_block = cur_block->cont();
00102 }
00103
00104 return head_copy;
00105 }
00106
00107 TransportQueueElement::MatchCriteria::~MatchCriteria()
00108 {
00109 }
00110
00111 TransportQueueElement::MatchOnPubId::~MatchOnPubId()
00112 {
00113 }
00114
00115 TransportQueueElement::MatchOnDataPayload::~MatchOnDataPayload()
00116 {
00117 }
00118
00119 }
00120 }
00121
00122 OPENDDS_END_VERSIONED_NAMESPACE_DECL