OpenDDS::RTPS::Sedp::Writer Class Reference

Inheritance diagram for OpenDDS::RTPS::Sedp::Writer:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Sedp::Writer:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Writer (const DCPS::RepoId &pub_id, Sedp &sedp)
virtual ~Writer ()
bool assoc (const DCPS::AssociationData &subscription)
void data_delivered (const DCPS::DataSampleElement *)
void data_dropped (const DCPS::DataSampleElement *, bool by_transport)
void control_delivered (ACE_Message_Block *sample)
void control_dropped (ACE_Message_Block *sample, bool dropped_by_transport)
void notify_publication_disconnected (const DCPS::ReaderIdSeq &)
void notify_publication_reconnected (const DCPS::ReaderIdSeq &)
void notify_publication_lost (const DCPS::ReaderIdSeq &)
void notify_connection_deleted (const DCPS::RepoId &)
void remove_associations (const DCPS::ReaderIdSeq &, bool)
void retrieve_inline_qos_data (InlineQosData &) const
DDS::ReturnCode_t write_sample (const ParameterList &plist, const DCPS::RepoId &reader, DCPS::SequenceNumber &sequence)
DDS::ReturnCode_t write_sample (const ParticipantMessageData &pmd, const DCPS::RepoId &reader, DCPS::SequenceNumber &sequence)
DDS::ReturnCode_t write_unregister_dispose (const DCPS::RepoId &rid)
void end_historic_samples (const DCPS::RepoId &reader)

Private Member Functions

void write_control_msg (ACE_Message_Block &payload, size_t size, DCPS::MessageId id, DCPS::SequenceNumber seq=DCPS::SequenceNumber())
void set_header_fields (DCPS::DataSampleHeader &dsh, size_t size, const DCPS::RepoId &reader, DCPS::SequenceNumber &sequence, DCPS::MessageId id=DCPS::SAMPLE_DATA)

Private Attributes

DCPS::TransportSendElementAllocator alloc_
Header header_
DCPS::SequenceNumber seq_

Detailed Description

Definition at line 156 of file Sedp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Sedp::Writer::Writer ( const DCPS::RepoId pub_id,
Sedp sedp 
)

OpenDDS::RTPS::Sedp::Writer::~Writer (  )  [virtual]

Definition at line 1389 of file Sedp.cpp.

01390 {
01391 }


Member Function Documentation

bool OpenDDS::RTPS::Sedp::Writer::assoc ( const DCPS::AssociationData subscription  ) 

Definition at line 1394 of file Sedp.cpp.

References OpenDDS::DCPS::TransportClient::associate().

Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().

01395 {
01396   return associate(subscription, true);
01397 }

void OpenDDS::RTPS::Sedp::Writer::control_delivered ( ACE_Message_Block *  sample  )  [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 1412 of file Sedp.cpp.

01413 {
01414   if (mb->flags() == ACE_Message_Block::DONT_DELETE) {
01415     // We allocated mb on stack, its continuation block on heap
01416     delete mb->cont();
01417   } else {
01418     mb->release();
01419   }
01420 }

void OpenDDS::RTPS::Sedp::Writer::control_dropped ( ACE_Message_Block *  sample,
bool  dropped_by_transport 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 1423 of file Sedp.cpp.

01424 {
01425   if (mb->flags() == ACE_Message_Block::DONT_DELETE) {
01426     // We allocated mb on stack, its continuation block on heap
01427     delete mb->cont();
01428   } else {
01429     mb->release();
01430   }
01431 }

void OpenDDS::RTPS::Sedp::Writer::data_delivered ( const DCPS::DataSampleElement  )  [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 1400 of file Sedp.cpp.

01401 {
01402   delete dsle;
01403 }

void OpenDDS::RTPS::Sedp::Writer::data_dropped ( const DCPS::DataSampleElement ,
bool  by_transport 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 1406 of file Sedp.cpp.

01407 {
01408   delete dsle;
01409 }

void OpenDDS::RTPS::Sedp::Writer::end_historic_samples ( const DCPS::RepoId reader  ) 

Definition at line 1584 of file Sedp.cpp.

References OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), and write_control_msg().

Referenced by OpenDDS::RTPS::Sedp::write_durable_participant_message_data(), OpenDDS::RTPS::Sedp::write_durable_publication_data(), and OpenDDS::RTPS::Sedp::write_durable_subscription_data().

01585 {
01586   const void* pReader = static_cast<const void*>(&reader);
01587   ACE_Message_Block mb(DCPS::DataSampleHeader::max_marshaled_size(),
01588                        ACE_Message_Block::MB_DATA,
01589                        new ACE_Message_Block(static_cast<const char*>(pReader),
01590                                              sizeof(reader)));
01591   mb.set_flags(ACE_Message_Block::DONT_DELETE);
01592   mb.cont()->wr_ptr(sizeof(reader));
01593   // 'mb' would contain the DSHeader, but we skip it. mb.cont() has the data
01594   write_control_msg(mb, sizeof(reader), DCPS::END_HISTORIC_SAMPLES,
01595                     DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN());
01596 }

void OpenDDS::RTPS::Sedp::Writer::notify_connection_deleted ( const DCPS::RepoId  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 176 of file Sedp.h.

00176 {}

void OpenDDS::RTPS::Sedp::Writer::notify_publication_disconnected ( const DCPS::ReaderIdSeq  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 173 of file Sedp.h.

00173 {}

void OpenDDS::RTPS::Sedp::Writer::notify_publication_lost ( const DCPS::ReaderIdSeq  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 175 of file Sedp.h.

00175 {}

void OpenDDS::RTPS::Sedp::Writer::notify_publication_reconnected ( const DCPS::ReaderIdSeq  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 174 of file Sedp.h.

00174 {}

void OpenDDS::RTPS::Sedp::Writer::remove_associations ( const DCPS::ReaderIdSeq ,
bool   
) [inline, virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 177 of file Sedp.h.

00177 {}

void OpenDDS::RTPS::Sedp::Writer::retrieve_inline_qos_data ( InlineQosData &   )  const [inline]

Definition at line 178 of file Sedp.h.

00178 {}

void OpenDDS::RTPS::Sedp::Writer::set_header_fields ( DCPS::DataSampleHeader dsh,
size_t  size,
const DCPS::RepoId reader,
DCPS::SequenceNumber sequence,
DCPS::MessageId  id = DCPS::SAMPLE_DATA 
) [private]

Definition at line 1611 of file Sedp.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::RTPS::Sedp::Endpoint::repo_id_, seq_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, and OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_.

Referenced by write_control_msg(), and write_sample().

01616 {
01617   dsh.message_id_ = id;
01618   dsh.byte_order_ = ACE_CDR_BYTE_ORDER;
01619   dsh.message_length_ = static_cast<ACE_UINT32>(size);
01620   dsh.publication_id_ = repo_id_;
01621 
01622   if (reader == GUID_UNKNOWN ||
01623       sequence == DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01624     sequence = seq_++;
01625   }
01626 
01627   if (reader != GUID_UNKNOWN) {
01628     // retransmit with same seq# for durability
01629     dsh.historic_sample_ = true;
01630   }
01631 
01632   dsh.sequence_ = sequence;
01633 
01634   const ACE_Time_Value now = ACE_OS::gettimeofday();
01635   dsh.source_timestamp_sec_ = static_cast<ACE_INT32>(now.sec());
01636   dsh.source_timestamp_nanosec_ = now.usec() * 1000;
01637 }

void OpenDDS::RTPS::Sedp::Writer::write_control_msg ( ACE_Message_Block &  payload,
size_t  size,
DCPS::MessageId  id,
DCPS::SequenceNumber  seq = DCPS::SequenceNumber() 
) [private]

Definition at line 1599 of file Sedp.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, header, OpenDDS::DCPS::TransportClient::send_control(), and set_header_fields().

Referenced by end_historic_samples().

01603 {
01604   DCPS::DataSampleHeader header;
01605   set_header_fields(header, size, GUID_UNKNOWN, seq, id);
01606   // no need to serialize header since rtps_udp transport ignores it
01607   send_control(header, &payload);
01608 }

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::Writer::write_sample ( const ParticipantMessageData pmd,
const DCPS::RepoId reader,
DCPS::SequenceNumber sequence 
)

Definition at line 1485 of file Sedp.cpp.

References alloc_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::find_size_ulong(), OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Sedp::host_is_bigendian_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::RTPS::Sedp::Endpoint::repo_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::TransportClient::send(), set_header_fields(), OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::DataSampleElement::set_sub_id().

01488 {
01489   DDS::ReturnCode_t result = DDS::RETCODE_OK;
01490 
01491   // Determine message length
01492   size_t size = 0, padding = 0;
01493   DCPS::find_size_ulong(size, padding);
01494   DCPS::gen_find_size(pmd, size, padding);
01495 
01496   // Build RTPS message
01497   ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
01498                             ACE_Message_Block::MB_DATA,
01499                             new ACE_Message_Block(size));
01500   using DCPS::Serializer;
01501   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01502   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // CDR_LE = 0x0001
01503             (ser << ACE_OutputCDR::from_octet(1)) &&
01504             (ser << ACE_OutputCDR::from_octet(0)) &&
01505             (ser << ACE_OutputCDR::from_octet(0)) &&
01506             (ser << pmd);
01507   if (!ok) {
01508     result = DDS::RETCODE_ERROR;
01509   }
01510 
01511   if (result == DDS::RETCODE_OK) {
01512     // Send sample
01513     DCPS::DataSampleElement* list_el =
01514       new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0);
01515     set_header_fields(list_el->get_header(), size, reader, sequence);
01516 
01517     list_el->set_sample(new ACE_Message_Block(size));
01518     *list_el->get_sample() << list_el->get_header();
01519     list_el->get_sample()->cont(payload.duplicate());
01520 
01521     if (reader != GUID_UNKNOWN) {
01522       list_el->set_sub_id(0, reader);
01523       list_el->set_num_subs(1);
01524     }
01525 
01526     DCPS::SendStateDataSampleList list;
01527     list.enqueue_tail(list_el);
01528 
01529     send(list);
01530   }
01531   delete payload.cont();
01532   return result;
01533 }

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::Writer::write_sample ( const ParameterList plist,
const DCPS::RepoId reader,
DCPS::SequenceNumber sequence 
)

Definition at line 1434 of file Sedp.cpp.

References alloc_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::find_size_ulong(), OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Sedp::host_is_bigendian_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::RTPS::Sedp::Endpoint::repo_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::TransportClient::send(), set_header_fields(), OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::DataSampleElement::set_sub_id().

Referenced by OpenDDS::RTPS::Sedp::signal_liveliness().

01437 {
01438   DDS::ReturnCode_t result = DDS::RETCODE_OK;
01439 
01440   // Determine message length
01441   size_t size = 0, padding = 0;
01442   DCPS::find_size_ulong(size, padding);
01443   DCPS::gen_find_size(plist, size, padding);
01444 
01445   // Build RTPS message
01446   ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
01447                             ACE_Message_Block::MB_DATA,
01448                             new ACE_Message_Block(size));
01449   using DCPS::Serializer;
01450   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01451   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // PL_CDR_LE = 0x0003
01452             (ser << ACE_OutputCDR::from_octet(3)) &&
01453             (ser << ACE_OutputCDR::from_octet(0)) &&
01454             (ser << ACE_OutputCDR::from_octet(0)) &&
01455             (ser << plist);
01456   if (!ok) {
01457     result = DDS::RETCODE_ERROR;
01458   }
01459 
01460   if (result == DDS::RETCODE_OK) {
01461     // Send sample
01462     DCPS::DataSampleElement* list_el =
01463       new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0);
01464     set_header_fields(list_el->get_header(), size, reader, sequence);
01465 
01466     list_el->set_sample(new ACE_Message_Block(size));
01467     *list_el->get_sample() << list_el->get_header();
01468     list_el->get_sample()->cont(payload.duplicate());
01469 
01470     if (reader != GUID_UNKNOWN) {
01471       list_el->set_sub_id(0, reader);
01472       list_el->set_num_subs(1);
01473     }
01474 
01475     DCPS::SendStateDataSampleList list;
01476     list.enqueue_tail(list_el);
01477 
01478     send(list);
01479   }
01480   delete payload.cont();
01481   return result;
01482 }

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::Writer::write_unregister_dispose ( const DCPS::RepoId rid  ) 


Member Data Documentation

DCPS::TransportSendElementAllocator OpenDDS::RTPS::Sedp::Writer::alloc_ [private]

Definition at line 191 of file Sedp.h.

Referenced by write_sample().

Header OpenDDS::RTPS::Sedp::Writer::header_ [private]

Definition at line 192 of file Sedp.h.

DCPS::SequenceNumber OpenDDS::RTPS::Sedp::Writer::seq_ [private]

Definition at line 193 of file Sedp.h.

Referenced by set_header_fields().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:50 2016 for OpenDDS by  doxygen 1.4.7