Definition at line 156 of file Sedp.h.
OpenDDS::RTPS::Sedp::Writer::Writer | ( | const DCPS::RepoId & | pub_id, | |
Sedp & | sedp | |||
) |
OpenDDS::RTPS::Sedp::Writer::~Writer | ( | ) | [virtual] |
bool OpenDDS::RTPS::Sedp::Writer::assoc | ( | const DCPS::AssociationData & | subscription | ) |
Definition at line 1394 of file Sedp.cpp.
References OpenDDS::DCPS::TransportClient::associate().
Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().
01395 { 01396 return associate(subscription, true); 01397 }
void OpenDDS::RTPS::Sedp::Writer::control_delivered | ( | ACE_Message_Block * | sample | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 1412 of file Sedp.cpp.
01413 { 01414 if (mb->flags() == ACE_Message_Block::DONT_DELETE) { 01415 // We allocated mb on stack, its continuation block on heap 01416 delete mb->cont(); 01417 } else { 01418 mb->release(); 01419 } 01420 }
void OpenDDS::RTPS::Sedp::Writer::control_dropped | ( | ACE_Message_Block * | sample, | |
bool | dropped_by_transport | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 1423 of file Sedp.cpp.
01424 { 01425 if (mb->flags() == ACE_Message_Block::DONT_DELETE) { 01426 // We allocated mb on stack, its continuation block on heap 01427 delete mb->cont(); 01428 } else { 01429 mb->release(); 01430 } 01431 }
void OpenDDS::RTPS::Sedp::Writer::data_delivered | ( | const DCPS::DataSampleElement * | ) | [virtual] |
void OpenDDS::RTPS::Sedp::Writer::data_dropped | ( | const DCPS::DataSampleElement * | , | |
bool | by_transport | |||
) | [virtual] |
void OpenDDS::RTPS::Sedp::Writer::end_historic_samples | ( | const DCPS::RepoId & | reader | ) |
Definition at line 1584 of file Sedp.cpp.
References OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), and write_control_msg().
Referenced by OpenDDS::RTPS::Sedp::write_durable_participant_message_data(), OpenDDS::RTPS::Sedp::write_durable_publication_data(), and OpenDDS::RTPS::Sedp::write_durable_subscription_data().
01585 { 01586 const void* pReader = static_cast<const void*>(&reader); 01587 ACE_Message_Block mb(DCPS::DataSampleHeader::max_marshaled_size(), 01588 ACE_Message_Block::MB_DATA, 01589 new ACE_Message_Block(static_cast<const char*>(pReader), 01590 sizeof(reader))); 01591 mb.set_flags(ACE_Message_Block::DONT_DELETE); 01592 mb.cont()->wr_ptr(sizeof(reader)); 01593 // 'mb' would contain the DSHeader, but we skip it. mb.cont() has the data 01594 write_control_msg(mb, sizeof(reader), DCPS::END_HISTORIC_SAMPLES, 01595 DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()); 01596 }
void OpenDDS::RTPS::Sedp::Writer::notify_connection_deleted | ( | const DCPS::RepoId & | ) | [inline, virtual] |
void OpenDDS::RTPS::Sedp::Writer::notify_publication_disconnected | ( | const DCPS::ReaderIdSeq & | ) | [inline, virtual] |
void OpenDDS::RTPS::Sedp::Writer::notify_publication_lost | ( | const DCPS::ReaderIdSeq & | ) | [inline, virtual] |
void OpenDDS::RTPS::Sedp::Writer::notify_publication_reconnected | ( | const DCPS::ReaderIdSeq & | ) | [inline, virtual] |
void OpenDDS::RTPS::Sedp::Writer::remove_associations | ( | const DCPS::ReaderIdSeq & | , | |
bool | ||||
) | [inline, virtual] |
void OpenDDS::RTPS::Sedp::Writer::retrieve_inline_qos_data | ( | InlineQosData & | ) | const [inline] |
void OpenDDS::RTPS::Sedp::Writer::set_header_fields | ( | DCPS::DataSampleHeader & | dsh, | |
size_t | size, | |||
const DCPS::RepoId & | reader, | |||
DCPS::SequenceNumber & | sequence, | |||
DCPS::MessageId | id = DCPS::SAMPLE_DATA | |||
) | [private] |
Definition at line 1611 of file Sedp.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::RTPS::Sedp::Endpoint::repo_id_, seq_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, and OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_.
Referenced by write_control_msg(), and write_sample().
01616 { 01617 dsh.message_id_ = id; 01618 dsh.byte_order_ = ACE_CDR_BYTE_ORDER; 01619 dsh.message_length_ = static_cast<ACE_UINT32>(size); 01620 dsh.publication_id_ = repo_id_; 01621 01622 if (reader == GUID_UNKNOWN || 01623 sequence == DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 01624 sequence = seq_++; 01625 } 01626 01627 if (reader != GUID_UNKNOWN) { 01628 // retransmit with same seq# for durability 01629 dsh.historic_sample_ = true; 01630 } 01631 01632 dsh.sequence_ = sequence; 01633 01634 const ACE_Time_Value now = ACE_OS::gettimeofday(); 01635 dsh.source_timestamp_sec_ = static_cast<ACE_INT32>(now.sec()); 01636 dsh.source_timestamp_nanosec_ = now.usec() * 1000; 01637 }
void OpenDDS::RTPS::Sedp::Writer::write_control_msg | ( | ACE_Message_Block & | payload, | |
size_t | size, | |||
DCPS::MessageId | id, | |||
DCPS::SequenceNumber | seq = DCPS::SequenceNumber() | |||
) | [private] |
Definition at line 1599 of file Sedp.cpp.
References OpenDDS::DCPS::GUID_UNKNOWN, header, OpenDDS::DCPS::TransportClient::send_control(), and set_header_fields().
Referenced by end_historic_samples().
01603 { 01604 DCPS::DataSampleHeader header; 01605 set_header_fields(header, size, GUID_UNKNOWN, seq, id); 01606 // no need to serialize header since rtps_udp transport ignores it 01607 send_control(header, &payload); 01608 }
DDS::ReturnCode_t OpenDDS::RTPS::Sedp::Writer::write_sample | ( | const ParticipantMessageData & | pmd, | |
const DCPS::RepoId & | reader, | |||
DCPS::SequenceNumber & | sequence | |||
) |
Definition at line 1485 of file Sedp.cpp.
References alloc_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::find_size_ulong(), OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Sedp::host_is_bigendian_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::RTPS::Sedp::Endpoint::repo_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::TransportClient::send(), set_header_fields(), OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::DataSampleElement::set_sub_id().
01488 { 01489 DDS::ReturnCode_t result = DDS::RETCODE_OK; 01490 01491 // Determine message length 01492 size_t size = 0, padding = 0; 01493 DCPS::find_size_ulong(size, padding); 01494 DCPS::gen_find_size(pmd, size, padding); 01495 01496 // Build RTPS message 01497 ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(), 01498 ACE_Message_Block::MB_DATA, 01499 new ACE_Message_Block(size)); 01500 using DCPS::Serializer; 01501 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR); 01502 bool ok = (ser << ACE_OutputCDR::from_octet(0)) && // CDR_LE = 0x0001 01503 (ser << ACE_OutputCDR::from_octet(1)) && 01504 (ser << ACE_OutputCDR::from_octet(0)) && 01505 (ser << ACE_OutputCDR::from_octet(0)) && 01506 (ser << pmd); 01507 if (!ok) { 01508 result = DDS::RETCODE_ERROR; 01509 } 01510 01511 if (result == DDS::RETCODE_OK) { 01512 // Send sample 01513 DCPS::DataSampleElement* list_el = 01514 new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0); 01515 set_header_fields(list_el->get_header(), size, reader, sequence); 01516 01517 list_el->set_sample(new ACE_Message_Block(size)); 01518 *list_el->get_sample() << list_el->get_header(); 01519 list_el->get_sample()->cont(payload.duplicate()); 01520 01521 if (reader != GUID_UNKNOWN) { 01522 list_el->set_sub_id(0, reader); 01523 list_el->set_num_subs(1); 01524 } 01525 01526 DCPS::SendStateDataSampleList list; 01527 list.enqueue_tail(list_el); 01528 01529 send(list); 01530 } 01531 delete payload.cont(); 01532 return result; 01533 }
DDS::ReturnCode_t OpenDDS::RTPS::Sedp::Writer::write_sample | ( | const ParameterList & | plist, | |
const DCPS::RepoId & | reader, | |||
DCPS::SequenceNumber & | sequence | |||
) |
Definition at line 1434 of file Sedp.cpp.
References alloc_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::find_size_ulong(), OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Sedp::host_is_bigendian_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::RTPS::Sedp::Endpoint::repo_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::TransportClient::send(), set_header_fields(), OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::DataSampleElement::set_sub_id().
Referenced by OpenDDS::RTPS::Sedp::signal_liveliness().
01437 { 01438 DDS::ReturnCode_t result = DDS::RETCODE_OK; 01439 01440 // Determine message length 01441 size_t size = 0, padding = 0; 01442 DCPS::find_size_ulong(size, padding); 01443 DCPS::gen_find_size(plist, size, padding); 01444 01445 // Build RTPS message 01446 ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(), 01447 ACE_Message_Block::MB_DATA, 01448 new ACE_Message_Block(size)); 01449 using DCPS::Serializer; 01450 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR); 01451 bool ok = (ser << ACE_OutputCDR::from_octet(0)) && // PL_CDR_LE = 0x0003 01452 (ser << ACE_OutputCDR::from_octet(3)) && 01453 (ser << ACE_OutputCDR::from_octet(0)) && 01454 (ser << ACE_OutputCDR::from_octet(0)) && 01455 (ser << plist); 01456 if (!ok) { 01457 result = DDS::RETCODE_ERROR; 01458 } 01459 01460 if (result == DDS::RETCODE_OK) { 01461 // Send sample 01462 DCPS::DataSampleElement* list_el = 01463 new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0); 01464 set_header_fields(list_el->get_header(), size, reader, sequence); 01465 01466 list_el->set_sample(new ACE_Message_Block(size)); 01467 *list_el->get_sample() << list_el->get_header(); 01468 list_el->get_sample()->cont(payload.duplicate()); 01469 01470 if (reader != GUID_UNKNOWN) { 01471 list_el->set_sub_id(0, reader); 01472 list_el->set_num_subs(1); 01473 } 01474 01475 DCPS::SendStateDataSampleList list; 01476 list.enqueue_tail(list_el); 01477 01478 send(list); 01479 } 01480 delete payload.cont(); 01481 return result; 01482 }
DDS::ReturnCode_t OpenDDS::RTPS::Sedp::Writer::write_unregister_dispose | ( | const DCPS::RepoId & | rid | ) |
Header OpenDDS::RTPS::Sedp::Writer::header_ [private] |