OpenDDS::DCPS::ReplayerImpl Class Reference

Implementation of Replayer functionality. More...

#include <ReplayerImpl.h>

Inheritance diagram for OpenDDS::DCPS::ReplayerImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ReplayerImpl:
Collaboration graph
[legend]

List of all members.

Classes

struct  ReaderInfo

Public Member Functions

 ReplayerImpl ()
 ~ReplayerImpl ()
DDS::ReturnCode_t cleanup ()
virtual void init (DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos)
virtual DDS::ReturnCode_t write (const RawDataSample &sample)
virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSample &sample)
virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSampleList &samples)
virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos)
virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &publisher_qos, DDS::DataWriterQos &datawriter_qos)
virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
virtual ReplayerListener_rch get_listener ()
virtual bool check_transport_qos (const TransportInst &inst)
virtual const RepoIdget_repo_id () const
DDS::DomainId_t domain_id () const
virtual CORBA::Long get_priority_value (const AssociationData &data) const
virtual void data_delivered (const DataSampleElement *sample)
virtual void data_dropped (const DataSampleElement *sample, bool dropped_by_transport)
virtual void control_delivered (const Message_Block_Ptr &sample)
virtual void control_dropped (const Message_Block_Ptr &sample, bool dropped_by_transport)
virtual void notify_publication_disconnected (const ReaderIdSeq &subids)
virtual void notify_publication_reconnected (const ReaderIdSeq &subids)
virtual void notify_publication_lost (const ReaderIdSeq &subids)
virtual void retrieve_inline_qos_data (InlineQosData &qos_data) const
virtual void add_association (const RepoId &yourId, const ReaderAssociation &reader, bool active)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const ReaderIdSeq &readers, CORBA::Boolean callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void update_subscription_params (const RepoId &readerId, const DDS::StringSeq &exprParams)
virtual void inconsistent_topic ()
void remove_all_associations ()
virtual void register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
virtual void unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid)
DDS::ReturnCode_t enable ()
DomainParticipantImplparticipant ()
virtual DDS::InstanceHandle_t get_instance_handle ()

Public Attributes

int data_dropped_count_
 Statistics counter.
int data_delivered_count_

Private Member Functions

void notify_publication_lost (const DDS::InstanceHandleSeq &handles)
DDS::ReturnCode_t write (const RawDataSample *sample_array, int array_size, DDS::InstanceHandle_t *reader)
DDS::ReturnCode_t create_sample_data_message (Message_Block_Ptr data, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
bool need_sequence_repair () const
void lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the subscription repo ids.
typedef OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap
void association_complete_i (const RepoId &remote_id)
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
typedef OPENDDS_MAP_CMP (RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap

Private Attributes

size_t n_chunks_
 The number of chunks for the cached allocator.
size_t association_chunk_multiplier_
 The multiplier for allocators affected by associations.
CORBA::String_var type_name_
 The type name of associated topic.
DDS::DataWriterQos qos_
 The qos policy list of this datawriter.
DomainParticipantImplparticipant_servant_
RepoIdToReaderInfoMap reader_info_
CORBA::String_var topic_name_
 The name of associated topic.
RepoId topic_id_
 The associated topic repository id.
DDS::Topic_var topic_objref_
 The object reference of the associated topic.
TopicDescriptionPtr< TopicImpltopic_servant_
 The topic servant.
DDS::StatusMask listener_mask_
ReplayerListener_rch listener_
 Used to notify the entity for relevant events.
DDS::DomainId_t domain_id_
 The domain id.
PublisherImplpublisher_servant_
 The publisher servant which creates this datawriter.
DDS::PublisherQos publisher_qos_
PublicationId publication_id_
 The repository id of this datawriter/publication.
SequenceNumber sequence_number_
 The sequence number unique in DataWriter scope.
ACE_Recursive_Thread_Mutex lock_
 The sample data container.
RepoIdToHandleMap id_to_handle_map_
RepoIdSet readers_
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
 Status conditions.
DDS::PublicationMatchedStatus publication_match_status_
unique_ptr< MessageBlockAllocatormb_allocator_
unique_ptr< DataBlockAllocatordb_allocator_
unique_ptr
< DataSampleHeaderAllocator
header_allocator_
unique_ptr
< DataSampleElementAllocator
sample_list_element_allocator_
bool is_bit_
 The time interval for sending liveliness message.
RepoIdToSequenceMap idToSequence_
RepoIdSet pending_readers_
RepoIdSet assoc_complete_readers_
ACE_Condition
< ACE_Recursive_Thread_Mutex
empty_condition_
int pending_write_count_

Friends

class ::DDS_TEST

Detailed Description

Implementation of Replayer functionality.

This class is the implementation of the Replayer. Inheritance is used to limit the applications access to underlying system methods.

Definition at line 61 of file ReplayerImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ReplayerImpl::ReplayerImpl (  ) 

Definition at line 51 of file ReplayerImpl.cpp.

References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, offered_incompatible_qos_status_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, and DDS::OfferedIncompatibleQosStatus::total_count_change.

00052   : data_dropped_count_(0),
00053   data_delivered_count_(0),
00054   n_chunks_(TheServiceParticipant->n_chunks()),
00055   association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00056   qos_(TheServiceParticipant->initial_DataWriterQos()),
00057   participant_servant_(0),
00058   topic_id_(GUID_UNKNOWN),
00059   topic_servant_(0),
00060   listener_mask_(DEFAULT_STATUS_MASK),
00061   domain_id_(0),
00062   publisher_servant_(0),
00063   publication_id_(GUID_UNKNOWN),
00064   sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00065   // data_container_(0),
00066   // liveliness_lost_(false),
00067   // last_deadline_missed_total_count_(0),
00068   is_bit_(false),
00069   empty_condition_(lock_),
00070   pending_write_count_(0)
00071 {
00072   // liveliness_lost_status_.total_count = 0;
00073   // liveliness_lost_status_.total_count_change = 0;
00074   //
00075   // offered_deadline_missed_status_.total_count = 0;
00076   // offered_deadline_missed_status_.total_count_change = 0;
00077   // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00078 
00079   offered_incompatible_qos_status_.total_count = 0;
00080   offered_incompatible_qos_status_.total_count_change = 0;
00081   offered_incompatible_qos_status_.last_policy_id = 0;
00082   offered_incompatible_qos_status_.policies.length(0);
00083 
00084   publication_match_status_.total_count = 0;
00085   publication_match_status_.total_count_change = 0;
00086   publication_match_status_.current_count = 0;
00087   publication_match_status_.current_count_change = 0;
00088   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00089 
00090 }

OpenDDS::DCPS::ReplayerImpl::~ReplayerImpl (  ) 

Definition at line 94 of file ReplayerImpl.cpp.

References DBG_ENTRY_LVL.

00095 {
00096   DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
00097 }


Member Function Documentation

void OpenDDS::DCPS::ReplayerImpl::add_association ( const RepoId yourId,
const ReaderAssociation reader,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 385 of file ReplayerImpl.cpp.

References ACE_TEXT(), assoc_complete_readers_, OpenDDS::DCPS::TransportClient::associate(), association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::DataReaderQos::durability, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::insert(), is_bit_, LM_DEBUG, LM_ERROR, lock_, OPENDDS_STRING, participant_servant_, pending_readers_, publication_id_, qos_, reader_info_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, and DDS::VOLATILE_DURABILITY_QOS.

00388 {
00389   DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
00390 
00391   if (DCPS_debug_level >= 1) {
00392     GuidConverter writer_converter(yourId);
00393     GuidConverter reader_converter(reader.readerId);
00394     ACE_DEBUG((LM_DEBUG,
00395                ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
00396                ACE_TEXT("bit %d local %C remote %C\n"),
00397                is_bit_,
00398                OPENDDS_STRING(writer_converter).c_str(),
00399                OPENDDS_STRING(reader_converter).c_str()));
00400   }
00401 
00402   // if (entity_deleted_ == true) {
00403   //   if (DCPS_debug_level >= 1)
00404   //     ACE_DEBUG((LM_DEBUG,
00405   //                ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
00406   //                ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00407   //
00408   //   return;
00409   // }
00410 
00411   if (GUID_UNKNOWN == publication_id_) {
00412     publication_id_ = yourId;
00413   }
00414 
00415   {
00416     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00417     reader_info_.insert(std::make_pair(reader.readerId,
00418                                        ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00419                                                   reader.exprParams, participant_servant_,
00420                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00421   }
00422 
00423   if (DCPS_debug_level > 4) {
00424     GuidConverter converter(publication_id_);
00425     ACE_DEBUG((LM_DEBUG,
00426                ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
00427                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00428                OPENDDS_STRING(converter).c_str(),
00429                qos_.transport_priority.value));
00430   }
00431 
00432   AssociationData data;
00433   data.remote_id_ = reader.readerId;
00434   data.remote_data_ = reader.readerTransInfo;
00435   data.remote_reliable_ =
00436     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00437   data.remote_durable_ =
00438     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00439 
00440   if (!this->associate(data, active)) {
00441     //FUTURE: inform inforepo and try again as passive peer
00442     if (DCPS_debug_level) {
00443       ACE_ERROR((LM_ERROR,
00444                  ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00445                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00446     }
00447     return;
00448   }
00449 
00450   if (active) {
00451     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00452 
00453     // Have we already received an association_complete() callback?
00454     if (assoc_complete_readers_.count(reader.readerId)) {
00455       assoc_complete_readers_.erase(reader.readerId);
00456       association_complete_i(reader.readerId);
00457 
00458       // Add to pending_readers_ -> pending means we are waiting
00459       // for the association_complete() callback.
00460     } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) {
00461       GuidConverter converter(reader.readerId);
00462       ACE_ERROR((LM_ERROR,
00463                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ")
00464                  ACE_TEXT("failed to mark %C as pending.\n"),
00465                  OPENDDS_STRING(converter).c_str()));
00466 
00467     } else {
00468       if (DCPS_debug_level > 0) {
00469         GuidConverter converter(reader.readerId);
00470         ACE_DEBUG((LM_DEBUG,
00471                    ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00472                    ACE_TEXT("marked %C as pending.\n"),
00473                    OPENDDS_STRING(converter).c_str()));
00474       }
00475     }
00476   } else {
00477     // In the current implementation, DataWriter is always active, so this
00478     // code will not be applicable.
00479     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00480     disco->association_complete(this->domain_id_,
00481                                 this->participant_servant_->get_id(),
00482                                 this->publication_id_, reader.readerId);
00483   }
00484 }

Here is the call graph for this function:

void OpenDDS::DCPS::ReplayerImpl::association_complete ( const RepoId remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 506 of file ReplayerImpl.cpp.

References ACE_TEXT(), assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, LM_DEBUG, lock_, OPENDDS_STRING, pending_readers_, publication_id_, and OpenDDS::DCPS::remove().

00507 {
00508   DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6);
00509 
00510   if (DCPS_debug_level >= 1) {
00511     GuidConverter writer_converter(this->publication_id_);
00512     GuidConverter reader_converter(remote_id);
00513     ACE_DEBUG((LM_DEBUG,
00514                ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ")
00515                ACE_TEXT("bit %d local %C remote %C\n"),
00516                is_bit_,
00517                OPENDDS_STRING(writer_converter).c_str(),
00518                OPENDDS_STRING(reader_converter).c_str()));
00519   }
00520 
00521   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00522   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00523     // Not found in pending_readers_, defer calling association_complete_i()
00524     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00525     assoc_complete_readers_.insert(remote_id);
00526   } else {
00527     association_complete_i(remote_id);
00528   }
00529 }

Here is the call graph for this function:

void OpenDDS::DCPS::ReplayerImpl::association_complete_i ( const RepoId remote_id  )  [private]

Definition at line 532 of file ReplayerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::bind(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OPENDDS_STRING, participant_servant_, publication_match_status_, readers_, DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.

Referenced by add_association(), and association_complete().

00533 {
00534   DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
00535   // bool reader_durable = false;
00536   {
00537     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00538     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00539       GuidConverter converter(remote_id);
00540       ACE_ERROR((LM_ERROR,
00541                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00542                  ACE_TEXT("insert %C from pending failed.\n"),
00543                  OPENDDS_STRING(converter).c_str()));
00544     }
00545     // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00546     // if (it != reader_info_.end()) {
00547     //   reader_durable = it->second.durable_;
00548     // }
00549   }
00550 
00551   if (!is_bit_) {
00552 
00553     DDS::InstanceHandle_t handle =
00554       this->participant_servant_->id_to_handle(remote_id);
00555 
00556     {
00557       // protect publication_match_status_ and status changed flags.
00558       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00559 
00560       // update the publication_match_status_
00561       ++publication_match_status_.total_count;
00562       ++publication_match_status_.total_count_change;
00563       ++publication_match_status_.current_count;
00564       ++publication_match_status_.current_count_change;
00565 
00566       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00567         GuidConverter converter(remote_id);
00568         ACE_DEBUG((LM_WARNING,
00569                    ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00570                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00571                    OPENDDS_STRING(converter).c_str(),
00572                    handle));
00573         return;
00574 
00575       } else if (DCPS_debug_level > 4) {
00576         GuidConverter converter(remote_id);
00577         ACE_DEBUG((LM_DEBUG,
00578                    ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
00579                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00580                    OPENDDS_STRING(converter).c_str(),
00581                    handle));
00582       }
00583 
00584       publication_match_status_.last_subscription_handle = handle;
00585 
00586     }
00587 
00588 
00589     if (listener_.in()) {
00590       listener_->on_replayer_matched(this,
00591                                      publication_match_status_);
00592 
00593       // TBD - why does the spec say to change this but not
00594       // change the ChangeFlagStatus after a listener call?
00595       publication_match_status_.total_count_change = 0;
00596       publication_match_status_.current_count_change = 0;
00597     }
00598 
00599   }
00600 
00601 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::ReplayerImpl::check_transport_qos ( const TransportInst inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 835 of file ReplayerImpl.cpp.

00836 {
00837   // DataWriter does not impose any constraints on which transports
00838   // may be used based on QoS.
00839   return true;
00840 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::cleanup ( void   ) 

cleanup the DataWriter.

Definition at line 101 of file ReplayerImpl.cpp.

References CORBA::LocalObject::_nil(), ACE_TEXT(), domain_id_, empty_condition_, LM_ERROR, lock_, pending_write_count_, publication_id_, remove_all_associations(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, topic_objref_, topic_servant_, and ACE_Condition< ACE_Recursive_Thread_Mutex >::wait().

Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer(), and OpenDDS::DCPS::DomainParticipantImpl::handle_exception().

00102 {
00103 
00104   //     // Unregister all registered instances prior to deletion.
00105   //     // DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00106   //     // this->unregister_instances(source_timestamp);
00107   //
00108   //     // CORBA::String_var topic_name = this->get_Atopic_name();
00109   {
00110     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
00111 
00112     // Wait for pending samples to drain prior to removing associations
00113     // and unregistering the publication.
00114     while (this->pending_write_count_)
00115       this->empty_condition_.wait();
00116 
00117     // Call remove association before unregistering the datawriter
00118     // with the transport, otherwise some callbacks resulted from
00119     // remove_association may lost.
00120     this->remove_all_associations();
00121 
00122     // release our Topic_var
00123     topic_objref_ = DDS::Topic::_nil();
00124     topic_servant_ = 0;
00125 
00126   }
00127 
00128   // not just unregister but remove any pending writes/sends.
00129   // this->unregister_all();
00130 
00131   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00132   if (!disco->remove_publication(
00133         this->domain_id_,
00134         this->participant_servant_->get_id(),
00135         this->publication_id_)) {
00136     ACE_ERROR_RETURN((LM_ERROR,
00137                       ACE_TEXT("(%P|%t) ERROR: ")
00138                       ACE_TEXT("PublisherImpl::delete_datawriter, ")
00139                       ACE_TEXT("publication not removed from discovery.\n")),
00140                      DDS::RETCODE_ERROR);
00141   }
00142   return DDS::RETCODE_OK;
00143 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::control_delivered ( const Message_Block_Ptr sample  )  [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 883 of file ReplayerImpl.cpp.

00884 {
00885   ACE_UNUSED_ARG(sample);
00886 }

void OpenDDS::DCPS::ReplayerImpl::control_dropped ( const Message_Block_Ptr sample,
bool  dropped_by_transport 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 907 of file ReplayerImpl.cpp.

00909 {
00910   ACE_UNUSED_ARG(sample);
00911 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::create_sample_data_message ( Message_Block_Ptr  data,
DataSampleHeader header_data,
Message_Block_Ptr message,
const DDS::Time_t source_timestamp,
bool  content_filter 
) [private]

Definition at line 1027 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, db_allocator_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), qos_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, and ACE_Time_Value::zero.

Referenced by write().

01032 {
01033   header_data.message_id_ = SAMPLE_DATA;
01034   header_data.coherent_change_ = content_filter;
01035 
01036   header_data.content_filter_ = 0;
01037   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
01038   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01039   header_data.sequence_repair_ = need_sequence_repair();
01040   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01041     this->sequence_number_ = SequenceNumber();
01042   } else {
01043     ++this->sequence_number_;
01044   }
01045   header_data.sequence_ = this->sequence_number_;
01046   header_data.source_timestamp_sec_ = source_timestamp.sec;
01047   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01048 
01049   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
01050       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01051     header_data.lifespan_duration_ = true;
01052     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
01053     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
01054   }
01055 
01056   // header_data.publication_id_ = publication_id_;
01057   // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01058   size_t max_marshaled_size = header_data.max_marshaled_size();
01059   ACE_Message_Block* tmp;
01060   ACE_NEW_MALLOC_RETURN(tmp,
01061                         static_cast<ACE_Message_Block*>(
01062                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01063                         ACE_Message_Block(max_marshaled_size,
01064                                           ACE_Message_Block::MB_DATA,
01065                                           data.release(),   //cont
01066                                           0,   //data
01067                                           header_allocator_.get(),   //alloc_strategy
01068                                           0,   //locking_strategy
01069                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01070                                           ACE_Time_Value::zero,
01071                                           ACE_Time_Value::max_time,
01072                                           db_allocator_.get(),
01073                                           mb_allocator_.get()),
01074                         DDS::RETCODE_ERROR);
01075   message.reset(tmp);
01076   *message << header_data;
01077   return DDS::RETCODE_OK;
01078 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::data_delivered ( const DataSampleElement sample  )  [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 855 of file ReplayerImpl.cpp.

References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), data_delivered_count_, DBG_ENTRY_LVL, empty_condition_, OpenDDS::DCPS::DataSampleElement::get_pub_id(), LM_ERROR, lock_, OPENDDS_STRING, pending_write_count_, publication_id_, and sample_list_element_allocator_.

00856 {
00857   DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
00858   if (!(sample->get_pub_id() == this->publication_id_)) {
00859     GuidConverter sample_converter(sample->get_pub_id());
00860     GuidConverter writer_converter(publication_id_);
00861     ACE_ERROR((LM_ERROR,
00862                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
00863                ACE_TEXT(" The publication id %C from delivered element ")
00864                ACE_TEXT("does not match the datawriter's id %C\n"),
00865                OPENDDS_STRING(sample_converter).c_str(),
00866                OPENDDS_STRING(writer_converter).c_str()));
00867     return;
00868   }
00869   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00870   // this->data_container_->data_delivered(sample);
00871   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00872   ++data_delivered_count_;
00873 
00874   {
00875     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00876     if ((--pending_write_count_) == 0) {
00877       empty_condition_.broadcast();
00878     }
00879   }
00880 }

Here is the call graph for this function:

void OpenDDS::DCPS::ReplayerImpl::data_dropped ( const DataSampleElement sample,
bool  dropped_by_transport 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 889 of file ReplayerImpl.cpp.

References ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), data_dropped_count_, DBG_ENTRY_LVL, empty_condition_, lock_, pending_write_count_, and sample_list_element_allocator_.

00891 {
00892   DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
00893   // this->data_container_->data_dropped(element, dropped_by_transport);
00894   ACE_UNUSED_ARG(dropped_by_transport);
00895   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00896   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00897   ++data_dropped_count_;
00898   {
00899     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00900     if ((--pending_write_count_) == 0) {
00901       empty_condition_.broadcast();
00902     }
00903   }
00904 }

Here is the call graph for this function:

DDS::DomainId_t OpenDDS::DCPS::ReplayerImpl::domain_id (  )  const [inline, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 108 of file ReplayerImpl.h.

00108 { return this->domain_id_; }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::enable (  ) 

Implements DDS::Entity.

Definition at line 295 of file ReplayerImpl.cpp.

References ACE_TEXT(), association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::connection_info(), db_allocator_, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::DataWriterQos::durability, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::LENGTH_UNLIMITED, LM_DEBUG, LM_ERROR, mb_allocator_, n_chunks_, participant_servant_, publication_id_, publisher_qos_, qos_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_servant_, and DDS::VOLATILE_DURABILITY_QOS.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_replayer().

00296 {
00297   //According spec:
00298   // - Calling enable on an already enabled Entity returns OK and has no
00299   // effect.
00300   // - Calling enable on an Entity whose factory is not enabled will fail
00301   // and return PRECONDITION_NOT_MET.
00302 
00303   if (this->is_enabled()) {
00304     return DDS::RETCODE_OK;
00305   }
00306 
00307   // if (this->publisher_servant_->is_enabled() == false) {
00308   //   return DDS::RETCODE_PRECONDITION_NOT_MET;
00309   // }
00310   //
00311   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
00312 
00313   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
00314     n_chunks_ = qos_.resource_limits.max_samples;
00315   }
00316   // +1 because we might allocate one before releasing another
00317   // TBD - see if this +1 can be removed.
00318   mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
00319   db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
00320   header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
00321 
00322   sample_list_element_allocator_.reset(new DataSampleElementAllocator(2 * n_chunks_));
00323 
00324 
00325   if (DCPS_debug_level >= 2) {
00326     ACE_DEBUG((LM_DEBUG,
00327                "(%P|%t) ReplayerImpl::enable-mb"
00328                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00329                mb_allocator_.get(),
00330                n_chunks_));
00331 
00332     ACE_DEBUG((LM_DEBUG,
00333                "(%P|%t) ReplayerImpl::enable-db"
00334                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00335                db_allocator_.get(),
00336                n_chunks_));
00337 
00338     ACE_DEBUG((LM_DEBUG,
00339                "(%P|%t) ReplayerImpl::enable-header"
00340                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00341                header_allocator_.get(),
00342                n_chunks_));
00343   }
00344 
00345   this->set_enabled();
00346 
00347   try {
00348     this->enable_transport(reliable,
00349                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00350 
00351   } catch (const Transport::Exception&) {
00352     ACE_ERROR((LM_ERROR,
00353                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00354                ACE_TEXT("Transport Exception.\n")));
00355     return DDS::RETCODE_ERROR;
00356 
00357   }
00358 
00359   const TransportLocatorSeq& trans_conf_info = connection_info();
00360 
00361 
00362   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00363   this->publication_id_ =
00364     disco->add_publication(this->domain_id_,
00365                            this->participant_servant_->get_id(),
00366                            this->topic_servant_->get_id(),
00367                            this,
00368                            this->qos_,
00369                            trans_conf_info,
00370                            this->publisher_qos_);
00371 
00372   if (this->publication_id_ == GUID_UNKNOWN) {
00373     ACE_ERROR((LM_ERROR,
00374                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00375                ACE_TEXT("add_publication returned invalid id. \n")));
00376     return DDS::RETCODE_ERROR;
00377   }
00378 
00379   return DDS::RETCODE_OK;
00380 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::InstanceHandle_t OpenDDS::DCPS::ReplayerImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 1121 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and publication_id_.

01122 {
01123   return this->participant_servant_->id_to_handle(publication_id_);
01124 }

Here is the call graph for this function:

ReplayerListener_rch OpenDDS::DCPS::ReplayerImpl::get_listener (  )  [virtual]

Get the listener for this Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 289 of file ReplayerImpl.cpp.

References listener_.

00290 {
00291   return listener_;
00292 }

CORBA::Long OpenDDS::DCPS::ReplayerImpl::get_priority_value ( const AssociationData data  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 849 of file ReplayerImpl.cpp.

References qos_, and DDS::DataWriterQos::transport_priority.

00850 {
00851   return this->qos_.transport_priority.value;
00852 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::get_qos ( DDS::PublisherQos publisher_qos,
DDS::DataWriterQos datawriter_qos 
) [virtual]

Get the Quality of Service settings for the Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 272 of file ReplayerImpl.cpp.

References publisher_qos_, qos_, and DDS::RETCODE_OK.

00274 {
00275   qos = qos_;
00276   publisher_qos = publisher_qos_;
00277   return DDS::RETCODE_OK;
00278 }

const RepoId & OpenDDS::DCPS::ReplayerImpl::get_repo_id (  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 843 of file ReplayerImpl.cpp.

References publication_id_.

00844 {
00845   return this->publication_id_;
00846 }

void OpenDDS::DCPS::ReplayerImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 829 of file ReplayerImpl.cpp.

References topic_servant_.

00830 {
00831   topic_servant_->inconsistent_topic();
00832 }

void OpenDDS::DCPS::ReplayerImpl::init ( DDS::Topic_ptr  topic,
TopicImpl topic_servant,
const DDS::DataWriterQos qos,
ReplayerListener_rch  a_listener,
const DDS::StatusMask mask,
OpenDDS::DCPS::DomainParticipantImpl participant_servant,
const DDS::PublisherQos publisher_qos 
) [virtual]

Initialize the data members.

Definition at line 146 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::Replayer::_duplicate(), DBG_ENTRY_LVL, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), is_bit_, listener_, listener_mask_, participant_servant_, publisher_qos_, qos_, topic_id_, topic_name_, topic_objref_, topic_servant_, OpenDDS::DCPS::topicIsBIT(), and type_name_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_replayer().

00154 {
00155   DBG_ENTRY_LVL("ReplayerImpl","init",6);
00156   topic_objref_ = DDS::Topic::_duplicate(topic);
00157   topic_servant_ = topic_servant;
00158   topic_name_    = topic_servant_->get_name();
00159   topic_id_      = topic_servant_->get_id();
00160   type_name_     = topic_servant_->get_type_name();
00161 
00162 #if !defined (DDS_HAS_MINIMUM_BIT)
00163   is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
00164 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00165 
00166   qos_ = qos;
00167 
00168   //Note: OK to _duplicate(nil).
00169   listener_ = a_listener;
00170   listener_mask_ = mask;
00171 
00172   // Only store the participant pointer, since it is our "grand"
00173   // parent, we will exist as long as it does.
00174   participant_servant_ = participant_servant;
00175   domain_id_ = participant_servant_->get_domain_id();
00176 
00177   publisher_qos_ = publisher_qos;
00178 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::lookup_instance_handles ( const ReaderIdSeq ids,
DDS::InstanceHandleSeq hdls 
) [private]

Lookup the instance handles by the subscription repo ids.

Definition at line 1081 of file ReplayerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), LM_DEBUG, OPENDDS_STRING, and participant_servant_.

Referenced by remove_associations().

01083 {
01084   CORBA::ULong const num_rds = ids.length();
01085 
01086   if (DCPS_debug_level > 9) {
01087     OPENDDS_STRING separator;
01088     OPENDDS_STRING buffer;
01089 
01090     for (CORBA::ULong i = 0; i < num_rds; ++i) {
01091       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
01092       separator = ", ";
01093     }
01094 
01095     ACE_DEBUG((LM_DEBUG,
01096                ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ")
01097                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
01098                buffer.c_str()));
01099   }
01100 
01101   hdls.length(num_rds);
01102 
01103   for (CORBA::ULong i = 0; i < num_rds; ++i) {
01104     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
01105   }
01106 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::ReplayerImpl::need_sequence_repair (  )  const [private]

Definition at line 1109 of file ReplayerImpl.cpp.

References reader_info_, and sequence_number_.

Referenced by create_sample_data_message().

01110 {
01111   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
01112        end = reader_info_.end(); it != end; ++it) {
01113     if (it->second.expected_sequence_ != sequence_number_) {
01114       return true;
01115     }
01116   }
01117   return false;
01118 }

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::notify_publication_disconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 914 of file ReplayerImpl.cpp.

00915 {
00916   ACE_UNUSED_ARG(subids);
00917 }

void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost ( const DDS::InstanceHandleSeq handles  )  [private]

Definition at line 932 of file ReplayerImpl.cpp.

00933 {
00934   ACE_UNUSED_ARG(handles);
00935 }

void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 926 of file ReplayerImpl.cpp.

Referenced by remove_associations().

00927 {
00928   ACE_UNUSED_ARG(subids);
00929 }

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::notify_publication_reconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 920 of file ReplayerImpl.cpp.

00921 {
00922   ACE_UNUSED_ARG(subids);
00923 }

typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( RepoId  ,
SequenceNumber  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( RepoId  ,
ReaderInfo  ,
GUID_tKeyLessThan   
) [private]
DomainParticipantImpl* OpenDDS::DCPS::ReplayerImpl::participant (  )  [inline]

Definition at line 162 of file ReplayerImpl.h.

Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer().

00162                                                 {
00163     return participant_servant_;
00164   }

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::register_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 787 of file ReplayerImpl.cpp.

00792 {
00793   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00794 }

void OpenDDS::DCPS::ReplayerImpl::remove_all_associations (  ) 

Definition at line 742 of file ReplayerImpl.cpp.

References ACE_TEXT(), LM_WARNING, lock_, pending_readers_, readers_, remove_associations(), size, and OpenDDS::DCPS::TransportClient::stop_associating().

Referenced by cleanup().

00743 {
00744   this->stop_associating();
00745 
00746   OpenDDS::DCPS::ReaderIdSeq readers;
00747   CORBA::ULong size;
00748   CORBA::ULong num_pending_readers;
00749   {
00750     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00751 
00752     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00753     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00754     readers.length(size);
00755 
00756     RepoIdSet::iterator itEnd = readers_.end();
00757     int i = 0;
00758 
00759     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00760       readers[i++] = *it;
00761     }
00762 
00763     itEnd = pending_readers_.end();
00764     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00765       readers[i++] = *it;
00766     }
00767 
00768     if (num_pending_readers > 0) {
00769       ACE_DEBUG((LM_WARNING,
00770                  ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ")
00771                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00772                  num_pending_readers));
00773     }
00774   }
00775 
00776   try {
00777     if (0 < size) {
00778       CORBA::Boolean dont_notify_lost = false;
00779       this->remove_associations(readers, dont_notify_lost);
00780     }
00781 
00782   } catch (const CORBA::Exception&) {
00783   }
00784 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::remove_associations ( const ReaderIdSeq readers,
CORBA::Boolean  callback 
) [virtual]

Section 7.1.4.1: total_count will not decrement.

: Reconcile this with the verbiage in section 7.1.4.1 TODO: Should rds_len really be fully_associated_len here??

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 604 of file ReplayerImpl.cpp.

References ACE_TEXT(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), id_to_handle_map_, idToSequence_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, len, listener_, LM_DEBUG, LM_WARNING, lock_, lookup_instance_handles(), notify_publication_lost(), OPENDDS_STRING, pending_readers_, publication_id_, publication_match_status_, reader_info_, readers_, OpenDDS::DCPS::remove(), OpenDDS::DCPS::TransportClient::stop_associating(), and DDS::PublicationMatchedStatus::total_count_change.

Referenced by remove_all_associations().

00606 {
00607   if (DCPS_debug_level >= 1) {
00608     GuidConverter writer_converter(publication_id_);
00609     GuidConverter reader_converter(readers[0]);
00610     ACE_DEBUG((LM_DEBUG,
00611                ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
00612                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00613                is_bit_,
00614                OPENDDS_STRING(writer_converter).c_str(),
00615                OPENDDS_STRING(reader_converter).c_str(),
00616                readers.length()));
00617   }
00618 
00619   this->stop_associating(readers.get_buffer(), readers.length());
00620 
00621   ReaderIdSeq fully_associated_readers;
00622   CORBA::ULong fully_associated_len = 0;
00623   ReaderIdSeq rds;
00624   CORBA::ULong rds_len = 0;
00625   DDS::InstanceHandleSeq handles;
00626 
00627   {
00628     // Ensure the same acquisition order as in wait_for_acknowledgments().
00629     // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
00630     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00631 
00632     //Remove the readers from fully associated reader list.
00633     //If the supplied reader is not in the cached reader list then it is
00634     //already removed. We just need remove the readers in the list that have
00635     //not been removed.
00636 
00637     CORBA::ULong len = readers.length();
00638 
00639     for (CORBA::ULong i = 0; i < len; ++i) {
00640       //Remove the readers from fully associated reader list. If it's not
00641       //in there, the association_complete() is not called yet and remove it
00642       //from pending list.
00643 
00644       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00645         ++fully_associated_len;
00646         fully_associated_readers.length(fully_associated_len);
00647         fully_associated_readers [fully_associated_len - 1] = readers[i];
00648 
00649         // Remove this reader from the ACK sequence map if its there.
00650         // This is where we need to be holding the wfaLock_ obtained
00651         // above.
00652         RepoIdToSequenceMap::iterator where
00653           = this->idToSequence_.find(readers[i]);
00654 
00655         if (where != this->idToSequence_.end()) {
00656           this->idToSequence_.erase(where);
00657 
00658           // It is possible that this subscription was causing the wait
00659           // to continue, so give the opportunity to find out.
00660           // this->wfaCondition_.broadcast();
00661         }
00662 
00663         ++rds_len;
00664         rds.length(rds_len);
00665         rds [rds_len - 1] = readers[i];
00666 
00667       } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00668         ++rds_len;
00669         rds.length(rds_len);
00670         rds [rds_len - 1] = readers[i];
00671 
00672         GuidConverter converter(readers[i]);
00673         ACE_DEBUG((LM_WARNING,
00674                    ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ")
00675                    ACE_TEXT("removing reader %C before association_complete() call.\n"),
00676                    OPENDDS_STRING(converter).c_str()));
00677       }
00678       reader_info_.erase(readers[i]);
00679       //else reader is already removed which indicates remove_association()
00680       //is called multiple times.
00681     }
00682 
00683     if (fully_associated_len > 0 && !is_bit_) {
00684       // The reader should be in the id_to_handle map at this time
00685       this->lookup_instance_handles(fully_associated_readers, handles);
00686 
00687       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00688         id_to_handle_map_.erase(fully_associated_readers[i]);
00689       }
00690     }
00691 
00692     // wfaGuard.release();
00693 
00694     // Mirror the PUBLICATION_MATCHED_STATUS processing from
00695     // association_complete() here.
00696     if (!this->is_bit_) {
00697 
00698       // Derive the change in the number of subscriptions reading this writer.
00699       int matchedSubscriptions =
00700         static_cast<int>(this->id_to_handle_map_.size());
00701       this->publication_match_status_.current_count_change =
00702         matchedSubscriptions - this->publication_match_status_.current_count;
00703 
00704       // Only process status if the number of subscriptions has changed.
00705       if (this->publication_match_status_.current_count_change != 0) {
00706         this->publication_match_status_.current_count = matchedSubscriptions;
00707 
00708         /// Section 7.1.4.1: total_count will not decrement.
00709 
00710         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00711         /// TODO: Should rds_len really be fully_associated_len here??
00712         this->publication_match_status_.last_subscription_handle =
00713           handles[rds_len - 1];
00714 
00715 
00716         if (listener_.in()) {
00717           listener_->on_replayer_matched(
00718             this,
00719             this->publication_match_status_);
00720 
00721           // Listener consumes the change.
00722           this->publication_match_status_.total_count_change = 0;
00723           this->publication_match_status_.current_count_change = 0;
00724         }
00725 
00726       }
00727     }
00728   }
00729 
00730   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00731     this->disassociate(rds[i]);
00732   }
00733 
00734   // If this remove_association is invoked when the InfoRepo
00735   // detects a lost reader then make a callback to notify
00736   // subscription lost.
00737   if (notify_lost && handles.length() > 0) {
00738     this->notify_publication_lost(handles);
00739   }
00740 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::ReplayerImpl::retrieve_inline_qos_data ( TransportSendListener::InlineQosData qos_data  )  const [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 939 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_qos_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.

00940 {
00941   qos_data.pub_qos = this->publisher_qos_;
00942   qos_data.dw_qos = this->qos_;
00943   qos_data.topic_name = this->topic_name_.in();
00944 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_listener ( const ReplayerListener_rch a_listener,
DDS::StatusMask  mask 
) [virtual]

Change the listener for this Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 281 of file ReplayerImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

00283 {
00284   listener_ = a_listener;
00285   listener_mask_ = mask;
00286   return DDS::RETCODE_OK;
00287 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_qos ( const DDS::PublisherQos publisher_qos,
const DDS::DataWriterQos datawriter_qos 
) [virtual]

Set the Quality of Service settings for the Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 181 of file ReplayerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, participant_servant_, publication_id_, publisher_qos_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

00183 {
00184 
00185   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
00186 
00187   if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
00188     if (publisher_qos_ == publisher_qos)
00189       return DDS::RETCODE_OK;
00190 
00191     // for the not changeable qos, it can be changed before enable
00192     if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) {
00193       return DDS::RETCODE_IMMUTABLE_POLICY;
00194 
00195     } else {
00196       publisher_qos_ = publisher_qos;
00197     }
00198   } else {
00199     return DDS::RETCODE_INCONSISTENT_POLICY;
00200   }
00201 
00202   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00203   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00204   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00205   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00206   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00207 
00208   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00209     if (qos_ == qos)
00210       return DDS::RETCODE_OK;
00211 
00212     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00213       return DDS::RETCODE_IMMUTABLE_POLICY;
00214 
00215     } else {
00216       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00217       // DDS::PublisherQos publisherQos;
00218       // this->publisher_servant_->get_qos(publisherQos);
00219       DDS::PublisherQos publisherQos = this->publisher_qos_;
00220       const bool status
00221         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00222                                         this->participant_servant_->get_id(),
00223                                         this->publication_id_,
00224                                         qos,
00225                                         publisherQos);
00226 
00227       if (!status) {
00228         ACE_ERROR_RETURN((LM_ERROR,
00229                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00230                           ACE_TEXT("qos not updated. \n")),
00231                          DDS::RETCODE_ERROR);
00232       }
00233     }
00234 
00235     if (!(qos_ == qos)) {
00236       // Reset the deadline timer if the period has changed.
00237       // if (qos_.deadline.period.sec != qos.deadline.period.sec
00238       //     || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00239       //   if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00240       //       && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00241       //     ACE_auto_ptr_reset(this->watchdog_,
00242       //                        new OfferedDeadlineWatchdog(
00243       //                          this->reactor_,
00244       //                          this->lock_,
00245       //                          qos.deadline,
00246       //                          this,
00247       //                          this,
00248       //                          this->offered_deadline_missed_status_,
00249       //                          this->last_deadline_missed_total_count_));
00250       //
00251       //   } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00252       //              && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00253       //     this->watchdog_->cancel_all();
00254       //     this->watchdog_.reset();
00255       //
00256       //   } else {
00257       //     this->watchdog_->reset_interval(
00258       //       duration_to_time_value(qos.deadline.period));
00259       //   }
00260       // }
00261 
00262       qos_ = qos;
00263     }
00264 
00265     return DDS::RETCODE_OK;
00266 
00267   } else {
00268     return DDS::RETCODE_INCONSISTENT_POLICY;
00269   }
00270 }

Here is the call graph for this function:

void OpenDDS::DCPS::ReplayerImpl::unregister_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 797 of file ReplayerImpl.cpp.

00800 {
00801   TransportClient::unregister_for_reader(participant, writerid, readerid);
00802 }

void OpenDDS::DCPS::ReplayerImpl::update_incompatible_qos ( const IncompatibleQosStatus status  )  [virtual]
void OpenDDS::DCPS::ReplayerImpl::update_subscription_params ( const RepoId readerId,
const DDS::StringSeq exprParams 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 821 of file ReplayerImpl.cpp.

00823 {
00824   ACE_UNUSED_ARG(readerId);
00825   ACE_UNUSED_ARG(params);
00826 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write ( const RawDataSample sample_array,
int  array_size,
DDS::InstanceHandle_t reader 
) [private]

Definition at line 947 of file ReplayerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, create_sample_data_message(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DomainParticipantImpl::get_repoid(), OpenDDS::DCPS::GUID_UNKNOWN, LM_ERROR, lock_, OpenDDS::DCPS::move(), participant_servant_, pending_write_count_, OpenDDS::DCPS::DataSampleHeader::publication_id_, publication_id_, reader_info_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::RawDataSample::sample_byte_order_, sample_list_element_allocator_, OpenDDS::DCPS::TransportClient::send(), sequence_number_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and OpenDDS::DCPS::RawDataSample::source_timestamp_.

00950 {
00951   DBG_ENTRY_LVL("ReplayerImpl","write",6);
00952 
00953   OpenDDS::DCPS::RepoId repo_id;
00954   if (reader_ih_ptr) {
00955     repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
00956     if (repo_id == GUID_UNKNOWN) {
00957       ACE_ERROR_RETURN((LM_ERROR,
00958                         ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
00959                         ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
00960                        DDS::RETCODE_ERROR);
00961     }
00962   }
00963 
00964   SendStateDataSampleList list;
00965 
00966   for (int i = 0; i < num_samples; ++i) {
00967     DataSampleElement* element = 0;
00968 
00969     ACE_NEW_MALLOC_RETURN(
00970       element,
00971       static_cast<DataSampleElement*>(
00972         sample_list_element_allocator_->malloc(
00973           sizeof(DataSampleElement))),
00974       DataSampleElement(publication_id_,
00975                             this,
00976                             PublicationInstance_rch()),
00977       DDS::RETCODE_ERROR);
00978 
00979     element->get_header().byte_order_ = samples[i].sample_byte_order_;
00980     element->get_header().publication_id_ = this->publication_id_;
00981     list.enqueue_tail(element);
00982     Message_Block_Ptr temp;
00983     Message_Block_Ptr sample(samples[i].sample_->duplicate());
00984     DDS::ReturnCode_t ret = create_sample_data_message(move(sample),
00985                                                        element->get_header(),
00986                                                        temp,
00987                                                        samples[i].source_timestamp_,
00988                                                        false);
00989     element->set_sample(move(temp));
00990     if (reader_ih_ptr) {
00991       element->set_num_subs(1);
00992       element->set_sub_id(0, repo_id);
00993     }
00994 
00995     if (ret != DDS::RETCODE_OK) {
00996       // we need to free the list
00997       while (list.dequeue(element)) {
00998         ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
00999       }
01000 
01001       return ret;
01002     }
01003   }
01004 
01005   {
01006     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
01007     ++pending_write_count_;
01008   }
01009 
01010   this->send(list);
01011 
01012   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01013        end = reader_info_.end(); iter != end; ++iter) {
01014     iter->second.expected_sequence_ = sequence_number_;
01015   }
01016 
01017   return DDS::RETCODE_OK;
01018 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write ( const RawDataSample sample  )  [virtual]

Send the sample to all associated DataReaders.

Note:
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1021 of file ReplayerImpl.cpp.

Referenced by write_to_reader().

01022 {
01023   return this->write(&sample, 1, 0);
01024 }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader ( DDS::InstanceHandle_t  subscription,
const RawDataSampleList &  samples 
) [virtual]

Send the samples to the specified DataReader.

Note:
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1134 of file ReplayerImpl.cpp.

References DDS::RETCODE_ERROR, and write().

01136 {
01137   if (samples.size())
01138     return write(&samples[0], static_cast<int>(samples.size()), &subscription);
01139   return DDS::RETCODE_ERROR;
01140 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader ( DDS::InstanceHandle_t  subscription,
const RawDataSample sample 
) [virtual]

Send the sample to the specified DataReader.

Note:
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1127 of file ReplayerImpl.cpp.

References write().

01129 {
01130   return write(&sample, 1, &subscription);
01131 }

Here is the call graph for this function:


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 214 of file ReplayerImpl.h.


Member Data Documentation

Definition at line 312 of file ReplayerImpl.h.

Referenced by add_association(), and association_complete().

The multiplier for allocators affected by associations.

Definition at line 189 of file ReplayerImpl.h.

Referenced by enable().

Definition at line 126 of file ReplayerImpl.h.

Referenced by data_delivered().

Statistics counter.

Definition at line 125 of file ReplayerImpl.h.

Referenced by data_dropped().

Definition at line 278 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

The domain id.

Definition at line 231 of file ReplayerImpl.h.

Referenced by add_association(), cleanup(), enable(), init(), and set_qos().

Definition at line 314 of file ReplayerImpl.h.

Referenced by cleanup(), data_delivered(), and data_dropped().

Definition at line 280 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

Definition at line 249 of file ReplayerImpl.h.

Referenced by association_complete_i(), and remove_associations().

RepoIdToSequenceMap OpenDDS::DCPS::ReplayerImpl::idToSequence_ [private]

Definition at line 310 of file ReplayerImpl.h.

Referenced by remove_associations().

The time interval for sending liveliness message.

The orb's reactor to be used to register the liveliness timer. Timestamp of last write/dispose/assert_liveliness. Total number of offered deadlines missed during last offered deadline status check. Watchdog responsible for reporting missed offered deadlines. The flag indicates whether the liveliness timer is scheduled and needs be cancelled. Flag indicates that this datawriter is a builtin topic datawriter.

Definition at line 305 of file ReplayerImpl.h.

Referenced by add_association(), association_complete(), association_complete_i(), init(), and remove_associations().

Used to notify the entity for relevant events.

Definition at line 229 of file ReplayerImpl.h.

Referenced by association_complete_i(), get_listener(), init(), remove_associations(), and set_listener().

The StatusKind bit mask indicates which status condition change can be notified by the listener of this entity.

Definition at line 227 of file ReplayerImpl.h.

Referenced by init(), and set_listener().

The sample data container.

The lock to protect the activate subscriptions and status changes.

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 245 of file ReplayerImpl.h.

Referenced by add_association(), association_complete(), association_complete_i(), cleanup(), data_delivered(), data_dropped(), remove_all_associations(), remove_associations(), update_incompatible_qos(), and write().

True if the writer failed to actively signal its liveliness within its offered liveliness period.

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Definition at line 276 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

The number of chunks for the cached allocator.

Definition at line 186 of file ReplayerImpl.h.

Referenced by enable().

Status conditions.

Definition at line 256 of file ReplayerImpl.h.

Referenced by ReplayerImpl(), and update_incompatible_qos().

The participant servant which creats the publisher that creates this datawriter.

Definition at line 199 of file ReplayerImpl.h.

Referenced by add_association(), association_complete_i(), enable(), get_instance_handle(), init(), lookup_instance_handles(), set_qos(), and write().

Definition at line 315 of file ReplayerImpl.h.

Referenced by cleanup(), data_delivered(), data_dropped(), and write().

The repository id of this datawriter/publication.

Definition at line 237 of file ReplayerImpl.h.

Referenced by add_association(), association_complete(), cleanup(), data_delivered(), enable(), get_instance_handle(), get_repo_id(), remove_associations(), set_qos(), and write().

Definition at line 257 of file ReplayerImpl.h.

Referenced by association_complete_i(), remove_associations(), and ReplayerImpl().

Definition at line 234 of file ReplayerImpl.h.

Referenced by enable(), get_qos(), init(), retrieve_inline_qos_data(), and set_qos().

The publisher servant which creates this datawriter.

Definition at line 233 of file ReplayerImpl.h.

The qos policy list of this datawriter.

Definition at line 195 of file ReplayerImpl.h.

Referenced by add_association(), create_sample_data_message(), enable(), get_priority_value(), get_qos(), init(), retrieve_inline_qos_data(), and set_qos().

RepoIdToReaderInfoMap OpenDDS::DCPS::ReplayerImpl::reader_info_ [private]

Definition at line 210 of file ReplayerImpl.h.

Referenced by add_association(), need_sequence_repair(), remove_associations(), and write().

The cached allocator to allocate DataSampleElement objects.

Definition at line 284 of file ReplayerImpl.h.

Referenced by data_delivered(), data_dropped(), enable(), and write().

The sequence number unique in DataWriter scope.

Definition at line 239 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), need_sequence_repair(), and write().

The associated topic repository id.

Definition at line 219 of file ReplayerImpl.h.

Referenced by init().

The name of associated topic.

Definition at line 217 of file ReplayerImpl.h.

Referenced by init(), and retrieve_inline_qos_data().

The object reference of the associated topic.

Definition at line 221 of file ReplayerImpl.h.

Referenced by cleanup(), and init().

The topic servant.

Definition at line 223 of file ReplayerImpl.h.

Referenced by cleanup(), enable(), inconsistent_topic(), and init().

The type name of associated topic.

Definition at line 192 of file ReplayerImpl.h.

Referenced by init().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1