00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportQueueElement.h"
00010 #include "EntryExit.h"
00011 #include "TransportCustomizedElement.h"
00012 #include "dds/DCPS/DataSampleHeader.h"
00013
00014 #if !defined (__ACE_INLINE__)
00015 # include "TransportQueueElement.inl"
00016 #endif
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 TransportQueueElement::~TransportQueueElement()
00022 {
00023 DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6);
00024 }
00025
00026 bool
00027 TransportQueueElement::requires_exclusive_packet() const
00028 {
00029 DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6);
00030 return false;
00031 }
00032
00033 bool
00034 TransportQueueElement::is_control(RepoId ) const
00035 {
00036 DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6);
00037 return false;
00038 }
00039
00040 ElementPair
00041 TransportQueueElement::fragment(size_t size)
00042 {
00043 ACE_Message_Block* head;
00044 ACE_Message_Block* tail;
00045 DataSampleHeader::split(*msg(), size, head, tail);
00046
00047 TransportCustomizedElement* frag = TransportCustomizedElement::alloc(0, true);
00048 frag->set_publication_id(publication_id());
00049 frag->set_msg(head);
00050
00051 TransportCustomizedElement* rest =
00052 TransportCustomizedElement::alloc(this, true);
00053 rest->set_msg(tail);
00054
00055 return ElementPair(frag, rest);
00056 }
00057
00058 ACE_Message_Block*
00059 TransportQueueElement::clone_mb(const ACE_Message_Block* msg,
00060 MessageBlockAllocator* mb_allocator,
00061 DataBlockAllocator* db_allocator)
00062 {
00063 ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg);
00064 ACE_Message_Block* head_copy = 0;
00065 ACE_Message_Block* cur_copy = 0;
00066 ACE_Message_Block* prev_copy = 0;
00067
00068 while (cur_block != 0) {
00069 ACE_NEW_MALLOC_RETURN(cur_copy,
00070 static_cast<ACE_Message_Block*>(
00071 mb_allocator->malloc(sizeof(ACE_Message_Block))),
00072 ACE_Message_Block(cur_block->capacity(),
00073 ACE_Message_Block::MB_DATA,
00074 0,
00075 0,
00076 0,
00077 0,
00078 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00079 ACE_Time_Value::zero,
00080 ACE_Time_Value::max_time,
00081 db_allocator,
00082 mb_allocator),
00083 0);
00084
00085 cur_copy->copy(cur_block->base(), cur_block->size());
00086 cur_copy->rd_ptr(cur_copy->base() +
00087 (cur_block->rd_ptr() - cur_block->base()));
00088 cur_copy->wr_ptr(cur_copy->base() +
00089 (cur_block->wr_ptr() - cur_block->base()));
00090
00091 if (head_copy == 0) {
00092 head_copy = cur_copy;
00093 } else {
00094 prev_copy->cont(cur_copy);
00095 }
00096
00097 prev_copy = cur_copy;
00098
00099 cur_block = cur_block->cont();
00100 }
00101
00102 return head_copy;
00103 }
00104
00105 TransportQueueElement::MatchCriteria::~MatchCriteria()
00106 {
00107 }
00108
00109 TransportQueueElement::MatchOnPubId::~MatchOnPubId()
00110 {
00111 }
00112
00113 TransportQueueElement::MatchOnDataPayload::~MatchOnDataPayload()
00114 {
00115 }
00116
00117 }
00118 }