DataSampleHeader.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataSampleHeader.h"
00010 #include "Serializer.h"
00011 #include "GuidConverter.h"
00012 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00013 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00014 #include "dds/DCPS/SafetyProfileStreams.h"
00015 #include <cstdio>
00016 
00017 #if !defined (__ACE_INLINE__)
00018 #include "DataSampleHeader.inl"
00019 #endif /* __ACE_INLINE__ */
00020 
00021 namespace {
00022   struct AMB_Releaser {
00023     explicit AMB_Releaser(ACE_Message_Block* p) : p_(p) {}
00024     ~AMB_Releaser() { p_->release(); }
00025     ACE_Message_Block* p_;
00026   };
00027 
00028   bool mb_copy(char& dest, const ACE_Message_Block& mb, size_t offset, bool)
00029   {
00030     dest = mb.rd_ptr()[offset];
00031     return true;
00032   }
00033 
00034   template <typename T>
00035   bool mb_copy(T& dest, const ACE_Message_Block& mb, size_t offset, bool swap)
00036   {
00037     ACE_Message_Block* temp = mb.duplicate();
00038     if (!temp) { // couldn't allocate
00039       return false;
00040     }
00041     AMB_Releaser r(temp);
00042     temp->rd_ptr(offset);
00043     if (temp->total_length() < sizeof(T)) {
00044       return false;
00045     }
00046     OpenDDS::DCPS::Serializer ser(temp, swap);
00047     ser.buffer_read(reinterpret_cast<char*>(&dest), sizeof(T), swap);
00048     return true;
00049   }
00050 
00051   // Skip "offset" bytes from the mb and copy the subsequent data
00052   // (sizeof(T) bytes) into dest.  Return false if there is not enough data
00053   // in the mb to complete the operation.  Continuation pointers are followed.
00054   template <typename T>
00055   bool mb_peek(T& dest, const ACE_Message_Block& mb, size_t offset, bool swap)
00056   {
00057     for (const ACE_Message_Block* iter = &mb; iter; iter = iter->cont()) {
00058       const size_t len = iter->length();
00059       if (len > offset) {
00060         return mb_copy(dest, *iter, offset, swap);
00061       }
00062       offset -= len;
00063     }
00064     return false;
00065   }
00066 }
00067 
00068 namespace OpenDDS {
00069 namespace DCPS {
00070 
00071 // Allocate a new message block using the allocators from an existing
00072 // message block, "mb".  Use of mb's data_allocator_ is optional.
00073 ACE_Message_Block*
00074 DataSampleHeader::alloc_msgblock(const ACE_Message_Block& mb,
00075                                  size_t size, bool use_data_alloc)
00076 {
00077   enum { DATA, DB, MB, N_ALLOC };
00078   ACE_Allocator* allocators[N_ALLOC];
00079   // It's an ACE bug that access_allocators isn't const
00080   ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb);
00081   mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]);
00082   if (allocators[MB]) {
00083     ACE_Message_Block* result;
00084     ACE_NEW_MALLOC_RETURN(result,
00085       static_cast<ACE_Message_Block*>(
00086         allocators[MB]->malloc(sizeof(ACE_Message_Block))),
00087       ACE_Message_Block(size,
00088                         ACE_Message_Block::MB_DATA,
00089                         0, // cont
00090                         0, // data
00091                         use_data_alloc ? allocators[DATA] : 0,
00092                         mut_mb.locking_strategy(), // locking_strategy
00093                         ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00094                         ACE_Time_Value::zero,
00095                         ACE_Time_Value::max_time,
00096                         allocators[DB],
00097                         allocators[MB]),
00098       0);
00099     return result;
00100   } else {
00101     return new ACE_Message_Block(size,
00102                                   ACE_Message_Block::MB_DATA,
00103                                   0, // cont
00104                                   0, // data
00105                                   use_data_alloc ? allocators[DATA] : 0,
00106                                   mut_mb.locking_strategy(), // locking_strategy
00107                                   ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00108                                   ACE_Time_Value::zero,
00109                                   ACE_Time_Value::max_time,
00110                                   allocators[DB]);
00111   }
00112 }
00113 
00114 bool DataSampleHeader::partial(const ACE_Message_Block& mb)
00115 {
00116   static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG),
00117     LIFESPAN_LENGTH = 8,
00118     COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG),
00119     COHERENT_LENGTH = 16,
00120     CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG),
00121     BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG);
00122 
00123   const size_t len = mb.total_length();
00124 
00125   if (len <= FLAGS_OFFSET) return true;
00126 
00127   unsigned char msg_id;
00128   if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET,
00129                false /*swap ignored for char*/)
00130       || int(msg_id) >= MESSAGE_ID_MAX) {
00131     // This check, and the similar one below for submessage id, are actually
00132     // indicating an invalid header (and not a partial header) but we can
00133     // treat it the same as partial for the sake of the TransportRecvStrategy.
00134     return true;
00135   }
00136 
00137   if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET,
00138                false /*swap ignored for char*/)
00139       || int(msg_id) >= SUBMESSAGE_ID_MAX) {
00140     return true;
00141   }
00142 
00143   char flags;
00144   if (!mb_peek(flags, mb, FLAGS_OFFSET, false /*swap ignored for char*/)) {
00145     return true;
00146   }
00147 
00148   size_t expected = max_marshaled_size();
00149   if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH;
00150   if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH;
00151 
00152   if (flags & CONTENT_FILT_MASK) {
00153     CORBA::ULong seqLen;
00154     const bool swap = (flags & BYTE_ORDER_MASK) != ACE_CDR_BYTE_ORDER;
00155     if (!mb_peek(seqLen, mb, expected, swap)) {
00156       return true;
00157     }
00158     size_t guidsize = 0, padding = 0;
00159     gen_find_size(GUID_t(), guidsize, padding);
00160     expected += sizeof(seqLen) + guidsize * seqLen;
00161   }
00162 
00163   return len < expected;
00164 }
00165 
00166 void
00167 DataSampleHeader::init(ACE_Message_Block* buffer)
00168 {
00169   this->marshaled_size_ = 0;
00170 
00171   Serializer reader(buffer);
00172 
00173   // Only byte-sized reads until we get the byte_order_ flag.
00174 
00175   reader >> this->message_id_;
00176 
00177   if (!reader.good_bit()) return;
00178   this->marshaled_size_ += sizeof(this->message_id_);
00179 
00180   reader >> this->submessage_id_;
00181 
00182   if (!reader.good_bit()) return;
00183   this->marshaled_size_ += sizeof(this->submessage_id_);
00184 
00185   // Extract the flag values.
00186   ACE_CDR::Octet byte;
00187   reader >> ACE_InputCDR::to_octet(byte);
00188 
00189   if (!reader.good_bit()) return;
00190   this->marshaled_size_ += sizeof(byte);
00191 
00192   this->byte_order_         = byte & mask_flag(BYTE_ORDER_FLAG);
00193   this->coherent_change_    = byte & mask_flag(COHERENT_CHANGE_FLAG);
00194   this->historic_sample_    = byte & mask_flag(HISTORIC_SAMPLE_FLAG);
00195   this->lifespan_duration_  = byte & mask_flag(LIFESPAN_DURATION_FLAG);
00196   this->group_coherent_     = byte & mask_flag(GROUP_COHERENT_FLAG);
00197   this->content_filter_     = byte & mask_flag(CONTENT_FILTER_FLAG);
00198   this->sequence_repair_    = byte & mask_flag(SEQUENCE_REPAIR_FLAG);
00199   this->more_fragments_     = byte & mask_flag(MORE_FRAGMENTS_FLAG);
00200 
00201   // Set swap_bytes flag to the Serializer if data sample from
00202   // the publisher is in different byte order.
00203   reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER);
00204 
00205   reader >> ACE_InputCDR::to_octet(byte);
00206 
00207   if (!reader.good_bit()) return;
00208   this->marshaled_size_ += sizeof(byte);
00209 
00210   this->cdr_encapsulation_ = byte & mask_flag(CDR_ENCAP_FLAG);
00211   this->key_fields_only_   = byte & mask_flag(KEY_ONLY_FLAG);
00212 
00213   reader >> this->message_length_;
00214 
00215   if (!reader.good_bit()) return;
00216   this->marshaled_size_ += sizeof(this->message_length_);
00217 
00218   reader >> this->sequence_;
00219 
00220   if (!reader.good_bit()) return;
00221   size_t padding = 0;
00222   gen_find_size(this->sequence_, this->marshaled_size_, padding);
00223 
00224   reader >> this->source_timestamp_sec_;
00225 
00226   if (!reader.good_bit()) return;
00227   this->marshaled_size_ += sizeof(this->source_timestamp_sec_);
00228 
00229   reader >> this->source_timestamp_nanosec_;
00230 
00231   if (!reader.good_bit()) return;
00232   this->marshaled_size_ += sizeof(this->source_timestamp_nanosec_);
00233 
00234   if (this->lifespan_duration_) {
00235     reader >> this->lifespan_duration_sec_;
00236 
00237     if (!reader.good_bit()) return;
00238     this->marshaled_size_ += sizeof(this->lifespan_duration_sec_);
00239 
00240     reader >> this->lifespan_duration_nanosec_;
00241 
00242     if (!reader.good_bit()) return;
00243     this->marshaled_size_ += sizeof(this->lifespan_duration_nanosec_);
00244   }
00245 
00246   reader >> this->publication_id_;
00247 
00248   if (!reader.good_bit()) return;
00249   gen_find_size(this->publication_id_, this->marshaled_size_, padding);
00250 
00251 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00252   if (this->group_coherent_) {
00253     reader >> this->publisher_id_;
00254     if (!reader.good_bit()) return;
00255     gen_find_size(this->publisher_id_, this->marshaled_size_, padding);
00256   }
00257 #endif
00258 
00259   if (this->content_filter_) {
00260     reader >> this->content_filter_entries_;
00261     if (!reader.good_bit()) return;
00262     gen_find_size(this->content_filter_entries_, this->marshaled_size_, padding);
00263   }
00264 }
00265 
00266 bool
00267 operator<<(ACE_Message_Block& buffer, const DataSampleHeader& value)
00268 {
00269   Serializer writer(&buffer, value.byte_order_ != ACE_CDR_BYTE_ORDER);
00270 
00271   writer << value.message_id_;
00272   writer << value.submessage_id_;
00273 
00274   // Write the flags as a single byte.
00275   ACE_CDR::Octet flags = (value.byte_order_           << BYTE_ORDER_FLAG)
00276                          | (value.coherent_change_    << COHERENT_CHANGE_FLAG)
00277                          | (value.historic_sample_    << HISTORIC_SAMPLE_FLAG)
00278                          | (value.lifespan_duration_  << LIFESPAN_DURATION_FLAG)
00279                          | (value.group_coherent_     << GROUP_COHERENT_FLAG)
00280                          | (value.content_filter_     << CONTENT_FILTER_FLAG)
00281                          | (value.sequence_repair_    << SEQUENCE_REPAIR_FLAG)
00282                          | (value.more_fragments_     << MORE_FRAGMENTS_FLAG)
00283                          ;
00284   writer << ACE_OutputCDR::from_octet(flags);
00285 
00286   flags = (value.cdr_encapsulation_ << CDR_ENCAP_FLAG)
00287         | (value.key_fields_only_   << KEY_ONLY_FLAG)
00288         ;
00289   writer << ACE_OutputCDR::from_octet(flags);
00290 
00291   writer << value.message_length_;
00292   writer << value.sequence_;
00293   writer << value.source_timestamp_sec_;
00294   writer << value.source_timestamp_nanosec_;
00295 
00296   if (value.lifespan_duration_) {
00297     writer << value.lifespan_duration_sec_;
00298     writer << value.lifespan_duration_nanosec_;
00299   }
00300 
00301   writer << value.publication_id_;
00302 
00303 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00304   if (value.group_coherent_) {
00305     writer << value.publisher_id_;
00306   }
00307 #endif
00308 
00309   // content_filter_entries_ is deliberately not marshaled here.
00310   // It's variable sized, so it won't fit into our pre-allocated data block.
00311   // It may be customized per-datalink so it will be handled later with a
00312   // a chained (continuation) ACE_Message_Block.
00313 
00314   return writer.good_bit();
00315 }
00316 
00317 void
00318 DataSampleHeader::add_cfentries(const GUIDSeq* guids, ACE_Message_Block* mb)
00319 {
00320   size_t size = 0;
00321   if (guids) {
00322     size_t padding = 0; // GUIDs are always aligned
00323     gen_find_size(*guids, size, padding);
00324   } else {
00325     size = sizeof(CORBA::ULong);
00326   }
00327   ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false);
00328 
00329   const bool swap = (ACE_CDR_BYTE_ORDER != test_flag(BYTE_ORDER_FLAG, mb));
00330   Serializer ser(optHdr, swap);
00331   if (guids) {
00332     ser << *guids;
00333   } else {
00334     ser << CORBA::ULong(0);
00335   }
00336 
00337   // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control)
00338   optHdr->cont(mb->cont());
00339   mb->cont(optHdr);
00340 }
00341 
00342 void
00343 DataSampleHeader::split_payload(const ACE_Message_Block& orig, size_t size,
00344                                 ACE_Message_Block*& head,
00345                                 ACE_Message_Block*& tail)
00346 {
00347   if (!head) {
00348     head = orig.duplicate();
00349   }
00350 
00351   ACE_Message_Block* frag = head;
00352   size_t frag_remain = size;
00353   for (; frag_remain > frag->length(); frag = frag->cont()) {
00354     frag_remain -= frag->length();
00355   }
00356 
00357   if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary
00358     tail = frag->cont();
00359   } else {
00360     tail = frag->duplicate();
00361     frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain);
00362     ACE_Message_Block::release(frag->cont());
00363     tail->rd_ptr(frag_remain);
00364   }
00365   frag->cont(0);
00366 }
00367 
00368 void
00369 DataSampleHeader::split(const ACE_Message_Block& orig, size_t size,
00370                         ACE_Message_Block*& head, ACE_Message_Block*& tail)
00371 {
00372   ACE_Message_Block* dup = orig.duplicate();
00373   AMB_Releaser rel(dup);
00374 
00375   const size_t length = dup->total_length();
00376   DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries)
00377   const size_t hdr_len = length - dup->total_length();
00378 
00379   ACE_Message_Block* payload = dup;
00380   ACE_Message_Block* prev = 0;
00381   for (; payload->length() == 0; payload = payload->cont()) {
00382     prev = payload;
00383   }
00384   prev->cont(0);
00385 
00386   if (size < hdr_len) { // need to fragment the content_filter_entries_
00387     head = alloc_msgblock(*dup, max_marshaled_size(), true);
00388     hdr.more_fragments_ = true;
00389     hdr.message_length_ = 0; // no room for payload data
00390     *head << hdr;
00391     const size_t avail = size - head->length() - 4 /* sequence length */;
00392     const CORBA::ULong n_entries =
00393       static_cast<CORBA::ULong>(avail / gen_max_marshaled_size(GUID_t()));
00394     GUIDSeq entries(n_entries);
00395     entries.length(n_entries);
00396     // remove from the end of hdr's entries (order doesn't matter)
00397     for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length());
00398          i < n_entries; ++i) {
00399       entries[i] = hdr.content_filter_entries_[--x];
00400       hdr.content_filter_entries_.length(x);
00401     }
00402     add_cfentries(&entries, head);
00403 
00404     tail = alloc_msgblock(*dup, max_marshaled_size(), true);
00405     hdr.more_fragments_ = false;
00406     hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0);
00407     hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
00408     *tail << hdr;
00409     tail->cont(payload);
00410     if (hdr.content_filter_) {
00411       add_cfentries(&hdr.content_filter_entries_, tail);
00412     }
00413     return;
00414   }
00415 
00416   ACE_Message_Block* payload_tail;
00417   split_payload(*payload, size - hdr_len, payload, payload_tail);
00418 
00419   hdr.more_fragments_ = true;
00420   hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
00421 
00422   head = alloc_msgblock(*dup, max_marshaled_size(), true);
00423   *head << hdr;
00424   head->cont(payload);
00425   if (hdr.content_filter_) {
00426     add_cfentries(&hdr.content_filter_entries_, head);
00427   }
00428 
00429   hdr.more_fragments_ = false;
00430   hdr.content_filter_ = false;
00431   hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length());
00432 
00433   tail = alloc_msgblock(*dup, max_marshaled_size(), true);
00434   *tail << hdr;
00435   tail->cont(payload_tail);
00436 }
00437 
00438 bool
00439 DataSampleHeader::join(const DataSampleHeader& first,
00440                        const DataSampleHeader& second, DataSampleHeader& result)
00441 {
00442   if (!first.more_fragments_ || first.sequence_ != second.sequence_) {
00443     return false;
00444   }
00445   result = second;
00446   result.message_length_ += first.message_length_;
00447   if (first.content_filter_) {
00448     result.content_filter_ = true;
00449     const CORBA::ULong entries = first.content_filter_entries_.length();
00450     CORBA::ULong x = result.content_filter_entries_.length();
00451     result.content_filter_entries_.length(x + entries);
00452     for (CORBA::ULong i(entries); i > 0;) {
00453       result.content_filter_entries_[x++] = first.content_filter_entries_[--i];
00454     }
00455   }
00456   return true;
00457 }
00458 
00459 const char *
00460 to_string(const MessageId value)
00461 {
00462   switch (value) {
00463   case SAMPLE_DATA:
00464     return "SAMPLE_DATA";
00465   case DATAWRITER_LIVELINESS:
00466     return "DATAWRITER_LIVELINESS";
00467   case INSTANCE_REGISTRATION:
00468     return "INSTANCE_REGISTRATION";
00469   case UNREGISTER_INSTANCE:
00470     return "UNREGISTER_INSTANCE";
00471   case DISPOSE_INSTANCE:
00472     return "DISPOSE_INSTANCE";
00473   case GRACEFUL_DISCONNECT:
00474     return "GRACEFUL_DISCONNECT";
00475   case REQUEST_ACK:
00476     return "REQUEST_ACK";
00477   case SAMPLE_ACK:
00478     return "SAMPLE_ACK";
00479   case END_COHERENT_CHANGES:
00480     return "END_COHERENT_CHANGES";
00481   case TRANSPORT_CONTROL:
00482     return "TRANSPORT_CONTROL";
00483   case DISPOSE_UNREGISTER_INSTANCE:
00484     return "DISPOSE_UNREGISTER_INSTANCE";
00485   case END_HISTORIC_SAMPLES:
00486     return "END_HISTORIC_SAMPLES";
00487   default:
00488     return "Unknown";
00489   }
00490 }
00491 
00492 const char *
00493 to_string(const SubMessageId value)
00494 {
00495   switch (value) {
00496   case SUBMESSAGE_NONE:
00497     return "SUBMESSAGE_NONE";
00498   case MULTICAST_SYN:
00499     return "MULTICAST_SYN";
00500   case MULTICAST_SYNACK:
00501     return "MULTICAST_SYNACK";
00502   case MULTICAST_NAK:
00503     return "MULTICAST_NAK";
00504   case MULTICAST_NAKACK:
00505     return "MULTICAST_NAKACK";
00506   default:
00507     return "Unknown";
00508   }
00509 }
00510 
00511 OPENDDS_STRING to_string(const DataSampleHeader& value)
00512 {
00513   OPENDDS_STRING ret;
00514   if (value.submessage_id_ != SUBMESSAGE_NONE) {
00515     ret += to_string(SubMessageId(value.submessage_id_));
00516     ret += " 0x";
00517     ret += to_dds_string(unsigned(value.submessage_id_), true);
00518     ret += "), ";
00519   } else {
00520     ret += to_string(MessageId(value.message_id_));
00521     ret += " (0x";
00522     ret += to_dds_string(unsigned(value.message_id_), true);
00523     ret += "), ";
00524   }
00525 
00526   ret += "Length: ";
00527   ret += to_dds_string(value.message_length_);
00528   ret += ", ";
00529 
00530   ret += "Byte order: ";
00531   ret += (value.byte_order_ == 1 ? "Little" : "Big");
00532   ret += " Endian";
00533 
00534   if (value.message_id_ != TRANSPORT_CONTROL) {
00535     ret += ", ";
00536 
00537     if (value.coherent_change_ == 1) ret += "Coherent, ";
00538     if (value.historic_sample_ == 1) ret += "Historic, ";
00539     if (value.lifespan_duration_ == 1) ret += "Lifespan, ";
00540 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00541     if (value.group_coherent_ == 1) ret += "Group-Coherent, ";
00542 #endif
00543     if (value.content_filter_ == 1) ret += "Content-Filtered, ";
00544     if (value.sequence_repair_ == 1) ret += "Sequence Repair, ";
00545     if (value.more_fragments_ == 1) ret += "More Fragments, ";
00546     if (value.cdr_encapsulation_ == 1) ret += "CDR Encapsulation, ";
00547     if (value.key_fields_only_ == 1) ret += "Key Fields Only, ";
00548 
00549     ret += "Sequence: 0x";
00550     ret += to_dds_string(unsigned(value.sequence_.getValue()), true);
00551     ret += ", ";
00552 
00553     ret += "Timestamp: ";
00554     ret += to_dds_string(value.source_timestamp_sec_);
00555     ret += ".";
00556     ret += to_dds_string(value.source_timestamp_nanosec_);
00557     ret += ", ";
00558 
00559     if (value.lifespan_duration_) {
00560       ret += "Lifespan: ";
00561       ret += to_dds_string(value.lifespan_duration_sec_);
00562       ret += ".";
00563       ret += to_dds_string(value.lifespan_duration_nanosec_);
00564       ret += ", ";
00565     }
00566 
00567     ret += "Publication: " + OPENDDS_STRING(GuidConverter(value.publication_id_));
00568 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00569     if (value.group_coherent_) {
00570       ret += ", Publisher: " + OPENDDS_STRING(GuidConverter(value.publisher_id_));
00571     }
00572 #endif
00573 
00574     if (value.content_filter_) {
00575       const CORBA::ULong len = value.content_filter_entries_.length();
00576       ret += ", Content-Filter Entries (";
00577       ret += to_dds_string(len);
00578       ret += "): [";
00579       for (CORBA::ULong i(0); i < len; ++i) {
00580         ret += OPENDDS_STRING(GuidConverter(value.content_filter_entries_[i])) + ' ';
00581       }
00582       ret += ']';
00583     }
00584   }
00585   return ret;
00586 }
00587 
00588 #ifndef OPENDDS_SAFETY_PROFILE
00589 /// Message Id enumeration insertion onto an ostream.
00590 std::ostream& operator<<(std::ostream& str, const MessageId value)
00591 {
00592   switch (value) {
00593   case SAMPLE_DATA:
00594     return str << "SAMPLE_DATA";
00595   case DATAWRITER_LIVELINESS:
00596     return str << "DATAWRITER_LIVELINESS";
00597   case INSTANCE_REGISTRATION:
00598     return str << "INSTANCE_REGISTRATION";
00599   case UNREGISTER_INSTANCE:
00600     return str << "UNREGISTER_INSTANCE";
00601   case DISPOSE_INSTANCE:
00602     return str << "DISPOSE_INSTANCE";
00603   case GRACEFUL_DISCONNECT:
00604     return str << "GRACEFUL_DISCONNECT";
00605   case REQUEST_ACK:
00606     return str << "REQUEST_ACK";
00607   case SAMPLE_ACK:
00608     return str << "SAMPLE_ACK";
00609   case END_COHERENT_CHANGES:
00610     return str << "END_COHERENT_CHANGES";
00611   case TRANSPORT_CONTROL:
00612     return str << "TRANSPORT_CONTROL";
00613   case DISPOSE_UNREGISTER_INSTANCE:
00614     return str << "DISPOSE_UNREGISTER_INSTANCE";
00615   case END_HISTORIC_SAMPLES:
00616     return str << "END_HISTORIC_SAMPLES";
00617   default:
00618     return str << "Unknown";
00619   }
00620 }
00621 
00622 /// Sub-Message Id enumeration insertion onto an ostream.
00623 std::ostream& operator<<(std::ostream& os, const SubMessageId rhs)
00624 {
00625   switch (rhs) {
00626   case SUBMESSAGE_NONE:
00627     return os << "SUBMESSAGE_NONE";
00628   case MULTICAST_SYN:
00629     return os << "MULTICAST_SYN";
00630   case MULTICAST_SYNACK:
00631     return os << "MULTICAST_SYNACK";
00632   case MULTICAST_NAK:
00633     return os << "MULTICAST_NAK";
00634   case MULTICAST_NAKACK:
00635     return os << "MULTICAST_NAKACK";
00636   default:
00637     return os << "Unknown";
00638   }
00639 }
00640 
00641 /// Message header insertion onto an ostream.
00642 extern OpenDDS_Dcps_Export
00643 std::ostream& operator<<(std::ostream& str, const DataSampleHeader& value)
00644 {
00645   struct SaveAndRestoreStreamState {
00646     explicit SaveAndRestoreStreamState(std::ostream& s)
00647       : fill_(s.fill()), fmt_(s.flags()), s_(s) {}
00648     ~SaveAndRestoreStreamState()
00649     {
00650       s_.fill(fill_);
00651       s_.flags(fmt_);
00652     }
00653     char fill_;
00654     std::ios_base::fmtflags fmt_;
00655     std::ostream& s_;
00656   } stream_state(str);
00657 
00658   if (value.submessage_id_ != SUBMESSAGE_NONE) {
00659     str << SubMessageId(value.submessage_id_)
00660       << " (0x" << std::hex << std::setw(2) << std::setfill('0')
00661       << unsigned(value.submessage_id_) << "), ";
00662 
00663   } else {
00664     str << MessageId(value.message_id_)
00665         << " (0x" << std::hex << std::setw(2) << std::setfill('0')
00666         << unsigned(value.message_id_) << "), ";
00667   }
00668 
00669   str << "Length: " << std::dec << value.message_length_ << ", ";
00670 
00671   str << "Byte order: " << (value.byte_order_ == 1 ? "Little" : "Big")
00672       << " Endian";
00673 
00674   if (value.message_id_ != TRANSPORT_CONTROL) {
00675     str << ", ";
00676 
00677     if (value.coherent_change_ == 1) str << "Coherent, ";
00678     if (value.historic_sample_ == 1) str << "Historic, ";
00679     if (value.lifespan_duration_ == 1) str << "Lifespan, ";
00680 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00681     if (value.group_coherent_ == 1) str << "Group-Coherent, ";
00682 #endif
00683     if (value.content_filter_ == 1) str << "Content-Filtered, ";
00684     if (value.sequence_repair_ == 1) str << "Sequence Repair, ";
00685     if (value.more_fragments_ == 1) str << "More Fragments, ";
00686     if (value.cdr_encapsulation_ == 1) str << "CDR Encapsulation, ";
00687     if (value.key_fields_only_ == 1) str << "Key Fields Only, ";
00688 
00689     str << "Sequence: 0x" << std::hex << std::setw(4) << std::setfill('0')
00690         << value.sequence_.getValue() << ", ";
00691 
00692     str << "Timestamp: " << std::dec << value.source_timestamp_sec_ << "."
00693         << std::dec << value.source_timestamp_nanosec_ << ", ";
00694 
00695     if (value.lifespan_duration_) {
00696       str << "Lifespan: " << std::dec << value.lifespan_duration_sec_ << "."
00697           << std::dec << value.lifespan_duration_nanosec_ << ", ";
00698     }
00699 
00700     str << "Publication: " << GuidConverter(value.publication_id_);
00701 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00702     if (value.group_coherent_) {
00703       str << ", Publisher: " << GuidConverter(value.publisher_id_);
00704     }
00705 #endif
00706 
00707     if (value.content_filter_) {
00708       const CORBA::ULong len = value.content_filter_entries_.length();
00709       str << ", Content-Filter Entries (" << len << "): [";
00710       for (CORBA::ULong i(0); i < len; ++i) {
00711         str << GuidConverter(value.content_filter_entries_[i]) << ' ';
00712       }
00713       str << ']';
00714     }
00715   }
00716 
00717   return str;
00718 }
00719 #endif //OPENDDS_SAFETY_PROFILE
00720 
00721 
00722 bool
00723 DataSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
00724 {
00725   rds.header_ = *this;
00726   return true;
00727 }
00728 
00729 } // namespace DCPS
00730 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7