TransportQueueElement.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportQueueElement.h"
00010 #include "EntryExit.h"
00011 #include "TransportCustomizedElement.h"
00012 #include "dds/DCPS/DataSampleHeader.h"
00013 
00014 #if !defined (__ACE_INLINE__)
00015 # include "TransportQueueElement.inl"
00016 #endif /* !__ACE_INLINE__ */
00017 
00018 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 namespace OpenDDS {
00021 namespace DCPS {
00022 
00023 TransportQueueElement::~TransportQueueElement()
00024 {
00025   DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6);
00026 }
00027 
00028 bool
00029 TransportQueueElement::requires_exclusive_packet() const
00030 {
00031   DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6);
00032   return false;
00033 }
00034 
00035 bool
00036 TransportQueueElement::is_control(RepoId /*pub_id*/) const
00037 {
00038   DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6);
00039   return false;
00040 }
00041 
00042 ElementPair
00043 TransportQueueElement::fragment(size_t size)
00044 {
00045   Message_Block_Ptr head;
00046   Message_Block_Ptr tail;
00047   DataSampleHeader::split(*msg(), size, head, tail);
00048 
00049   TransportCustomizedElement* frag = new TransportCustomizedElement(0, true);
00050   frag->set_publication_id(publication_id());
00051   frag->set_msg(move(head));
00052 
00053   TransportCustomizedElement* rest =
00054     new TransportCustomizedElement(this, true);
00055   rest->set_msg(move(tail));
00056 
00057   return ElementPair(frag, rest);
00058 }
00059 
00060 ACE_Message_Block*
00061 TransportQueueElement::clone_mb(const ACE_Message_Block* msg,
00062                                 MessageBlockAllocator* mb_allocator,
00063                                 DataBlockAllocator* db_allocator)
00064 {
00065   ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg);
00066   ACE_Message_Block* head_copy = 0;
00067   ACE_Message_Block* cur_copy  = 0;
00068   ACE_Message_Block* prev_copy = 0;
00069   // deep copy sample data
00070   while (cur_block != 0) {
00071     ACE_NEW_MALLOC_RETURN(cur_copy,
00072                           static_cast<ACE_Message_Block*>(
00073                           mb_allocator->malloc(sizeof(ACE_Message_Block))),
00074                           ACE_Message_Block(cur_block->capacity(),
00075                                             ACE_Message_Block::MB_DATA,
00076                                             0, //cont
00077                                             0, //data
00078                                             0, //alloc_strategy
00079                                             0, //locking_strategy
00080                                             ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00081                                             ACE_Time_Value::zero,
00082                                             ACE_Time_Value::max_time,
00083                                             db_allocator,
00084                                             mb_allocator),
00085                           0);
00086 
00087     cur_copy->copy(cur_block->base(), cur_block->size());
00088     cur_copy->rd_ptr(cur_copy->base() +
00089                      (cur_block->rd_ptr() - cur_block->base()));
00090     cur_copy->wr_ptr(cur_copy->base() +
00091                      (cur_block->wr_ptr() - cur_block->base()));
00092 
00093     if (head_copy == 0) {
00094       head_copy = cur_copy;
00095     } else {
00096       prev_copy->cont(cur_copy);
00097     }
00098 
00099     prev_copy = cur_copy;
00100 
00101     cur_block = cur_block->cont();
00102   }
00103 
00104   return head_copy;
00105 }
00106 
00107 TransportQueueElement::MatchCriteria::~MatchCriteria()
00108 {
00109 }
00110 
00111 TransportQueueElement::MatchOnPubId::~MatchOnPubId()
00112 {
00113 }
00114 
00115 TransportQueueElement::MatchOnDataPayload::~MatchOnDataPayload()
00116 {
00117 }
00118 
00119 }
00120 }
00121 
00122 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1