DataSampleHeader.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataSampleHeader.h"
00010 #include "Serializer.h"
00011 #include "GuidConverter.h"
00012 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00013 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00014 #include "dds/DCPS/SafetyProfileStreams.h"
00015 #include <cstdio>
00016 
00017 #if !defined (__ACE_INLINE__)
00018 #include "DataSampleHeader.inl"
00019 #endif /* __ACE_INLINE__ */
00020 
00021 namespace {
00022 
00023   bool mb_copy(char& dest, const ACE_Message_Block& mb, size_t offset, bool)
00024   {
00025     dest = mb.rd_ptr()[offset];
00026     return true;
00027   }
00028 
00029   template <typename T>
00030   bool mb_copy(T& dest, const ACE_Message_Block& mb, size_t offset, bool swap)
00031   {
00032     if (mb.length() >= sizeof(T)) {
00033       // Avoid creating ACE_Message_Block from the heap if we just need one.
00034       ACE_Message_Block temp(mb.data_block (), ACE_Message_Block::DONT_DELETE);
00035       temp.rd_ptr(mb.rd_ptr()+offset);
00036       temp.wr_ptr(mb.wr_ptr());
00037       OpenDDS::DCPS::Serializer ser(&temp, swap);
00038       ser.buffer_read(reinterpret_cast<char*>(&dest), sizeof(T), swap);
00039       return true;
00040     }
00041 
00042     OpenDDS::DCPS::Message_Block_Ptr temp(mb.duplicate());
00043     if (!temp) { // couldn't allocate
00044       return false;
00045     }
00046     temp->rd_ptr(offset);
00047     if (temp->total_length() < sizeof(T)) {
00048       return false;
00049     }
00050     OpenDDS::DCPS::Serializer ser(temp.get(), swap);
00051     ser.buffer_read(reinterpret_cast<char*>(&dest), sizeof(T), swap);
00052     return true;
00053   }
00054 
00055   // Skip "offset" bytes from the mb and copy the subsequent data
00056   // (sizeof(T) bytes) into dest.  Return false if there is not enough data
00057   // in the mb to complete the operation.  Continuation pointers are followed.
00058   template <typename T>
00059   bool mb_peek(T& dest, const ACE_Message_Block& mb, size_t offset, bool swap)
00060   {
00061     for (const ACE_Message_Block* iter = &mb; iter; iter = iter->cont()) {
00062       const size_t len = iter->length();
00063       if (len > offset) {
00064         return mb_copy(dest, *iter, offset, swap);
00065       }
00066       offset -= len;
00067     }
00068     return false;
00069   }
00070 }
00071 
00072 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00073 
00074 namespace OpenDDS {
00075 namespace DCPS {
00076 
00077 // Allocate a new message block using the allocators from an existing
00078 // message block, "mb".  Use of mb's data_allocator_ is optional.
00079 ACE_Message_Block*
00080 DataSampleHeader::alloc_msgblock(const ACE_Message_Block& mb,
00081                                  size_t size, bool use_data_alloc)
00082 {
00083   enum { DATA, DB, MB, N_ALLOC };
00084   ACE_Allocator* allocators[N_ALLOC];
00085   // It's an ACE bug that access_allocators isn't const
00086   ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb);
00087   mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]);
00088   if (allocators[MB]) {
00089     ACE_Message_Block* result;
00090     ACE_NEW_MALLOC_RETURN(result,
00091       static_cast<ACE_Message_Block*>(
00092         allocators[MB]->malloc(sizeof(ACE_Message_Block))),
00093       ACE_Message_Block(size,
00094                         ACE_Message_Block::MB_DATA,
00095                         0, // cont
00096                         0, // data
00097                         use_data_alloc ? allocators[DATA] : 0,
00098                         mut_mb.locking_strategy(), // locking_strategy
00099                         ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00100                         ACE_Time_Value::zero,
00101                         ACE_Time_Value::max_time,
00102                         allocators[DB],
00103                         allocators[MB]),
00104       0);
00105     return result;
00106   } else {
00107     return new ACE_Message_Block(size,
00108                                   ACE_Message_Block::MB_DATA,
00109                                   0, // cont
00110                                   0, // data
00111                                   use_data_alloc ? allocators[DATA] : 0,
00112                                   mut_mb.locking_strategy(), // locking_strategy
00113                                   ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00114                                   ACE_Time_Value::zero,
00115                                   ACE_Time_Value::max_time,
00116                                   allocators[DB]);
00117   }
00118 }
00119 
00120 bool DataSampleHeader::partial(const ACE_Message_Block& mb)
00121 {
00122   static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG),
00123     LIFESPAN_LENGTH = 8,
00124     COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG),
00125     COHERENT_LENGTH = 16,
00126     CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG),
00127     BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG);
00128 
00129   const size_t len = mb.total_length();
00130 
00131   if (len <= FLAGS_OFFSET) return true;
00132 
00133   unsigned char msg_id;
00134   if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET,
00135                false /*swap ignored for char*/)
00136       || int(msg_id) >= MESSAGE_ID_MAX) {
00137     // This check, and the similar one below for submessage id, are actually
00138     // indicating an invalid header (and not a partial header) but we can
00139     // treat it the same as partial for the sake of the TransportRecvStrategy.
00140     return true;
00141   }
00142 
00143   if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET,
00144                false /*swap ignored for char*/)
00145       || int(msg_id) >= SUBMESSAGE_ID_MAX) {
00146     return true;
00147   }
00148 
00149   char flags;
00150   if (!mb_peek(flags, mb, FLAGS_OFFSET, false /*swap ignored for char*/)) {
00151     return true;
00152   }
00153 
00154   size_t expected = max_marshaled_size();
00155   if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH;
00156   if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH;
00157 
00158   if (flags & CONTENT_FILT_MASK) {
00159     CORBA::ULong seqLen;
00160     const bool swap = (flags & BYTE_ORDER_MASK) != ACE_CDR_BYTE_ORDER;
00161     if (!mb_peek(seqLen, mb, expected, swap)) {
00162       return true;
00163     }
00164     size_t guidsize = 0, padding = 0;
00165     gen_find_size(GUID_t(), guidsize, padding);
00166     expected += sizeof(seqLen) + guidsize * seqLen;
00167   }
00168 
00169   return len < expected;
00170 }
00171 
00172 void
00173 DataSampleHeader::init(ACE_Message_Block* buffer)
00174 {
00175   this->marshaled_size_ = 0;
00176 
00177   Serializer reader(buffer);
00178 
00179   // Only byte-sized reads until we get the byte_order_ flag.
00180 
00181   if (!(reader >> this->message_id_)) {
00182     return;
00183   }
00184 
00185   this->marshaled_size_ += sizeof(this->message_id_);
00186 
00187   if (!(reader >> this->submessage_id_)) {
00188     return;
00189   }
00190 
00191   this->marshaled_size_ += sizeof(this->submessage_id_);
00192 
00193   // Extract the flag values.
00194   ACE_CDR::Octet byte;
00195   if (!(reader >> ACE_InputCDR::to_octet(byte))) {
00196     return;
00197   }
00198 
00199   this->marshaled_size_ += sizeof(byte);
00200 
00201   this->byte_order_         = byte & mask_flag(BYTE_ORDER_FLAG);
00202   this->coherent_change_    = byte & mask_flag(COHERENT_CHANGE_FLAG);
00203   this->historic_sample_    = byte & mask_flag(HISTORIC_SAMPLE_FLAG);
00204   this->lifespan_duration_  = byte & mask_flag(LIFESPAN_DURATION_FLAG);
00205   this->group_coherent_     = byte & mask_flag(GROUP_COHERENT_FLAG);
00206   this->content_filter_     = byte & mask_flag(CONTENT_FILTER_FLAG);
00207   this->sequence_repair_    = byte & mask_flag(SEQUENCE_REPAIR_FLAG);
00208   this->more_fragments_     = byte & mask_flag(MORE_FRAGMENTS_FLAG);
00209 
00210   // Set swap_bytes flag to the Serializer if data sample from
00211   // the publisher is in different byte order.
00212   reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER);
00213 
00214   if (!(reader >> ACE_InputCDR::to_octet(byte))) {
00215     return;
00216   }
00217 
00218   this->marshaled_size_ += sizeof(byte);
00219 
00220   this->cdr_encapsulation_ = byte & mask_flag(CDR_ENCAP_FLAG);
00221   this->key_fields_only_   = byte & mask_flag(KEY_ONLY_FLAG);
00222 
00223   if (!(reader >> this->message_length_)) {
00224     return;
00225   }
00226 
00227   this->marshaled_size_ += sizeof(this->message_length_);
00228 
00229   if (!(reader >> this->sequence_)) {
00230     return;
00231   }
00232 
00233   size_t padding = 0;
00234   gen_find_size(this->sequence_, this->marshaled_size_, padding);
00235 
00236   if (!(reader >> this->source_timestamp_sec_)) {
00237     return;
00238   }
00239 
00240   this->marshaled_size_ += sizeof(this->source_timestamp_sec_);
00241 
00242   if (!(reader >> this->source_timestamp_nanosec_)) {
00243     return;
00244   }
00245 
00246   this->marshaled_size_ += sizeof(this->source_timestamp_nanosec_);
00247 
00248   if (this->lifespan_duration_) {
00249     if (!(reader >> this->lifespan_duration_sec_)) {
00250       return;
00251     }
00252 
00253     this->marshaled_size_ += sizeof(this->lifespan_duration_sec_);
00254 
00255     if (!(reader >> this->lifespan_duration_nanosec_)) {
00256       return;
00257     }
00258 
00259     this->marshaled_size_ += sizeof(this->lifespan_duration_nanosec_);
00260   }
00261 
00262   if (!(reader >> this->publication_id_)) {
00263     return;
00264   }
00265 
00266   gen_find_size(this->publication_id_, this->marshaled_size_, padding);
00267 
00268 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00269   if (this->group_coherent_) {
00270     if (!(reader >> this->publisher_id_)) {
00271       return;
00272     }
00273     gen_find_size(this->publisher_id_, this->marshaled_size_, padding);
00274   }
00275 #endif
00276 
00277   if (this->content_filter_) {
00278     if (!(reader >> this->content_filter_entries_)) {
00279       return;
00280     }
00281     gen_find_size(this->content_filter_entries_, this->marshaled_size_, padding);
00282   }
00283 }
00284 
00285 bool
00286 operator<<(ACE_Message_Block& buffer, const DataSampleHeader& value)
00287 {
00288   Serializer writer(&buffer, value.byte_order_ != ACE_CDR_BYTE_ORDER);
00289 
00290   writer << value.message_id_;
00291   writer << value.submessage_id_;
00292 
00293   // Write the flags as a single byte.
00294   ACE_CDR::Octet flags = (value.byte_order_           << BYTE_ORDER_FLAG)
00295                          | (value.coherent_change_    << COHERENT_CHANGE_FLAG)
00296                          | (value.historic_sample_    << HISTORIC_SAMPLE_FLAG)
00297                          | (value.lifespan_duration_  << LIFESPAN_DURATION_FLAG)
00298                          | (value.group_coherent_     << GROUP_COHERENT_FLAG)
00299                          | (value.content_filter_     << CONTENT_FILTER_FLAG)
00300                          | (value.sequence_repair_    << SEQUENCE_REPAIR_FLAG)
00301                          | (value.more_fragments_     << MORE_FRAGMENTS_FLAG)
00302                          ;
00303   writer << ACE_OutputCDR::from_octet(flags);
00304 
00305   flags = (value.cdr_encapsulation_ << CDR_ENCAP_FLAG)
00306         | (value.key_fields_only_   << KEY_ONLY_FLAG)
00307         ;
00308   writer << ACE_OutputCDR::from_octet(flags);
00309 
00310   writer << value.message_length_;
00311   writer << value.sequence_;
00312   writer << value.source_timestamp_sec_;
00313   writer << value.source_timestamp_nanosec_;
00314 
00315   if (value.lifespan_duration_) {
00316     writer << value.lifespan_duration_sec_;
00317     writer << value.lifespan_duration_nanosec_;
00318   }
00319 
00320   writer << value.publication_id_;
00321 
00322 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00323   if (value.group_coherent_) {
00324     writer << value.publisher_id_;
00325   }
00326 #endif
00327 
00328   // content_filter_entries_ is deliberately not marshaled here.
00329   // It's variable sized, so it won't fit into our pre-allocated data block.
00330   // It may be customized per-datalink so it will be handled later with a
00331   // a chained (continuation) ACE_Message_Block.
00332 
00333   return writer.good_bit();
00334 }
00335 
00336 void
00337 DataSampleHeader::add_cfentries(const GUIDSeq* guids, ACE_Message_Block* mb)
00338 {
00339   size_t size = 0;
00340   if (guids) {
00341     size_t padding = 0; // GUIDs are always aligned
00342     gen_find_size(*guids, size, padding);
00343   } else {
00344     size = sizeof(CORBA::ULong);
00345   }
00346   ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false);
00347 
00348   const bool swap = (ACE_CDR_BYTE_ORDER != test_flag(BYTE_ORDER_FLAG, mb));
00349   Serializer ser(optHdr, swap);
00350   if (guids) {
00351     ser << *guids;
00352   } else {
00353     ser << CORBA::ULong(0);
00354   }
00355 
00356   // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control)
00357   optHdr->cont(mb->cont());
00358   mb->cont(optHdr);
00359 }
00360 
00361 void
00362 DataSampleHeader::split_payload(const ACE_Message_Block& orig, size_t size,
00363                                 Message_Block_Ptr& head,
00364                                 Message_Block_Ptr& tail)
00365 {
00366   if (!head) {
00367     head.reset(orig.duplicate());
00368   }
00369 
00370   ACE_Message_Block* frag = head.get();
00371   size_t frag_remain = size;
00372   for (; frag_remain > frag->length(); frag = frag->cont()) {
00373     frag_remain -= frag->length();
00374   }
00375 
00376   if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary
00377     tail.reset(frag->cont());
00378   } else {
00379     tail.reset(frag->duplicate());
00380     frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain);
00381     ACE_Message_Block::release(frag->cont());
00382     tail->rd_ptr(frag_remain);
00383   }
00384   frag->cont(0);
00385 }
00386 
00387 void
00388 DataSampleHeader::split(const ACE_Message_Block& orig, size_t size,
00389                         Message_Block_Ptr& head, Message_Block_Ptr& tail)
00390 {
00391   Message_Block_Ptr dup (orig.duplicate());
00392 
00393   const size_t length = dup->total_length();
00394   DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries)
00395   const size_t hdr_len = length - dup->total_length();
00396 
00397   ACE_Message_Block* payload = dup.get();
00398   //skip zero length message blocks
00399   ACE_Message_Block* prev = 0;
00400   for (; payload->length() == 0; payload = payload->cont()) {
00401     prev = payload;
00402   }
00403   prev->cont(0);
00404   Message_Block_Ptr payload_head(payload);
00405 
00406   if (size < hdr_len) { // need to fragment the content_filter_entries_
00407     head.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00408     hdr.more_fragments_ = true;
00409     hdr.message_length_ = 0; // no room for payload data
00410     *head << hdr;
00411     const size_t avail = size - head->length() - 4 /* sequence length */;
00412     const CORBA::ULong n_entries =
00413       static_cast<CORBA::ULong>(avail / gen_max_marshaled_size(GUID_t()));
00414     GUIDSeq entries(n_entries);
00415     entries.length(n_entries);
00416     // remove from the end of hdr's entries (order doesn't matter)
00417     for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length());
00418          i < n_entries; ++i) {
00419       entries[i] = hdr.content_filter_entries_[--x];
00420       hdr.content_filter_entries_.length(x);
00421     }
00422     add_cfentries(&entries, head.get());
00423 
00424     tail.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00425     hdr.more_fragments_ = false;
00426     hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0);
00427     hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
00428     *tail << hdr;
00429     tail->cont(payload);
00430     if (hdr.content_filter_) {
00431       add_cfentries(&hdr.content_filter_entries_, tail.get());
00432     }
00433     return;
00434   }
00435 
00436   Message_Block_Ptr payload_tail;
00437   split_payload(*payload, size - hdr_len, payload_head, payload_tail);
00438 
00439   hdr.more_fragments_ = true;
00440   hdr.message_length_ = static_cast<ACE_UINT32>(payload_head->total_length());
00441 
00442   head.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00443   *head << hdr;
00444   head->cont(payload_head.release());
00445   if (hdr.content_filter_) {
00446     add_cfentries(&hdr.content_filter_entries_, head.get());
00447   }
00448 
00449   hdr.more_fragments_ = false;
00450   hdr.content_filter_ = false;
00451   hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length());
00452 
00453   tail.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00454   *tail << hdr;
00455   tail->cont(payload_tail.release());
00456 }
00457 
00458 bool
00459 DataSampleHeader::join(const DataSampleHeader& first,
00460                        const DataSampleHeader& second, DataSampleHeader& result)
00461 {
00462   if (!first.more_fragments_ || first.sequence_ != second.sequence_) {
00463     return false;
00464   }
00465   result = second;
00466   result.message_length_ += first.message_length_;
00467   if (first.content_filter_) {
00468     result.content_filter_ = true;
00469     const CORBA::ULong entries = first.content_filter_entries_.length();
00470     CORBA::ULong x = result.content_filter_entries_.length();
00471     result.content_filter_entries_.length(x + entries);
00472     for (CORBA::ULong i(entries); i > 0;) {
00473       result.content_filter_entries_[x++] = first.content_filter_entries_[--i];
00474     }
00475   }
00476   return true;
00477 }
00478 
00479 const char *
00480 to_string(const MessageId value)
00481 {
00482   switch (value) {
00483   case SAMPLE_DATA:
00484     return "SAMPLE_DATA";
00485   case DATAWRITER_LIVELINESS:
00486     return "DATAWRITER_LIVELINESS";
00487   case INSTANCE_REGISTRATION:
00488     return "INSTANCE_REGISTRATION";
00489   case UNREGISTER_INSTANCE:
00490     return "UNREGISTER_INSTANCE";
00491   case DISPOSE_INSTANCE:
00492     return "DISPOSE_INSTANCE";
00493   case GRACEFUL_DISCONNECT:
00494     return "GRACEFUL_DISCONNECT";
00495   case REQUEST_ACK:
00496     return "REQUEST_ACK";
00497   case SAMPLE_ACK:
00498     return "SAMPLE_ACK";
00499   case END_COHERENT_CHANGES:
00500     return "END_COHERENT_CHANGES";
00501   case TRANSPORT_CONTROL:
00502     return "TRANSPORT_CONTROL";
00503   case DISPOSE_UNREGISTER_INSTANCE:
00504     return "DISPOSE_UNREGISTER_INSTANCE";
00505   case END_HISTORIC_SAMPLES:
00506     return "END_HISTORIC_SAMPLES";
00507   default:
00508     return "Unknown";
00509   }
00510 }
00511 
00512 const char *
00513 to_string(const SubMessageId value)
00514 {
00515   switch (value) {
00516   case SUBMESSAGE_NONE:
00517     return "SUBMESSAGE_NONE";
00518   case MULTICAST_SYN:
00519     return "MULTICAST_SYN";
00520   case MULTICAST_SYNACK:
00521     return "MULTICAST_SYNACK";
00522   case MULTICAST_NAK:
00523     return "MULTICAST_NAK";
00524   case MULTICAST_NAKACK:
00525     return "MULTICAST_NAKACK";
00526   default:
00527     return "Unknown";
00528   }
00529 }
00530 
00531 OPENDDS_STRING to_string(const DataSampleHeader& value)
00532 {
00533   OPENDDS_STRING ret;
00534   if (value.submessage_id_ != SUBMESSAGE_NONE) {
00535     ret += to_string(SubMessageId(value.submessage_id_));
00536     ret += " 0x";
00537     ret += to_dds_string(unsigned(value.submessage_id_), true);
00538     ret += "), ";
00539   } else {
00540     ret += to_string(MessageId(value.message_id_));
00541     ret += " (0x";
00542     ret += to_dds_string(unsigned(value.message_id_), true);
00543     ret += "), ";
00544   }
00545 
00546   ret += "Length: ";
00547   ret += to_dds_string(value.message_length_);
00548   ret += ", ";
00549 
00550   ret += "Byte order: ";
00551   ret += (value.byte_order_ == 1 ? "Little" : "Big");
00552   ret += " Endian";
00553 
00554   if (value.message_id_ != TRANSPORT_CONTROL) {
00555     ret += ", ";
00556 
00557     if (value.coherent_change_ == 1) ret += "Coherent, ";
00558     if (value.historic_sample_ == 1) ret += "Historic, ";
00559     if (value.lifespan_duration_ == 1) ret += "Lifespan, ";
00560 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00561     if (value.group_coherent_ == 1) ret += "Group-Coherent, ";
00562 #endif
00563     if (value.content_filter_ == 1) ret += "Content-Filtered, ";
00564     if (value.sequence_repair_ == 1) ret += "Sequence Repair, ";
00565     if (value.more_fragments_ == 1) ret += "More Fragments, ";
00566     if (value.cdr_encapsulation_ == 1) ret += "CDR Encapsulation, ";
00567     if (value.key_fields_only_ == 1) ret += "Key Fields Only, ";
00568 
00569     ret += "Sequence: 0x";
00570     ret += to_dds_string(unsigned(value.sequence_.getValue()), true);
00571     ret += ", ";
00572 
00573     ret += "Timestamp: ";
00574     ret += to_dds_string(value.source_timestamp_sec_);
00575     ret += ".";
00576     ret += to_dds_string(value.source_timestamp_nanosec_);
00577     ret += ", ";
00578 
00579     if (value.lifespan_duration_) {
00580       ret += "Lifespan: ";
00581       ret += to_dds_string(value.lifespan_duration_sec_);
00582       ret += ".";
00583       ret += to_dds_string(value.lifespan_duration_nanosec_);
00584       ret += ", ";
00585     }
00586 
00587     ret += "Publication: " + OPENDDS_STRING(GuidConverter(value.publication_id_));
00588 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00589     if (value.group_coherent_) {
00590       ret += ", Publisher: " + OPENDDS_STRING(GuidConverter(value.publisher_id_));
00591     }
00592 #endif
00593 
00594     if (value.content_filter_) {
00595       const CORBA::ULong len = value.content_filter_entries_.length();
00596       ret += ", Content-Filter Entries (";
00597       ret += to_dds_string(len);
00598       ret += "): [";
00599       for (CORBA::ULong i(0); i < len; ++i) {
00600         ret += OPENDDS_STRING(GuidConverter(value.content_filter_entries_[i])) + ' ';
00601       }
00602       ret += ']';
00603     }
00604   }
00605   return ret;
00606 }
00607 
00608 #ifndef OPENDDS_SAFETY_PROFILE
00609 /// Message Id enumeration insertion onto an ostream.
00610 std::ostream& operator<<(std::ostream& str, const MessageId value)
00611 {
00612   switch (value) {
00613   case SAMPLE_DATA:
00614     return str << "SAMPLE_DATA";
00615   case DATAWRITER_LIVELINESS:
00616     return str << "DATAWRITER_LIVELINESS";
00617   case INSTANCE_REGISTRATION:
00618     return str << "INSTANCE_REGISTRATION";
00619   case UNREGISTER_INSTANCE:
00620     return str << "UNREGISTER_INSTANCE";
00621   case DISPOSE_INSTANCE:
00622     return str << "DISPOSE_INSTANCE";
00623   case GRACEFUL_DISCONNECT:
00624     return str << "GRACEFUL_DISCONNECT";
00625   case REQUEST_ACK:
00626     return str << "REQUEST_ACK";
00627   case SAMPLE_ACK:
00628     return str << "SAMPLE_ACK";
00629   case END_COHERENT_CHANGES:
00630     return str << "END_COHERENT_CHANGES";
00631   case TRANSPORT_CONTROL:
00632     return str << "TRANSPORT_CONTROL";
00633   case DISPOSE_UNREGISTER_INSTANCE:
00634     return str << "DISPOSE_UNREGISTER_INSTANCE";
00635   case END_HISTORIC_SAMPLES:
00636     return str << "END_HISTORIC_SAMPLES";
00637   default:
00638     return str << "Unknown";
00639   }
00640 }
00641 
00642 /// Sub-Message Id enumeration insertion onto an ostream.
00643 std::ostream& operator<<(std::ostream& os, const SubMessageId rhs)
00644 {
00645   switch (rhs) {
00646   case SUBMESSAGE_NONE:
00647     return os << "SUBMESSAGE_NONE";
00648   case MULTICAST_SYN:
00649     return os << "MULTICAST_SYN";
00650   case MULTICAST_SYNACK:
00651     return os << "MULTICAST_SYNACK";
00652   case MULTICAST_NAK:
00653     return os << "MULTICAST_NAK";
00654   case MULTICAST_NAKACK:
00655     return os << "MULTICAST_NAKACK";
00656   default:
00657     return os << "Unknown";
00658   }
00659 }
00660 
00661 /// Message header insertion onto an ostream.
00662 extern OpenDDS_Dcps_Export
00663 std::ostream& operator<<(std::ostream& str, const DataSampleHeader& value)
00664 {
00665   struct SaveAndRestoreStreamState {
00666     explicit SaveAndRestoreStreamState(std::ostream& s)
00667       : fill_(s.fill()), fmt_(s.flags()), s_(s) {}
00668     ~SaveAndRestoreStreamState()
00669     {
00670       s_.fill(fill_);
00671       s_.flags(fmt_);
00672     }
00673     char fill_;
00674     std::ios_base::fmtflags fmt_;
00675     std::ostream& s_;
00676   } stream_state(str);
00677 
00678   if (value.submessage_id_ != SUBMESSAGE_NONE) {
00679     str << SubMessageId(value.submessage_id_)
00680       << " (0x" << std::hex << std::setw(2) << std::setfill('0')
00681       << unsigned(value.submessage_id_) << "), ";
00682 
00683   } else {
00684     str << MessageId(value.message_id_)
00685         << " (0x" << std::hex << std::setw(2) << std::setfill('0')
00686         << unsigned(value.message_id_) << "), ";
00687   }
00688 
00689   str << "Length: " << std::dec << value.message_length_ << ", ";
00690 
00691   str << "Byte order: " << (value.byte_order_ == 1 ? "Little" : "Big")
00692       << " Endian";
00693 
00694   if (value.message_id_ != TRANSPORT_CONTROL) {
00695     str << ", ";
00696 
00697     if (value.coherent_change_ == 1) str << "Coherent, ";
00698     if (value.historic_sample_ == 1) str << "Historic, ";
00699     if (value.lifespan_duration_ == 1) str << "Lifespan, ";
00700 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00701     if (value.group_coherent_ == 1) str << "Group-Coherent, ";
00702 #endif
00703     if (value.content_filter_ == 1) str << "Content-Filtered, ";
00704     if (value.sequence_repair_ == 1) str << "Sequence Repair, ";
00705     if (value.more_fragments_ == 1) str << "More Fragments, ";
00706     if (value.cdr_encapsulation_ == 1) str << "CDR Encapsulation, ";
00707     if (value.key_fields_only_ == 1) str << "Key Fields Only, ";
00708 
00709     str << "Sequence: 0x" << std::hex << std::setw(4) << std::setfill('0')
00710         << value.sequence_.getValue() << ", ";
00711 
00712     str << "Timestamp: " << std::dec << value.source_timestamp_sec_ << "."
00713         << std::dec << value.source_timestamp_nanosec_ << ", ";
00714 
00715     if (value.lifespan_duration_) {
00716       str << "Lifespan: " << std::dec << value.lifespan_duration_sec_ << "."
00717           << std::dec << value.lifespan_duration_nanosec_ << ", ";
00718     }
00719 
00720     str << "Publication: " << GuidConverter(value.publication_id_);
00721 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00722     if (value.group_coherent_) {
00723       str << ", Publisher: " << GuidConverter(value.publisher_id_);
00724     }
00725 #endif
00726 
00727     if (value.content_filter_) {
00728       const CORBA::ULong len = value.content_filter_entries_.length();
00729       str << ", Content-Filter Entries (" << len << "): [";
00730       for (CORBA::ULong i(0); i < len; ++i) {
00731         str << GuidConverter(value.content_filter_entries_[i]) << ' ';
00732       }
00733       str << ']';
00734     }
00735   }
00736 
00737   return str;
00738 }
00739 #endif //OPENDDS_SAFETY_PROFILE
00740 
00741 
00742 bool
00743 DataSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
00744 {
00745   rds.header_ = *this;
00746   return true;
00747 }
00748 
00749 } // namespace DCPS
00750 } // namespace OpenDDS
00751 
00752 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1