OpenDDS  Snapshot(2023/04/28-20:55)
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
OpenDDS::RTPS::Sedp::Writer Class Reference
Inheritance diagram for OpenDDS::RTPS::Sedp::Writer:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Sedp::Writer:
Collaboration graph
[legend]

Public Member Functions

 Writer (const DCPS::GUID_t &pub_id, Sedp &sedp, ACE_INT64 seq_init=1)
 
virtual ~Writer ()
 
bool assoc (const DCPS::AssociationData &subscription)
 
void transport_assoc_done (int flags, const DCPS::GUID_t &remote)
 
void data_delivered (const DCPS::DataSampleElement *)
 
void data_dropped (const DCPS::DataSampleElement *, bool by_transport)
 
void data_acked (const GUID_t &remote)
 
void control_delivered (const DCPS::Message_Block_Ptr &sample)
 
void control_dropped (const DCPS::Message_Block_Ptr &sample, bool dropped_by_transport)
 
void notify_publication_disconnected (const DCPS::ReaderIdSeq &)
 
void notify_publication_reconnected (const DCPS::ReaderIdSeq &)
 
void notify_publication_lost (const DCPS::ReaderIdSeq &)
 
void remove_associations (const DCPS::ReaderIdSeq &, bool)
 
void replay_durable_data_for (const DCPS::GUID_t &remote_sub_id)
 
void retrieve_inline_qos_data (InlineQosData &) const
 
const DCPS::SequenceNumber & get_seq () const
 
DDS::ReturnCode_t write_parameter_list (const ParameterList &plist, const DCPS::GUID_t &reader, DCPS::SequenceNumber &sequence)
 
void end_historic_samples (const DCPS::GUID_t &reader)
 
void request_ack (const DCPS::GUID_t &reader)
 
void send_deferred_samples (const GUID_t &reader)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportSendListener
virtual ~TransportSendListener ()
 
virtual SendControlStatus send_control_customized (const DataLinkSet_rch &links, const DataSampleHeader &header, ACE_Message_Block *msg, void *extra)
 
virtual void transport_discovery_change ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::RTPS::Sedp::Endpoint
 Endpoint (const DCPS::GUID_t &repo_id, Sedp &sedp)
 
virtual ~Endpoint ()
 
bool check_transport_qos (const DCPS::TransportInst &)
 
DCPS::GUID_t get_guid () const
 
DDS::DomainId_t domain_id () const
 
CORBA::Long get_priority_value (const DCPS::AssociationData &) const
 
void set_crypto_handles (DDS::Security::ParticipantCryptoHandle p, DDS::Security::NativeCryptoHandle e=DDS::HANDLE_NIL)
 
DDS::Security::ParticipantCryptoHandle get_crypto_handle () const
 
DDS::Security::NativeCryptoHandle get_endpoint_crypto_handle () const
 
void shutting_down ()
 
OPENDDS_STRING get_instance_name (const DCPS::GUID_t &id) const
 
EntityId_t counterpart_entity_id () const
 
GUID_t make_counterpart_guid (const DCPS::GUID_t &remote_part) const
 
bool associated_with_counterpart (const DCPS::GUID_t &remote_part) const
 
bool pending_association_with_counterpart (const DCPS::GUID_t &remote_part) const
 
bool associated_with_counterpart_if_not_pending (const DCPS::GUID_t &remote_part) const
 
RcHandle< DCPS::BitSubscriber > get_builtin_subscriber_proxy () const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportClient
void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeq & connection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 

Protected Member Functions

typedef OPENDDS_MAP (DCPS::SequenceNumber, DCPS::DataSampleElement *) PerReaderDeferredSamples
 
typedef OPENDDS_MAP (GUID_t, PerReaderDeferredSamples) DeferredSamples
 
void send_sample (DCPS::Message_Block_Ptr payload, size_t size, const DCPS::GUID_t &reader, DCPS::SequenceNumber &sequence, bool historic=false)
 
void send_sample_i (DCPS::DataSampleElement *el)
 
void set_header_fields (DCPS::DataSampleHeader &dsh, size_t size, const DCPS::GUID_t &reader, DCPS::SequenceNumber &sequence, bool historic_sample=false, DCPS::MessageId id=DCPS::SAMPLE_DATA)
 
void write_control_msg (DCPS::Message_Block_Ptr payload, size_t size, DCPS::MessageId id, DCPS::SequenceNumber seq=DCPS::SequenceNumber())
 
virtual bool deferrable () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportSendListener
 TransportSendListener ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportClient
void cdr_encapsulation (bool encap)
 

Protected Attributes

DeferredSamples deferred_samples_
 
DCPS::SequenceNumber seq_
 
- Protected Attributes inherited from OpenDDS::RTPS::Sedp::Endpoint
DCPS::GUID_t repo_id_
 
Sedp & sedp_
 
AtomicBool shutting_down_
 
DDS::Security::ParticipantCryptoHandle participant_crypto_handle_
 
DDS::Security::NativeCryptoHandle endpoint_crypto_handle_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::TransportClient
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 

Detailed Description

Definition at line 432 of file Sedp.h.

Constructor & Destructor Documentation

◆ Writer()

OpenDDS::RTPS::Sedp::Writer::Writer ( const DCPS::GUID_t & pub_id,
Sedp & sedp,
ACE_INT64  seq_init = 1 
)

Definition at line 3180 of file Sedp.cpp.

3181  : Endpoint(pub_id, sedp), seq_(seq_init)
3182 {
3183 }
Endpoint(const DCPS::GUID_t &repo_id, Sedp &sedp)
Definition: Sedp.h:346
DCPS::SequenceNumber seq_
Definition: Sedp.h:505

◆ ~Writer()

OpenDDS::RTPS::Sedp::Writer::~Writer ( )
virtual

Definition at line 3185 of file Sedp.cpp.

3186 {
3187 }

Member Function Documentation

◆ assoc()

bool OpenDDS::RTPS::Sedp::Writer::assoc ( const DCPS::AssociationData & subscription)

Definition at line 3190 of file Sedp.cpp.

References OpenDDS::DCPS::TransportClient::associate().

3191 {
3192  return associate(subscription, true);
3193 }
bool associate(const AssociationData &peer, bool active)

◆ control_delivered()

void OpenDDS::RTPS::Sedp::Writer::control_delivered ( const DCPS::Message_Block_Ptr & sample)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 3231 of file Sedp.cpp.

3232 {
3233 }

◆ control_dropped()

void OpenDDS::RTPS::Sedp::Writer::control_dropped ( const DCPS::Message_Block_Ptr & sample,
bool  dropped_by_transport 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 3236 of file Sedp.cpp.

3237 {
3238 }

◆ data_acked()

void OpenDDS::RTPS::Sedp::Writer::data_acked ( const GUID_t & remote)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 3225 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::data_acked_i(), OpenDDS::RTPS::Sedp::Endpoint::get_guid(), and OpenDDS::RTPS::Sedp::Endpoint::sedp_.

3226 {
3227  sedp_.data_acked_i(get_guid(), remote);
3228 }
DCPS::GUID_t get_guid() const
Definition: Sedp.h:364
void data_acked_i(const DCPS::GUID_t &local_id, const DCPS::GUID_t &remote_id)
Definition: Sedp.cpp:2977

◆ data_delivered()

void OpenDDS::RTPS::Sedp::Writer::data_delivered ( const DCPS::DataSampleElement * dsle)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 3213 of file Sedp.cpp.

3214 {
3215  delete dsle;
3216 }

◆ data_dropped()

void OpenDDS::RTPS::Sedp::Writer::data_dropped ( const DCPS::DataSampleElement * dsle,
bool  by_transport 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 3219 of file Sedp.cpp.

3220 {
3221  delete dsle;
3222 }

◆ deferrable()

virtual bool OpenDDS::RTPS::Sedp::Writer::deferrable ( ) const
inlineprotectedvirtual

Reimplemented in OpenDDS::RTPS::Sedp::TypeLookupReplyWriter, and OpenDDS::RTPS::Sedp::TypeLookupRequestWriter.

Definition at line 500 of file Sedp.h.

Referenced by send_sample().

501  {
502  return false;
503  }

◆ end_historic_samples()

void OpenDDS::RTPS::Sedp::Writer::end_historic_samples ( const DCPS::GUID_t & reader)

Definition at line 3513 of file Sedp.cpp.

References ACE_ERROR, ACE_TEXT(), ACE_Message_Block::cont(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), LM_ERROR, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::move(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), ACE_Message_Block::wr_ptr(), and write_control_msg().

3514 {
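3515  DCPS::Message_Block_Ptr mb(
3516  new ACE_Message_Block(
3517  DCPS::DataSampleHeader::get_max_serialized_size(),
3518  ACE_Message_Block::MB_DATA,
3519  new ACE_Message_Block(
3520  reinterpret_cast<const char*>(&reader),
3521  sizeof(reader))));
3522  if (mb.get()) {
3523  mb->cont()->wr_ptr(sizeof(reader));
3524  // 'mb' would contain the DSHeader, but we skip it. mb.cont() has the data
3525  write_control_msg(move(mb), sizeof(reader), DCPS::END_HISTORIC_SAMPLES,
3526  DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN());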
3527  } else {
3528  ACE_ERROR((LM_ERROR,
3529  ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::end_historic_samples")
3530  ACE_TEXT(" - Failed to allocate message block message\n")));
3531  }
3532 }
#define ACE_ERROR(X)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
void write_control_msg(DCPS::Message_Block_Ptr payload, size_t size, DCPS::MessageId id, DCPS::SequenceNumber seq=DCPS::SequenceNumber())
Definition: Sedp.cpp:3557
static SequenceNumber SEQUENCENUMBER_UNKNOWN()

◆ get_seq()

const DCPS::SequenceNumber& OpenDDS::RTPS::Sedp::Writer::get_seq ( ) const
inline

Definition at line 460 of file Sedp.h.

References OpenDDS::RTPS::Sedp::OPENDDS_MAP().

461  {
462  return seq_;
463  }
DCPS::SequenceNumber seq_
Definition: Sedp.h:505

◆ notify_publication_disconnected()

void OpenDDS::RTPS::Sedp::Writer::notify_publication_disconnected ( const DCPS::ReaderIdSeq & )
inlinevirtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 453 of file Sedp.h.

453 {}

◆ notify_publication_lost()

void OpenDDS::RTPS::Sedp::Writer::notify_publication_lost ( const DCPS::ReaderIdSeq & )
inlinevirtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 455 of file Sedp.h.

455 {}

◆ notify_publication_reconnected()

void OpenDDS::RTPS::Sedp::Writer::notify_publication_reconnected ( const DCPS::ReaderIdSeq & )
inlinevirtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 454 of file Sedp.h.

454 {}

◆ OPENDDS_MAP() [1/2]

typedef OpenDDS::RTPS::Sedp::Writer::OPENDDS_MAP ( DCPS::SequenceNumber  ,
DCPS::DataSampleElement *  
)
protected

◆ OPENDDS_MAP() [2/2]

typedef OpenDDS::RTPS::Sedp::Writer::OPENDDS_MAP ( GUID_t  ,
PerReaderDeferredSamples   
)
protected

◆ remove_associations()

void OpenDDS::RTPS::Sedp::Writer::remove_associations ( const DCPS::ReaderIdSeq & ,
bool   
)
inlinevirtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 456 of file Sedp.h.

References OpenDDS::RTPS::Sedp::replay_durable_data_for().

456 {}

◆ replay_durable_data_for()

void OpenDDS::RTPS::Sedp::Writer::replay_durable_data_for ( const DCPS::GUID_t & remote_sub_id)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 3241 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::replay_durable_data_for(), and OpenDDS::RTPS::Sedp::Endpoint::sedp_.

3242 {
3243  // Ideally, we would have the data cached and ready for replay but we do not.
3244  sedp_.replay_durable_data_for(remote_sub_id);
3245 }
void replay_durable_data_for(const DCPS::GUID_t &remote_sub_id)
Definition: Sedp.cpp:1634

◆ request_ack()

void OpenDDS::RTPS::Sedp::Writer::request_ack ( const DCPS::GUID_t & reader)

Definition at line 3535 of file Sedp.cpp.

References ACE_ERROR, ACE_TEXT(), ACE_Message_Block::cont(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), LM_ERROR, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::move(), OpenDDS::DCPS::REQUEST_ACK, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), ACE_Message_Block::wr_ptr(), and write_control_msg().

3536 {
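3537  DCPS::Message_Block_Ptr mb(
3538  new ACE_Message_Block(
3539  DCPS::DataSampleHeader::get_max_serialized_size(),
3540  ACE_Message_Block::MB_DATA,
3541  new ACE_Message_Block(
3542  reinterpret_cast<const char*>(&reader),
3543  sizeof(reader))));
3544  if (mb.get()) {
3545  mb->cont()->wr_ptr(sizeof(reader));
3546  // 'mb' would contain the DSHeader, but we skip it. mb.cont() has the data
3547  write_control_msg(move(mb), sizeof(reader), DCPS::REQUEST_ACK,
3548  DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN());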
3549  } else {
3550  ACE_ERROR((LM_ERROR,
3551  ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::request_ack")
3552  ACE_TEXT(" - Failed to allocate message block message\n")));
3553  }
3554 }
#define ACE_ERROR(X)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
void write_control_msg(DCPS::Message_Block_Ptr payload, size_t size, DCPS::MessageId id, DCPS::SequenceNumber seq=DCPS::SequenceNumber())
Definition: Sedp.cpp:3557
static SequenceNumber SEQUENCENUMBER_UNKNOWN()

◆ retrieve_inline_qos_data()

void OpenDDS::RTPS::Sedp::Writer::retrieve_inline_qos_data ( InlineQosData & ) const
inlinevirtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 458 of file Sedp.h.

458 {}

◆ send_deferred_samples()

void OpenDDS::RTPS::Sedp::Writer::send_deferred_samples ( const GUID_t & reader)

Definition at line 3283 of file Sedp.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, deferred_samples_, LM_DEBUG, and send_sample_i().

3284 {
3285  DeferredSamples::iterator samples_for_reader = deferred_samples_.find(reader);
3286  if (samples_for_reader != deferred_samples_.end()) {
3287  if (DCPS::DCPS_debug_level >= 8) {
3288  ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Writer::send_deferred_samples to %C\n",
3289  DCPS::LogGuid(reader).c_str()));
3290  }
3291  for (PerReaderDeferredSamples::iterator i = samples_for_reader->second.begin();
3292  i != samples_for_reader->second.end(); ++i) {
3293  send_sample_i(i->second);
3294  }
3295  deferred_samples_.erase(samples_for_reader);
3296  }
3297 }
#define ACE_DEBUG(X)
DeferredSamples deferred_samples_
Definition: Sedp.h:478
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void send_sample_i(DCPS::DataSampleElement *el)
Definition: Sedp.cpp:3299

◆ send_sample()

void OpenDDS::RTPS::Sedp::Writer::send_sample ( DCPS::Message_Block_Ptr  payload,
size_t  size,
const DCPS::GUID_t & reader,
DCPS::SequenceNumber & sequence,
bool  historic = false 
)
protected

Definition at line 3247 of file Sedp.cpp.

References ACE_DEBUG, OpenDDS::RTPS::Sedp::Endpoint::associated_with_counterpart(), OpenDDS::DCPS::DCPS_debug_level, deferrable(), deferred_samples_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::GUID_UNKNOWN, LM_DEBUG, OpenDDS::DCPS::move(), OpenDDS::RTPS::Sedp::Endpoint::repo_id_, send_sample_i(), set_header_fields(), OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::DataSampleElement::set_sub_id().

Referenced by OpenDDS::RTPS::Sedp::TypeLookupReplyWriter::send_type_lookup_reply(), OpenDDS::RTPS::Sedp::TypeLookupRequestWriter::send_type_lookup_request(), write_parameter_list(), OpenDDS::RTPS::Sedp::LivelinessWriter::write_participant_message(), OpenDDS::RTPS::Sedp::SecurityWriter::write_stateless_message(), and OpenDDS::RTPS::Sedp::SecurityWriter::write_volatile_message_secure().

3252 {
3253  if (DCPS::DCPS_debug_level >= 8) {
3254  ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Writer::send_sample from %C to %C\n",
3255  DCPS::LogGuid(repo_id_).c_str(),
3256  DCPS::LogGuid(reader).c_str()));
3257  }
3258  DCPS::DataSampleElement* el = new DCPS::DataSampleElement(repo_id_, this, DCPS::PublicationInstance_rch());
3259  set_header_fields(el->get_header(), size, reader, sequence, historic);
3260 
3261  el->set_sample(DCPS::move(payload));
3262  *el->get_sample() << el->get_header();
3263 
3264  if (reader != GUID_UNKNOWN) {
3265  el->set_sub_id(0, reader);
3266  el->set_num_subs(1);
3267 
3268  if (deferrable() && !associated_with_counterpart(reader)) {
3269  if (DCPS::DCPS_debug_level >= 8) {
3270  ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Writer::send_sample: "
3271  "counterpart isn't associated, deferring\n"));
3272  }
3273  DeferredSamples::iterator samples_for_reader = deferred_samples_.insert(
3274  std::make_pair(reader, PerReaderDeferredSamples())).first;
3275  samples_for_reader->second.insert(std::make_pair(sequence, el));
3276  return;
3277  }
3278  }
3279 
3280  send_sample_i(el);
3281 }
RcHandle< PublicationInstance > PublicationInstance_rch
#define ACE_DEBUG(X)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
bool associated_with_counterpart(const DCPS::GUID_t &remote_part) const
Definition: Sedp.cpp:3162
DeferredSamples deferred_samples_
Definition: Sedp.h:478
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void set_header_fields(DCPS::DataSampleHeader &dsh, size_t size, const DCPS::GUID_t &reader, DCPS::SequenceNumber &sequence, bool historic_sample=false, DCPS::MessageId id=DCPS::SAMPLE_DATA)
Definition: Sedp.cpp:3569
DCPS::GUID_t repo_id_
Definition: Sedp.h:423
void send_sample_i(DCPS::DataSampleElement *el)
Definition: Sedp.cpp:3299
virtual bool deferrable() const
Definition: Sedp.h:500

◆ send_sample_i()

void OpenDDS::RTPS::Sedp::Writer::send_sample_i ( DCPS::DataSampleElement * el)
protected

Definition at line 3299 of file Sedp.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), and OpenDDS::DCPS::TransportClient::send().

Referenced by send_deferred_samples(), and send_sample().

3300 {
3301  DCPS::SendStateDataSampleList list;
3302  list.enqueue_tail(el);
3303 
3304  send(list);
3305 }
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)

◆ set_header_fields()

void OpenDDS::RTPS::Sedp::Writer::set_header_fields ( DCPS::DataSampleHeader & dsh,
size_t  size,
const DCPS::GUID_t & reader,
DCPS::SequenceNumber & sequence,
bool  historic_sample = false,
DCPS::MessageId  id = DCPS::SAMPLE_DATA 
)
protected

Definition at line 3569 of file Sedp.cpp.

References ACE_CDR_BYTE_ORDER, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::RTPS::Sedp::Endpoint::repo_id_, OpenDDS::DCPS::REQUEST_ACK, seq_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, and OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_.

Referenced by send_sample(), and write_control_msg().

3575 {
3576  dsh.message_id_ = id;
3577  dsh.byte_order_ = ACE_CDR_BYTE_ORDER;
3578  dsh.message_length_ = static_cast<ACE_UINT32>(size);
3579  dsh.publication_id_ = repo_id_;
3580 
3581  if (id != DCPS::END_HISTORIC_SAMPLES && id != DCPS::REQUEST_ACK &&
3582  (reader == GUID_UNKNOWN ||
3583  sequence == DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())) {
3584  sequence = seq_++;
3585  }
3586 
3587  if (historic_sample && reader != GUID_UNKNOWN) {
3588  // retransmit with same seq# for durability
3589  dsh.historic_sample_ = true;
3590  }
3591 
3592  dsh.sequence_ = sequence;
3593 
3594  const SystemTimePoint now = SystemTimePoint::now();
3595  dsh.source_timestamp_sec_ = static_cast<ACE_INT32>(now.value().sec());
3596  dsh.source_timestamp_nanosec_ = static_cast<ACE_UINT32>(now.value().usec() * 1000);
3597 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
TimePoint_T< SystemClock > SystemTimePoint
Definition: TimeTypes.h:32
#define ACE_CDR_BYTE_ORDER
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
DCPS::GUID_t repo_id_
Definition: Sedp.h:423
DCPS::SequenceNumber seq_
Definition: Sedp.h:505

◆ transport_assoc_done()

void OpenDDS::RTPS::Sedp::Writer::transport_assoc_done ( int  flags,
const DCPS::GUID_t & remote 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 3195 of file Sedp.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::TransportClient::ASSOC_OK, OpenDDS::RTPS::Sedp::association_complete_i(), LM_ERROR, OpenDDS::RTPS::Sedp::Endpoint::repo_id_, OpenDDS::RTPS::Sedp::Endpoint::sedp_, and OpenDDS::RTPS::Sedp::Endpoint::shutting_down_.

3196 {
3197  if (!(flags & ASSOC_OK)) {
3198  ACE_ERROR((LM_ERROR,
3199  ACE_TEXT("(%P|%t) Sedp::Writer::transport_assoc_done: ")
3200  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
3201  DCPS::LogGuid(remote).c_str()));
3202  return;
3203  }
3204 
3205  if (shutting_down_) {
3206  return;
3207  }
3208 
3209  sedp_.association_complete_i(repo_id_, remote);
3210 }
#define ACE_ERROR(X)
void association_complete_i(const DCPS::GUID_t &localId, const DCPS::GUID_t &remoteId)
Definition: Sedp.cpp:2920
AtomicBool shutting_down_
Definition: Sedp.h:425
ACE_TEXT("TCP_Factory")
DCPS::GUID_t repo_id_
Definition: Sedp.h:423

◆ write_control_msg()

void OpenDDS::RTPS::Sedp::Writer::write_control_msg ( DCPS::Message_Block_Ptr  payload,
size_t  size,
DCPS::MessageId  id,
DCPS::SequenceNumber  seq = DCPS::SequenceNumber() 
)
protected

Definition at line 3557 of file Sedp.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, header, OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportClient::send_control(), and set_header_fields().

Referenced by end_historic_samples(), request_ack(), and OpenDDS::RTPS::Sedp::DiscoveryWriter::write_unregister_dispose().

3561 {
3562  DCPS::DataSampleHeader header;
3563  Writer::set_header_fields(header, size, GUID_UNKNOWN, seq, false, id);
3564  // no need to serialize header since rtps_udp transport ignores it
3565  send_control(header, DCPS::move(payload));
3566 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
void set_header_fields(DCPS::DataSampleHeader &dsh, size_t size, const DCPS::GUID_t &reader, DCPS::SequenceNumber &sequence, bool historic_sample=false, DCPS::MessageId id=DCPS::SAMPLE_DATA)
Definition: Sedp.cpp:3569

◆ write_parameter_list()

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::Writer::write_parameter_list ( const ParameterList & plist,
const DCPS::GUID_t & reader,
DCPS::SequenceNumber & sequence 
)

Definition at line 3308 of file Sedp.cpp.

References ACE_ERROR, ACE_TEXT(), ACE_Message_Block::cont(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), OpenDDS::DCPS::GUID_UNKNOWN, LM_ERROR, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::move(), OpenDDS::DCPS::MUTABLE, OpenDDS::DCPS::primitive_serialized_size_ulong(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_sample(), and OpenDDS::DCPS::serialized_size().

Referenced by OpenDDS::RTPS::Sedp::DiscoveryWriter::write_dcps_participant_secure().

3311 {
3312  DDS::ReturnCode_t result = DDS::RETCODE_OK;
3313 
3314  // Determine message length
3315  size_t size = 0;
3316  DCPS::primitive_serialized_size_ulong(sedp_encoding, size);
3317  DCPS::serialized_size(sedp_encoding, size, plist);
3318 
3319  // Build and send RTPS message
3320  DCPS::Message_Block_Ptr payload(
3321  new ACE_Message_Block(
3322  DCPS::DataSampleHeader::get_max_serialized_size(),
3323  ACE_Message_Block::MB_DATA,
3324  new ACE_Message_Block(size)));
3325 
3326  if (!payload) {
3327  ACE_ERROR((LM_ERROR,
3328  ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_parameter_list")
3329  ACE_TEXT(" - Failed to allocate message block message\n")));
3330  return DDS::RETCODE_ERROR;
3331  }
3332 
3333  Serializer serializer(payload->cont(), sedp_encoding);
3334  DCPS::EncapsulationHeader encap;
3335  if (encap.from_encoding(sedp_encoding, DCPS::MUTABLE) &&
3336  serializer << encap && serializer << plist) {
3337  send_sample(move(payload), size, reader, sequence, reader != GUID_UNKNOWN);
3338  } else {
3339  result = DDS::RETCODE_ERROR;
3340  }
3341 
3342  return result;
3343 }
#define ACE_ERROR(X)
OpenDDS_Dcps_Export void primitive_serialized_size_ulong(const Encoding &encoding, size_t &size, size_t count=1)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
void send_sample(DCPS::Message_Block_Ptr payload, size_t size, const DCPS::GUID_t &reader, DCPS::SequenceNumber &sequence, bool historic=false)
Definition: Sedp.cpp:3247

Member Data Documentation

◆ deferred_samples_

DeferredSamples OpenDDS::RTPS::Sedp::Writer::deferred_samples_
protected

Definition at line 478 of file Sedp.h.

Referenced by send_deferred_samples(), and send_sample().

◆ seq_

DCPS::SequenceNumber OpenDDS::RTPS::Sedp::Writer::seq_
protected

The documentation for this class was generated from the following files: