OpenDDS::DCPS::ReplayerImpl Class Reference

Implementation of Replayer functionality. More...

#include <ReplayerImpl.h>

Inheritance diagram for OpenDDS::DCPS::ReplayerImpl:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ReplayerImpl:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ReplayerImpl ()
 ~ReplayerImpl ()
DDS::ReturnCode_t cleanup ()
virtual void init (DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos)
virtual DDS::ReturnCode_t write (const RawDataSample &sample)
virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSample &sample)
virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSampleList &samples)
virtual DDS::ReturnCode_t set_qos (const ::DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos)
virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &publisher_qos, DDS::DataWriterQos &datawriter_qos)
virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
virtual ReplayerListener_rch get_listener ()
virtual bool check_transport_qos (const TransportInst &inst)
virtual const RepoIdget_repo_id () const
DDS::DomainId_t domain_id () const
virtual CORBA::Long get_priority_value (const AssociationData &data) const
virtual void data_delivered (const DataSampleElement *sample)
virtual void data_dropped (const DataSampleElement *sample, bool dropped_by_transport)
virtual void control_delivered (ACE_Message_Block *sample)
virtual void control_dropped (ACE_Message_Block *sample, bool dropped_by_transport)
virtual void notify_publication_disconnected (const ReaderIdSeq &subids)
virtual void notify_publication_reconnected (const ReaderIdSeq &subids)
virtual void notify_publication_lost (const ReaderIdSeq &subids)
virtual void notify_connection_deleted (const RepoId &)
virtual void retrieve_inline_qos_data (InlineQosData &qos_data) const
virtual void add_association (const RepoId &yourId, const ReaderAssociation &reader, bool active)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const ReaderIdSeq &readers, CORBA::Boolean callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void update_subscription_params (const RepoId &readerId, const DDS::StringSeq &exprParams)
virtual void inconsistent_topic ()
void remove_all_associations ()
virtual void register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
virtual void unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid)
DDS::ReturnCode_t enable ()
DomainParticipantImplparticipant ()
virtual DDS::InstanceHandle_t get_instance_handle ()

Public Attributes

int data_dropped_count_
 Statistics counter.
int data_delivered_count_

Private Member Functions

void _add_ref ()
void _remove_ref ()
void notify_publication_lost (const DDS::InstanceHandleSeq &handles)
DDS::ReturnCode_t write (const RawDataSample *sample_array, int array_size, DDS::InstanceHandle_t *reader)
DDS::ReturnCode_t create_sample_data_message (DataSample *data, DataSampleHeader &header_data, ACE_Message_Block *&message, const DDS::Time_t &source_timestamp, bool content_filter)
bool need_sequence_repair () const
bool lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the subscription repo ids.
typedef OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap
void association_complete_i (const RepoId &remote_id)
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
typedef OPENDDS_MAP_CMP (RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap
 Flag indicates that the init() is called.

Private Attributes

size_t n_chunks_
 The number of chunks for the cached allocator.
size_t association_chunk_multiplier_
 The multiplier for allocators affected by associations.
CORBA::String_var type_name_
 The type name of associated topic.
DDS::DataWriterQos qos_
 The qos policy list of this datawriter.
DomainParticipantImplparticipant_servant_
RepoIdToReaderInfoMap reader_info_
CORBA::String_var topic_name_
 The name of associated topic.
RepoId topic_id_
 The associated topic repository id.
DDS::Topic_var topic_objref_
 The object reference of the associated topic.
TopicImpltopic_servant_
 The topic servant.
DDS::StatusMask listener_mask_
ReplayerListener_rch listener_
 Used to notify the entity for relevant events.
DDS::DomainId_t domain_id_
 The domain id.
PublisherImplpublisher_servant_
 The publisher servant which creates this datawriter.
DDS::PublisherQos publisher_qos_
PublicationId publication_id_
 The repository id of this datawriter/publication.
SequenceNumber sequence_number_
 The sequence number unique in DataWriter scope.
ACE_Recursive_Thread_Mutex lock_
 The sample data container.
RepoIdToHandleMap id_to_handle_map_
RepoIdSet readers_
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
 Status conditions.
DDS::PublicationMatchedStatus publication_match_status_
std::auto_ptr< MessageBlockAllocatormb_allocator_
std::auto_ptr< DataBlockAllocatordb_allocator_
std::auto_ptr< DataSampleHeaderAllocatorheader_allocator_
std::auto_ptr< DataSampleElementAllocatorsample_list_element_allocator_
std::auto_ptr< TransportSendElementAllocatortransport_send_element_allocator_
std::auto_ptr< TransportCustomizedElementAllocatortransport_customized_element_allocator_
bool is_bit_
 Timestamp of last write/dispose/assert_liveliness.
RepoIdToSequenceMap idToSequence_
RepoIdSet pending_readers_
RepoIdSet assoc_complete_readers_
ACE_Condition< ACE_Recursive_Thread_Mutex > empty_condition_
int pending_write_count_

Friends

class ::DDS_TEST

Classes

struct  ReaderInfo

Detailed Description

Implementation of Replayer functionality.

This class is the implmentation of the Replayer. Inheritance is used to limit the applications access to underlying system methods.

Definition at line 58 of file ReplayerImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ReplayerImpl::ReplayerImpl (  ) 

Definition at line 52 of file ReplayerImpl.cpp.

References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, offered_incompatible_qos_status_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, and DDS::OfferedIncompatibleQosStatus::total_count_change.

00053   : data_dropped_count_(0),
00054   data_delivered_count_(0),
00055   n_chunks_(TheServiceParticipant->n_chunks()),
00056   association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00057   qos_(TheServiceParticipant->initial_DataWriterQos()),
00058   participant_servant_(0),
00059   topic_id_(GUID_UNKNOWN),
00060   topic_servant_(0),
00061   listener_mask_(DEFAULT_STATUS_MASK),
00062   domain_id_(0),
00063   publisher_servant_(0),
00064   publication_id_(GUID_UNKNOWN),
00065   sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00066   // data_container_(0),
00067   // liveliness_lost_(false),
00068   // last_deadline_missed_total_count_(0),
00069   is_bit_(false),
00070   empty_condition_(lock_),
00071   pending_write_count_(0)
00072 {
00073   // liveliness_lost_status_.total_count = 0;
00074   // liveliness_lost_status_.total_count_change = 0;
00075   //
00076   // offered_deadline_missed_status_.total_count = 0;
00077   // offered_deadline_missed_status_.total_count_change = 0;
00078   // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00079 
00080   offered_incompatible_qos_status_.total_count = 0;
00081   offered_incompatible_qos_status_.total_count_change = 0;
00082   offered_incompatible_qos_status_.last_policy_id = 0;
00083   offered_incompatible_qos_status_.policies.length(0);
00084 
00085   publication_match_status_.total_count = 0;
00086   publication_match_status_.total_count_change = 0;
00087   publication_match_status_.current_count = 0;
00088   publication_match_status_.current_count_change = 0;
00089   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00090 
00091 }

OpenDDS::DCPS::ReplayerImpl::~ReplayerImpl (  ) 

Definition at line 95 of file ReplayerImpl.cpp.

References DBG_ENTRY_LVL.

00096 {
00097   DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
00098 }


Member Function Documentation

void OpenDDS::DCPS::ReplayerImpl::_add_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::Replayer.

Definition at line 168 of file ReplayerImpl.h.

00168 { EntityImpl::_add_ref(); }

void OpenDDS::DCPS::ReplayerImpl::_remove_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::Replayer.

Definition at line 169 of file ReplayerImpl.h.

00169 { EntityImpl::_remove_ref(); }

void OpenDDS::DCPS::ReplayerImpl::add_association ( const RepoId yourId,
const ReaderAssociation reader,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 403 of file ReplayerImpl.cpp.

References assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::insert(), is_bit_, OPENDDS_STRING, participant_servant_, pending_readers_, publication_id_, qos_, reader_info_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, and DDS::VOLATILE_DURABILITY_QOS.

00406 {
00407   DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
00408 
00409   if (DCPS_debug_level >= 1) {
00410     GuidConverter writer_converter(yourId);
00411     GuidConverter reader_converter(reader.readerId);
00412     ACE_DEBUG((LM_DEBUG,
00413                ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
00414                ACE_TEXT("bit %d local %C remote %C\n"),
00415                is_bit_,
00416                OPENDDS_STRING(writer_converter).c_str(),
00417                OPENDDS_STRING(reader_converter).c_str()));
00418   }
00419 
00420   // if (entity_deleted_ == true) {
00421   //   if (DCPS_debug_level >= 1)
00422   //     ACE_DEBUG((LM_DEBUG,
00423   //                ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
00424   //                ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00425   //
00426   //   return;
00427   // }
00428 
00429   if (GUID_UNKNOWN == publication_id_) {
00430     publication_id_ = yourId;
00431   }
00432 
00433   {
00434     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00435     reader_info_.insert(std::make_pair(reader.readerId,
00436                                        ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00437                                                   reader.exprParams, participant_servant_,
00438                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00439   }
00440 
00441   if (DCPS_debug_level > 4) {
00442     GuidConverter converter(publication_id_);
00443     ACE_DEBUG((LM_DEBUG,
00444                ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
00445                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00446                OPENDDS_STRING(converter).c_str(),
00447                qos_.transport_priority.value));
00448   }
00449 
00450   AssociationData data;
00451   data.remote_id_ = reader.readerId;
00452   data.remote_data_ = reader.readerTransInfo;
00453   data.remote_reliable_ =
00454     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00455   data.remote_durable_ =
00456     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00457 
00458   if (!this->associate(data, active)) {
00459     //FUTURE: inform inforepo and try again as passive peer
00460     if (DCPS_debug_level) {
00461       ACE_DEBUG((LM_ERROR,
00462                  ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00463                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00464     }
00465     return;
00466   }
00467 
00468   if (active) {
00469     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00470 
00471     // Have we already received an association_complete() callback?
00472     if (assoc_complete_readers_.count(reader.readerId)) {
00473       assoc_complete_readers_.erase(reader.readerId);
00474       association_complete_i(reader.readerId);
00475 
00476       // Add to pending_readers_ -> pending means we are waiting
00477       // for the association_complete() callback.
00478     } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) {
00479       GuidConverter converter(reader.readerId);
00480       ACE_ERROR((LM_ERROR,
00481                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ")
00482                  ACE_TEXT("failed to mark %C as pending.\n"),
00483                  OPENDDS_STRING(converter).c_str()));
00484 
00485     } else {
00486       if (DCPS_debug_level > 0) {
00487         GuidConverter converter(reader.readerId);
00488         ACE_DEBUG((LM_DEBUG,
00489                    ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00490                    ACE_TEXT("marked %C as pending.\n"),
00491                    OPENDDS_STRING(converter).c_str()));
00492       }
00493     }
00494   } else {
00495     // In the current implementation, DataWriter is always active, so this
00496     // code will not be applicable.
00497     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00498     disco->association_complete(this->domain_id_,
00499                                 this->participant_servant_->get_id(),
00500                                 this->publication_id_, reader.readerId);
00501   }
00502 }

void OpenDDS::DCPS::ReplayerImpl::association_complete ( const RepoId remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 524 of file ReplayerImpl.cpp.

References assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, OPENDDS_STRING, pending_readers_, and OpenDDS::DCPS::remove().

00525 {
00526   DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6);
00527 
00528   if (DCPS_debug_level >= 1) {
00529     GuidConverter writer_converter(this->publication_id_);
00530     GuidConverter reader_converter(remote_id);
00531     ACE_DEBUG((LM_DEBUG,
00532                ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ")
00533                ACE_TEXT("bit %d local %C remote %C\n"),
00534                is_bit_,
00535                OPENDDS_STRING(writer_converter).c_str(),
00536                OPENDDS_STRING(reader_converter).c_str()));
00537   }
00538 
00539   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00540   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00541     // Not found in pending_readers_, defer calling association_complete_i()
00542     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00543     assoc_complete_readers_.insert(remote_id);
00544   } else {
00545     association_complete_i(remote_id);
00546   }
00547 }

void OpenDDS::DCPS::ReplayerImpl::association_complete_i ( const RepoId remote_id  )  [private]

Definition at line 550 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::bind(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, OPENDDS_STRING, participant_servant_, publication_match_status_, readers_, DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.

Referenced by add_association(), and association_complete().

00551 {
00552   DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
00553   // bool reader_durable = false;
00554   {
00555     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00556     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00557       GuidConverter converter(remote_id);
00558       ACE_ERROR((LM_ERROR,
00559                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00560                  ACE_TEXT("insert %C from pending failed.\n"),
00561                  OPENDDS_STRING(converter).c_str()));
00562     }
00563     // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00564     // if (it != reader_info_.end()) {
00565     //   reader_durable = it->second.durable_;
00566     // }
00567   }
00568 
00569   if (!is_bit_) {
00570 
00571     DDS::InstanceHandle_t handle =
00572       this->participant_servant_->id_to_handle(remote_id);
00573 
00574     {
00575       // protect publication_match_status_ and status changed flags.
00576       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00577 
00578       // update the publication_match_status_
00579       ++publication_match_status_.total_count;
00580       ++publication_match_status_.total_count_change;
00581       ++publication_match_status_.current_count;
00582       ++publication_match_status_.current_count_change;
00583 
00584       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00585         GuidConverter converter(remote_id);
00586         ACE_DEBUG((LM_WARNING,
00587                    ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00588                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00589                    OPENDDS_STRING(converter).c_str(),
00590                    handle));
00591         return;
00592 
00593       } else if (DCPS_debug_level > 4) {
00594         GuidConverter converter(remote_id);
00595         ACE_DEBUG((LM_DEBUG,
00596                    ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
00597                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00598                    OPENDDS_STRING(converter).c_str(),
00599                    handle));
00600       }
00601 
00602       publication_match_status_.last_subscription_handle = handle;
00603 
00604     }
00605 
00606 
00607     if (listener_.in()) {
00608       listener_->on_replayer_matched(this,
00609                                      publication_match_status_);
00610 
00611       // TBD - why does the spec say to change this but not
00612       // change the ChangeFlagStatus after a listener call?
00613       publication_match_status_.total_count_change = 0;
00614       publication_match_status_.current_count_change = 0;
00615     }
00616 
00617   }
00618 
00619 }

bool OpenDDS::DCPS::ReplayerImpl::check_transport_qos ( const TransportInst inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 858 of file ReplayerImpl.cpp.

00859 {
00860   // DataWriter does not impose any constraints on which transports
00861   // may be used based on QoS.
00862   return true;
00863 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::cleanup (  ) 

cleanup the DataWriter.

Definition at line 102 of file ReplayerImpl.cpp.

References empty_condition_, publication_id_, remove_all_associations(), OpenDDS::DCPS::TopicImpl::remove_entity_ref(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, topic_objref_, and topic_servant_.

Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer().

00103 {
00104 
00105   //     // Unregister all registered instances prior to deletion.
00106   //     // DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00107   //     // this->unregister_instances(source_timestamp);
00108   //
00109   //     // CORBA::String_var topic_name = this->get_Atopic_name();
00110   {
00111     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
00112 
00113     // Wait for pending samples to drain prior to removing associations
00114     // and unregistering the publication.
00115     while (this->pending_write_count_)
00116       this->empty_condition_.wait();
00117 
00118     // Call remove association before unregistering the datawriter
00119     // with the transport, otherwise some callbacks resulted from
00120     // remove_association may lost.
00121     this->remove_all_associations();
00122 
00123     // release our Topic_var
00124     topic_objref_ = DDS::Topic::_nil();
00125     topic_servant_->remove_entity_ref();
00126     topic_servant_->_remove_ref();
00127     topic_servant_ = 0;
00128 
00129   }
00130 
00131   // not just unregister but remove any pending writes/sends.
00132   // this->unregister_all();
00133 
00134   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00135   if (!disco->remove_publication(
00136         this->domain_id_,
00137         this->participant_servant_->get_id(),
00138         this->publication_id_)) {
00139     ACE_ERROR_RETURN((LM_ERROR,
00140                       ACE_TEXT("(%P|%t) ERROR: ")
00141                       ACE_TEXT("PublisherImpl::delete_datawriter, ")
00142                       ACE_TEXT("publication not removed from discovery.\n")),
00143                      DDS::RETCODE_ERROR);
00144   }
00145   return DDS::RETCODE_OK;
00146 }

void OpenDDS::DCPS::ReplayerImpl::control_delivered ( ACE_Message_Block *  sample  )  [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 906 of file ReplayerImpl.cpp.

00907 {
00908   ACE_UNUSED_ARG(sample);
00909 }

void OpenDDS::DCPS::ReplayerImpl::control_dropped ( ACE_Message_Block *  sample,
bool  dropped_by_transport 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 930 of file ReplayerImpl.cpp.

00932 {
00933   ACE_UNUSED_ARG(sample);
00934 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::create_sample_data_message ( DataSample data,
DataSampleHeader header_data,
ACE_Message_Block *&  message,
const DDS::Time_t source_timestamp,
bool  content_filter 
) [private]

Definition at line 1055 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, db_allocator_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_marshaled_size(), mb_allocator_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, and OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_.

Referenced by write().

01060 {
01061   header_data.message_id_ = SAMPLE_DATA;
01062   header_data.coherent_change_ = content_filter;
01063 
01064   header_data.content_filter_ = 0;
01065   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
01066   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01067   header_data.sequence_repair_ = need_sequence_repair();
01068   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01069     this->sequence_number_ = SequenceNumber();
01070   } else {
01071     ++this->sequence_number_;
01072   }
01073   header_data.sequence_ = this->sequence_number_;
01074   header_data.source_timestamp_sec_ = source_timestamp.sec;
01075   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01076 
01077   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
01078       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01079     header_data.lifespan_duration_ = true;
01080     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
01081     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
01082   }
01083 
01084   // header_data.publication_id_ = publication_id_;
01085   // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01086   size_t max_marshaled_size = header_data.max_marshaled_size();
01087 
01088   ACE_NEW_MALLOC_RETURN(message,
01089                         static_cast<ACE_Message_Block*>(
01090                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01091                         ACE_Message_Block(max_marshaled_size,
01092                                           ACE_Message_Block::MB_DATA,
01093                                           data,   //cont
01094                                           0,   //data
01095                                           header_allocator_.get(),   //alloc_strategy
01096                                           0,   //locking_strategy
01097                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01098                                           ACE_Time_Value::zero,
01099                                           ACE_Time_Value::max_time,
01100                                           db_allocator_.get(),
01101                                           mb_allocator_.get()),
01102                         DDS::RETCODE_ERROR);
01103 
01104   *message << header_data;
01105   return DDS::RETCODE_OK;
01106 }

void OpenDDS::DCPS::ReplayerImpl::data_delivered ( const DataSampleElement sample  )  [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 878 of file ReplayerImpl.cpp.

References data_delivered_count_, DBG_ENTRY_LVL, empty_condition_, OpenDDS::DCPS::DataSampleElement::get_pub_id(), OPENDDS_STRING, pending_write_count_, publication_id_, and sample_list_element_allocator_.

00879 {
00880   DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
00881   if (!(sample->get_pub_id() == this->publication_id_)) {
00882     GuidConverter sample_converter(sample->get_pub_id());
00883     GuidConverter writer_converter(publication_id_);
00884     ACE_ERROR((LM_ERROR,
00885                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
00886                ACE_TEXT(" The publication id %C from delivered element ")
00887                ACE_TEXT("does not match the datawriter's id %C\n"),
00888                OPENDDS_STRING(sample_converter).c_str(),
00889                OPENDDS_STRING(writer_converter).c_str()));
00890     return;
00891   }
00892   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00893   // this->data_container_->data_delivered(sample);
00894   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00895   ++data_delivered_count_;
00896 
00897   {
00898     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00899     if ((--pending_write_count_) == 0) {
00900       empty_condition_.broadcast();
00901     }
00902   }
00903 }

void OpenDDS::DCPS::ReplayerImpl::data_dropped ( const DataSampleElement sample,
bool  dropped_by_transport 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 912 of file ReplayerImpl.cpp.

References data_dropped_count_, DBG_ENTRY_LVL, empty_condition_, pending_write_count_, and sample_list_element_allocator_.

00914 {
00915   DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
00916   // this->data_container_->data_dropped(element, dropped_by_transport);
00917   ACE_UNUSED_ARG(dropped_by_transport);
00918   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00919   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00920   ++data_dropped_count_;
00921   {
00922     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00923     if ((--pending_write_count_) == 0) {
00924       empty_condition_.broadcast();
00925     }
00926   }
00927 }

DDS::DomainId_t OpenDDS::DCPS::ReplayerImpl::domain_id (  )  const [inline, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 105 of file ReplayerImpl.h.

00105 { return this->domain_id_; }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::enable (  ) 

Implements DDS::Entity.

Definition at line 303 of file ReplayerImpl.cpp.

References association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::connection_info(), db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, DDS::LENGTH_UNLIMITED, mb_allocator_, n_chunks_, publication_id_, publisher_qos_, qos_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_servant_, transport_customized_element_allocator_, transport_send_element_allocator_, and DDS::VOLATILE_DURABILITY_QOS.

00304 {
00305   //According spec:
00306   // - Calling enable on an already enabled Entity returns OK and has no
00307   // effect.
00308   // - Calling enable on an Entity whose factory is not enabled will fail
00309   // and return PRECONDITION_NOT_MET.
00310 
00311   if (this->is_enabled()) {
00312     return DDS::RETCODE_OK;
00313   }
00314 
00315   // if (this->publisher_servant_->is_enabled() == false) {
00316   //   return DDS::RETCODE_PRECONDITION_NOT_MET;
00317   // }
00318   //
00319   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
00320 
00321   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
00322     n_chunks_ = qos_.resource_limits.max_samples;
00323   }
00324   // +1 because we might allocate one before releasing another
00325   // TBD - see if this +1 can be removed.
00326   ACE_auto_ptr_reset(mb_allocator_,
00327                      new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
00328   ACE_auto_ptr_reset(db_allocator_,
00329                      new DataBlockAllocator(n_chunks_+1));
00330   ACE_auto_ptr_reset(header_allocator_,
00331                      new DataSampleHeaderAllocator(n_chunks_+1));
00332 
00333   ACE_auto_ptr_reset(sample_list_element_allocator_,
00334                      new DataSampleElementAllocator(2 * n_chunks_));
00335 
00336   ACE_auto_ptr_reset(transport_send_element_allocator_,
00337                      new TransportSendElementAllocator(2 * n_chunks_,
00338                                                        sizeof(TransportSendElement)));
00339   ACE_auto_ptr_reset(transport_customized_element_allocator_,
00340                      new TransportCustomizedElementAllocator(2 * n_chunks_,
00341                                                              sizeof(TransportCustomizedElement)));
00342 
00343   if (DCPS_debug_level >= 2) {
00344     ACE_DEBUG((LM_DEBUG,
00345                "(%P|%t) ReplayerImpl::enable-mb"
00346                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00347                mb_allocator_.get(),
00348                n_chunks_));
00349 
00350     ACE_DEBUG((LM_DEBUG,
00351                "(%P|%t) ReplayerImpl::enable-db"
00352                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00353                db_allocator_.get(),
00354                n_chunks_));
00355 
00356     ACE_DEBUG((LM_DEBUG,
00357                "(%P|%t) ReplayerImpl::enable-header"
00358                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00359                header_allocator_.get(),
00360                n_chunks_));
00361   }
00362 
00363   this->set_enabled();
00364 
00365   try {
00366     this->enable_transport(reliable,
00367                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00368 
00369   } catch (const Transport::Exception&) {
00370     ACE_ERROR((LM_ERROR,
00371                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00372                ACE_TEXT("Transport Exception.\n")));
00373     return DDS::RETCODE_ERROR;
00374 
00375   }
00376 
00377   const TransportLocatorSeq& trans_conf_info = connection_info();
00378 
00379 
00380   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00381   this->publication_id_ =
00382     disco->add_publication(this->domain_id_,
00383                            this->participant_servant_->get_id(),
00384                            this->topic_servant_->get_id(),
00385                            this,
00386                            this->qos_,
00387                            trans_conf_info,
00388                            this->publisher_qos_);
00389 
00390   if (this->publication_id_ == GUID_UNKNOWN) {
00391     ACE_ERROR((LM_ERROR,
00392                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00393                ACE_TEXT("add_publication returned invalid id. \n")));
00394     return DDS::RETCODE_ERROR;
00395   }
00396 
00397   return DDS::RETCODE_OK;
00398 }

DDS::InstanceHandle_t OpenDDS::DCPS::ReplayerImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 1151 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and publication_id_.

01152 {
01153   return this->participant_servant_->id_to_handle(publication_id_);
01154 }

ReplayerListener_rch OpenDDS::DCPS::ReplayerImpl::get_listener (  )  [virtual]

Get the listener for this Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 297 of file ReplayerImpl.cpp.

References listener_.

00298 {
00299   return listener_;
00300 }

CORBA::Long OpenDDS::DCPS::ReplayerImpl::get_priority_value ( const AssociationData data  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 872 of file ReplayerImpl.cpp.

References qos_, and DDS::DataWriterQos::transport_priority.

00873 {
00874   return this->qos_.transport_priority.value;
00875 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::get_qos ( DDS::PublisherQos publisher_qos,
DDS::DataWriterQos datawriter_qos 
) [virtual]

Get the Quality of Service settings for the Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 280 of file ReplayerImpl.cpp.

References publisher_qos_, qos_, and DDS::RETCODE_OK.

00282 {
00283   qos = qos_;
00284   publisher_qos = publisher_qos_;
00285   return DDS::RETCODE_OK;
00286 }

const RepoId & OpenDDS::DCPS::ReplayerImpl::get_repo_id (  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 866 of file ReplayerImpl.cpp.

References publication_id_.

00867 {
00868   return this->publication_id_;
00869 }

void OpenDDS::DCPS::ReplayerImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 852 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.

00853 {
00854   topic_servant_->inconsistent_topic();
00855 }

void OpenDDS::DCPS::ReplayerImpl::init ( DDS::Topic_ptr  topic,
TopicImpl topic_servant,
const DDS::DataWriterQos qos,
ReplayerListener_rch  a_listener,
const DDS::StatusMask mask,
OpenDDS::DCPS::DomainParticipantImpl participant_servant,
const DDS::PublisherQos publisher_qos 
) [virtual]

Initialize the data members.

Definition at line 149 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, DBG_ENTRY_LVL, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_name(), is_bit_, listener_, listener_mask_, participant_servant_, publisher_qos_, qos_, topic_id_, topic_name_, topic_objref_, topic_servant_, and type_name_.

00157 {
00158   DBG_ENTRY_LVL("ReplayerImpl","init",6);
00159   topic_objref_ = DDS::Topic::_duplicate(topic);
00160   topic_servant_ = topic_servant;
00161   topic_servant_->_add_ref();
00162   topic_servant_->add_entity_ref();
00163   topic_name_    = topic_servant_->get_name();
00164   topic_id_      = topic_servant_->get_id();
00165   type_name_     = topic_servant_->get_type_name();
00166 
00167 #if !defined (DDS_HAS_MINIMUM_BIT)
00168   is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00169             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00170             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00171             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00172 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00173 
00174   qos_ = qos;
00175 
00176   //Note: OK to _duplicate(nil).
00177   listener_ = a_listener;
00178   listener_mask_ = mask;
00179 
00180   // Only store the participant pointer, since it is our "grand"
00181   // parent, we will exist as long as it does.
00182   participant_servant_ = participant_servant;
00183   domain_id_ = participant_servant_->get_domain_id();
00184 
00185   publisher_qos_ = publisher_qos;
00186 }

bool OpenDDS::DCPS::ReplayerImpl::lookup_instance_handles ( const ReaderIdSeq ids,
DDS::InstanceHandleSeq hdls 
) [private]

Lookup the instance handles by the subscription repo ids.

Definition at line 1109 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.

01111 {
01112   if (DCPS_debug_level > 9) {
01113     CORBA::ULong const size = ids.length();
01114     OPENDDS_STRING separator;
01115     OPENDDS_STRING buffer;
01116 
01117     for (unsigned long i = 0; i < size; ++i) {
01118       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
01119       separator = ", ";
01120     }
01121 
01122     ACE_DEBUG((LM_DEBUG,
01123                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
01124                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
01125                buffer.c_str()));
01126   }
01127 
01128   CORBA::ULong const num_rds = ids.length();
01129   hdls.length(num_rds);
01130 
01131   for (CORBA::ULong i = 0; i < num_rds; ++i) {
01132     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
01133   }
01134 
01135   return true;
01136 }

bool OpenDDS::DCPS::ReplayerImpl::need_sequence_repair (  )  const [private]

Definition at line 1139 of file ReplayerImpl.cpp.

References reader_info_, and sequence_number_.

Referenced by create_sample_data_message().

01140 {
01141   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
01142        end = reader_info_.end(); it != end; ++it) {
01143     if (it->second.expected_sequence_ != sequence_number_) {
01144       return true;
01145     }
01146   }
01147   return false;
01148 }

void OpenDDS::DCPS::ReplayerImpl::notify_connection_deleted ( const RepoId  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 961 of file ReplayerImpl.cpp.

00962 {
00963 }

void OpenDDS::DCPS::ReplayerImpl::notify_publication_disconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 937 of file ReplayerImpl.cpp.

00938 {
00939   ACE_UNUSED_ARG(subids);
00940 }

void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost ( const DDS::InstanceHandleSeq handles  )  [private]

Definition at line 955 of file ReplayerImpl.cpp.

00956 {
00957   ACE_UNUSED_ARG(handles);
00958 }

void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 949 of file ReplayerImpl.cpp.

00950 {
00951   ACE_UNUSED_ARG(subids);
00952 }

void OpenDDS::DCPS::ReplayerImpl::notify_publication_reconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 943 of file ReplayerImpl.cpp.

00944 {
00945   ACE_UNUSED_ARG(subids);
00946 }

typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( RepoId  ,
SequenceNumber  ,
GUID_tKeyLessThan   
) [private]

Flag indicates that the init() is called.

typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( RepoId  ,
ReaderInfo  ,
GUID_tKeyLessThan   
) [private]

DomainParticipantImpl* OpenDDS::DCPS::ReplayerImpl::participant (  )  [inline]

Definition at line 160 of file ReplayerImpl.h.

Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer(), OpenDDS::DCPS::ReplayerImpl::ReaderInfo::ReaderInfo(), register_for_reader(), and unregister_for_reader().

00160                                                 {
00161     return participant_servant_;
00162   }

void OpenDDS::DCPS::ReplayerImpl::register_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 810 of file ReplayerImpl.cpp.

References participant(), and OpenDDS::DCPS::TransportClient::register_for_reader().

00815 {
00816   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00817 }

void OpenDDS::DCPS::ReplayerImpl::remove_all_associations (  ) 

Definition at line 765 of file ReplayerImpl.cpp.

References lock_, pending_readers_, readers_, remove_associations(), and OpenDDS::DCPS::TransportClient::stop_associating().

Referenced by cleanup().

00766 {
00767   this->stop_associating();
00768 
00769   OpenDDS::DCPS::ReaderIdSeq readers;
00770   CORBA::ULong size;
00771   CORBA::ULong num_pending_readers;
00772   {
00773     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00774 
00775     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00776     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00777     readers.length(size);
00778 
00779     RepoIdSet::iterator itEnd = readers_.end();
00780     int i = 0;
00781 
00782     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00783       readers[i++] = *it;
00784     }
00785 
00786     itEnd = pending_readers_.end();
00787     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00788       readers[i++] = *it;
00789     }
00790 
00791     if (num_pending_readers > 0) {
00792       ACE_DEBUG((LM_WARNING,
00793                  ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ")
00794                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00795                  num_pending_readers));
00796     }
00797   }
00798 
00799   try {
00800     if (0 < size) {
00801       CORBA::Boolean dont_notify_lost = false;
00802       this->remove_associations(readers, dont_notify_lost);
00803     }
00804 
00805   } catch (const CORBA::Exception&) {
00806   }
00807 }

void OpenDDS::DCPS::ReplayerImpl::remove_associations ( const ReaderIdSeq readers,
CORBA::Boolean  callback 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 622 of file ReplayerImpl.cpp.

References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, id_to_handle_map_, idToSequence_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, OPENDDS_STRING, pending_readers_, publication_id_, publication_match_status_, reader_info_, readers_, OpenDDS::DCPS::remove(), OpenDDS::DCPS::TransportClient::stop_associating(), and DDS::PublicationMatchedStatus::total_count_change.

Referenced by remove_all_associations().

00624 {
00625   if (DCPS_debug_level >= 1) {
00626     GuidConverter writer_converter(publication_id_);
00627     GuidConverter reader_converter(readers[0]);
00628     ACE_DEBUG((LM_DEBUG,
00629                ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
00630                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00631                is_bit_,
00632                OPENDDS_STRING(writer_converter).c_str(),
00633                OPENDDS_STRING(reader_converter).c_str(),
00634                readers.length()));
00635   }
00636 
00637   this->stop_associating(readers.get_buffer(), readers.length());
00638 
00639   ReaderIdSeq fully_associated_readers;
00640   CORBA::ULong fully_associated_len = 0;
00641   ReaderIdSeq rds;
00642   CORBA::ULong rds_len = 0;
00643   DDS::InstanceHandleSeq handles;
00644 
00645   {
00646     // Ensure the same acquisition order as in wait_for_acknowledgments().
00647     // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
00648     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00649 
00650     //Remove the readers from fully associated reader list.
00651     //If the supplied reader is not in the cached reader list then it is
00652     //already removed. We just need remove the readers in the list that have
00653     //not been removed.
00654 
00655     CORBA::ULong len = readers.length();
00656 
00657     for (CORBA::ULong i = 0; i < len; ++i) {
00658       //Remove the readers from fully associated reader list. If it's not
00659       //in there, the association_complete() is not called yet and remove it
00660       //from pending list.
00661 
00662       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00663         ++fully_associated_len;
00664         fully_associated_readers.length(fully_associated_len);
00665         fully_associated_readers [fully_associated_len - 1] = readers[i];
00666 
00667         // Remove this reader from the ACK sequence map if its there.
00668         // This is where we need to be holding the wfaLock_ obtained
00669         // above.
00670         RepoIdToSequenceMap::iterator where
00671           = this->idToSequence_.find(readers[i]);
00672 
00673         if (where != this->idToSequence_.end()) {
00674           this->idToSequence_.erase(where);
00675 
00676           // It is possible that this subscription was causing the wait
00677           // to continue, so give the opportunity to find out.
00678           // this->wfaCondition_.broadcast();
00679         }
00680 
00681         ++rds_len;
00682         rds.length(rds_len);
00683         rds [rds_len - 1] = readers[i];
00684 
00685       } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00686         ++rds_len;
00687         rds.length(rds_len);
00688         rds [rds_len - 1] = readers[i];
00689 
00690         GuidConverter converter(readers[i]);
00691         ACE_DEBUG((LM_WARNING,
00692                    ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ")
00693                    ACE_TEXT("removing reader %C before association_complete() call.\n"),
00694                    OPENDDS_STRING(converter).c_str()));
00695       }
00696       reader_info_.erase(readers[i]);
00697       //else reader is already removed which indicates remove_association()
00698       //is called multiple times.
00699     }
00700 
00701     if (fully_associated_len > 0 && !is_bit_) {
00702       // The reader should be in the id_to_handle map at this time so
00703       // log with error.
00704       if (this->lookup_instance_handles(fully_associated_readers, handles) == false) {
00705         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReplayerImpl::remove_associations: "
00706                    "lookup_instance_handles failed, notify %d \n", notify_lost));
00707         return;
00708       }
00709 
00710       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00711         id_to_handle_map_.erase(fully_associated_readers[i]);
00712       }
00713     }
00714 
00715     // wfaGuard.release();
00716 
00717     // Mirror the PUBLICATION_MATCHED_STATUS processing from
00718     // association_complete() here.
00719     if (!this->is_bit_) {
00720 
00721       // Derive the change in the number of subscriptions reading this writer.
00722       int matchedSubscriptions =
00723         static_cast<int>(this->id_to_handle_map_.size());
00724       this->publication_match_status_.current_count_change =
00725         matchedSubscriptions - this->publication_match_status_.current_count;
00726 
00727       // Only process status if the number of subscriptions has changed.
00728       if (this->publication_match_status_.current_count_change != 0) {
00729         this->publication_match_status_.current_count = matchedSubscriptions;
00730 
00731         /// Section 7.1.4.1: total_count will not decrement.
00732 
00733         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00734         /// TODO: Should rds_len really be fully_associated_len here??
00735         this->publication_match_status_.last_subscription_handle =
00736           handles[rds_len - 1];
00737 
00738 
00739         if (listener_.in()) {
00740           listener_->on_replayer_matched(
00741             this,
00742             this->publication_match_status_);
00743 
00744           // Listener consumes the change.
00745           this->publication_match_status_.total_count_change = 0;
00746           this->publication_match_status_.current_count_change = 0;
00747         }
00748 
00749       }
00750     }
00751   }
00752 
00753   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00754     this->disassociate(rds[i]);
00755   }
00756 
00757   // If this remove_association is invoked when the InfoRepo
00758   // detects a lost reader then make a callback to notify
00759   // subscription lost.
00760   if (notify_lost && handles.length() > 0) {
00761     this->notify_publication_lost(handles);
00762   }
00763 }

virtual void OpenDDS::DCPS::ReplayerImpl::retrieve_inline_qos_data ( InlineQosData &  qos_data  )  const [virtual]

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_listener ( const ReplayerListener_rch a_listener,
DDS::StatusMask  mask 
) [virtual]

Change the listener for this Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 289 of file ReplayerImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

00291 {
00292   listener_ = a_listener;
00293   listener_mask_ = mask;
00294   return DDS::RETCODE_OK;
00295 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_qos ( const ::DDS::PublisherQos publisher_qos,
const DDS::DataWriterQos datawriter_qos 
) [virtual]

Set the Quality of Service settings for the Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 189 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, participant_servant_, publication_id_, publisher_qos_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

00191 {
00192 
00193   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
00194 
00195   if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
00196     if (publisher_qos_ == publisher_qos)
00197       return DDS::RETCODE_OK;
00198 
00199     // for the not changeable qos, it can be changed before enable
00200     if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) {
00201       return DDS::RETCODE_IMMUTABLE_POLICY;
00202 
00203     } else {
00204       publisher_qos_ = publisher_qos;
00205     }
00206   } else {
00207     return DDS::RETCODE_INCONSISTENT_POLICY;
00208   }
00209 
00210   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00211   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00212   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00213   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00214   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00215 
00216   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00217     if (qos_ == qos)
00218       return DDS::RETCODE_OK;
00219 
00220     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00221       return DDS::RETCODE_IMMUTABLE_POLICY;
00222 
00223     } else {
00224       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00225       // DDS::PublisherQos publisherQos;
00226       // this->publisher_servant_->get_qos(publisherQos);
00227       DDS::PublisherQos publisherQos = this->publisher_qos_;
00228       const bool status
00229         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00230                                         this->participant_servant_->get_id(),
00231                                         this->publication_id_,
00232                                         qos,
00233                                         publisherQos);
00234 
00235       if (!status) {
00236         ACE_ERROR_RETURN((LM_ERROR,
00237                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00238                           ACE_TEXT("qos not updated. \n")),
00239                          DDS::RETCODE_ERROR);
00240       }
00241     }
00242 
00243     if (!(qos_ == qos)) {
00244       // Reset the deadline timer if the period has changed.
00245       // if (qos_.deadline.period.sec != qos.deadline.period.sec
00246       //     || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00247       //   if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00248       //       && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00249       //     ACE_auto_ptr_reset(this->watchdog_,
00250       //                        new OfferedDeadlineWatchdog(
00251       //                          this->reactor_,
00252       //                          this->lock_,
00253       //                          qos.deadline,
00254       //                          this,
00255       //                          this->dw_local_objref_.in(),
00256       //                          this->offered_deadline_missed_status_,
00257       //                          this->last_deadline_missed_total_count_));
00258       //
00259       //   } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00260       //              && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00261       //     this->watchdog_->cancel_all();
00262       //     this->watchdog_.reset();
00263       //
00264       //   } else {
00265       //     this->watchdog_->reset_interval(
00266       //       duration_to_time_value(qos.deadline.period));
00267       //   }
00268       // }
00269 
00270       qos_ = qos;
00271     }
00272 
00273     return DDS::RETCODE_OK;
00274 
00275   } else {
00276     return DDS::RETCODE_INCONSISTENT_POLICY;
00277   }
00278 }

void OpenDDS::DCPS::ReplayerImpl::unregister_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 820 of file ReplayerImpl.cpp.

References participant(), and OpenDDS::DCPS::TransportClient::unregister_for_reader().

00823 {
00824   TransportClient::unregister_for_reader(participant, writerid, readerid);
00825 }

void OpenDDS::DCPS::ReplayerImpl::update_incompatible_qos ( const IncompatibleQosStatus status  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 828 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.

00829 {
00830 
00831 
00832   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00833 
00834   // copy status and increment change
00835   offered_incompatible_qos_status_.total_count = status.total_count;
00836   offered_incompatible_qos_status_.total_count_change +=
00837     status.count_since_last_send;
00838   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00839   offered_incompatible_qos_status_.policies = status.policies;
00840 
00841 }

void OpenDDS::DCPS::ReplayerImpl::update_subscription_params ( const RepoId readerId,
const DDS::StringSeq exprParams 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 844 of file ReplayerImpl.cpp.

00846 {
00847   ACE_UNUSED_ARG(readerId);
00848   ACE_UNUSED_ARG(params);
00849 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write ( const RawDataSample sample_array,
int  array_size,
DDS::InstanceHandle_t reader 
) [private]

Definition at line 974 of file ReplayerImpl.cpp.

References create_sample_data_message(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DomainParticipantImpl::get_repoid(), OpenDDS::DCPS::GUID_UNKNOWN, participant_servant_, pending_write_count_, publication_id_, reader_info_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::RawDataSample::sample_byte_order_, sample_list_element_allocator_, OpenDDS::DCPS::TransportClient::send(), sequence_number_, OpenDDS::DCPS::RawDataSample::source_timestamp_, transport_customized_element_allocator_, and transport_send_element_allocator_.

00977 {
00978   DBG_ENTRY_LVL("ReplayerImpl","write",6);
00979 
00980   OpenDDS::DCPS::RepoId repo_id;
00981   if (reader_ih_ptr) {
00982     repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
00983     if (repo_id == GUID_UNKNOWN) {
00984       ACE_ERROR_RETURN((LM_ERROR,
00985                         ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
00986                         ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
00987                        DDS::RETCODE_ERROR);
00988     }
00989   }
00990 
00991   SendStateDataSampleList list;
00992 
00993   for (int i = 0; i < num_samples; ++i) {
00994     DataSampleElement* element = 0;
00995 
00996     ACE_NEW_MALLOC_RETURN(
00997       element,
00998       static_cast<DataSampleElement*>(
00999         sample_list_element_allocator_->malloc(
01000           sizeof(DataSampleElement))),
01001       DataSampleElement(publication_id_,
01002                             this,
01003                             0,
01004                             transport_send_element_allocator_.get(),
01005                             transport_customized_element_allocator_.get()),
01006       DDS::RETCODE_ERROR);
01007 
01008     element->get_header().byte_order_ = samples[i].sample_byte_order_;
01009     element->get_header().publication_id_ = this->publication_id_;
01010     list.enqueue_tail(element);
01011     DataSample* temp;
01012     DDS::ReturnCode_t ret = create_sample_data_message(samples[i].sample_->duplicate(),
01013                                                        element->get_header(),
01014                                                        temp,
01015                                                        samples[i].source_timestamp_,
01016                                                        false);
01017     element->set_sample(temp);
01018     if (reader_ih_ptr) {
01019       element->set_num_subs(1);
01020       element->set_sub_id(0, repo_id);
01021     }
01022 
01023     if (ret != DDS::RETCODE_OK) {
01024       // we need to free the list
01025       while (list.dequeue(element)) {
01026         ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
01027       }
01028 
01029       return ret;
01030     }
01031   }
01032 
01033   {
01034     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
01035     ++pending_write_count_;
01036   }
01037 
01038   this->send(list);
01039 
01040   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01041        end = reader_info_.end(); iter != end; ++iter) {
01042     iter->second.expected_sequence_ = sequence_number_;
01043   }
01044 
01045   return DDS::RETCODE_OK;
01046 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write ( const RawDataSample sample  )  [virtual]

Send the sample to all associated DataReaders.

Note:
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1049 of file ReplayerImpl.cpp.

Referenced by write_to_reader().

01050 {
01051   return this->write(&sample, 1, 0);
01052 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader ( DDS::InstanceHandle_t  subscription,
const RawDataSampleList &  samples 
) [virtual]

Send the samples to the specified DataReader.

Note:
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1167 of file ReplayerImpl.cpp.

References DDS::RETCODE_ERROR, and write().

01169 {
01170   if (samples.size())
01171     return write(&samples[0], static_cast<int>(samples.size()), &subscription);
01172   return DDS::RETCODE_ERROR;
01173 }

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader ( DDS::InstanceHandle_t  subscription,
const RawDataSample sample 
) [virtual]

Send the sample to the specified DataReader.

Note:
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1160 of file ReplayerImpl.cpp.

References write().

01162 {
01163   return write(&sample, 1, &subscription);
01164 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 215 of file ReplayerImpl.h.


Member Data Documentation

RepoIdSet OpenDDS::DCPS::ReplayerImpl::assoc_complete_readers_ [private]

Definition at line 324 of file ReplayerImpl.h.

Referenced by add_association(), and association_complete().

size_t OpenDDS::DCPS::ReplayerImpl::association_chunk_multiplier_ [private]

The multiplier for allocators affected by associations.

Definition at line 190 of file ReplayerImpl.h.

Referenced by enable().

int OpenDDS::DCPS::ReplayerImpl::data_delivered_count_

Definition at line 124 of file ReplayerImpl.h.

Referenced by data_delivered().

int OpenDDS::DCPS::ReplayerImpl::data_dropped_count_

Statistics counter.

Definition at line 123 of file ReplayerImpl.h.

Referenced by data_dropped().

std::auto_ptr<DataBlockAllocator> OpenDDS::DCPS::ReplayerImpl::db_allocator_ [private]

Definition at line 279 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

DDS::DomainId_t OpenDDS::DCPS::ReplayerImpl::domain_id_ [private]

The domain id.

Definition at line 232 of file ReplayerImpl.h.

Referenced by init(), and set_qos().

ACE_Condition<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::ReplayerImpl::empty_condition_ [private]

Definition at line 326 of file ReplayerImpl.h.

Referenced by cleanup(), data_delivered(), and data_dropped().

std::auto_ptr<DataSampleHeaderAllocator> OpenDDS::DCPS::ReplayerImpl::header_allocator_ [private]

Definition at line 281 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

RepoIdToHandleMap OpenDDS::DCPS::ReplayerImpl::id_to_handle_map_ [private]

Definition at line 250 of file ReplayerImpl.h.

Referenced by association_complete_i(), and remove_associations().

RepoIdToSequenceMap OpenDDS::DCPS::ReplayerImpl::idToSequence_ [private]

Definition at line 322 of file ReplayerImpl.h.

Referenced by remove_associations().

bool OpenDDS::DCPS::ReplayerImpl::is_bit_ [private]

Timestamp of last write/dispose/assert_liveliness.

Flag indicates that this datawriter is a builtin topic datawriter.

Definition at line 314 of file ReplayerImpl.h.

Referenced by add_association(), association_complete(), association_complete_i(), init(), and remove_associations().

ReplayerListener_rch OpenDDS::DCPS::ReplayerImpl::listener_ [private]

Used to notify the entity for relevant events.

Definition at line 230 of file ReplayerImpl.h.

Referenced by association_complete_i(), get_listener(), init(), remove_associations(), and set_listener().

DDS::StatusMask OpenDDS::DCPS::ReplayerImpl::listener_mask_ [private]

The StatusKind bit mask indicates which status condition change can be notified by the listener of this entity.

Definition at line 228 of file ReplayerImpl.h.

Referenced by init(), and set_listener().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::ReplayerImpl::lock_ [private]

The sample data container.

The lock to protect the activate subscriptions and status changes.

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 246 of file ReplayerImpl.h.

Referenced by remove_all_associations().

std::auto_ptr<MessageBlockAllocator> OpenDDS::DCPS::ReplayerImpl::mb_allocator_ [private]

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Definition at line 277 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

size_t OpenDDS::DCPS::ReplayerImpl::n_chunks_ [private]

The number of chunks for the cached allocator.

Definition at line 187 of file ReplayerImpl.h.

Referenced by enable().

DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::ReplayerImpl::offered_incompatible_qos_status_ [private]

Status conditions.

Definition at line 257 of file ReplayerImpl.h.

Referenced by ReplayerImpl(), and update_incompatible_qos().

DomainParticipantImpl* OpenDDS::DCPS::ReplayerImpl::participant_servant_ [private]

The participant servant which creats the publisher that creates this datawriter.

Definition at line 200 of file ReplayerImpl.h.

Referenced by add_association(), association_complete_i(), get_instance_handle(), init(), set_qos(), and write().

RepoIdSet OpenDDS::DCPS::ReplayerImpl::pending_readers_ [private]

Definition at line 324 of file ReplayerImpl.h.

Referenced by add_association(), association_complete(), remove_all_associations(), and remove_associations().

int OpenDDS::DCPS::ReplayerImpl::pending_write_count_ [private]

Definition at line 327 of file ReplayerImpl.h.

Referenced by data_delivered(), data_dropped(), and write().

PublicationId OpenDDS::DCPS::ReplayerImpl::publication_id_ [private]

The repository id of this datawriter/publication.

Definition at line 238 of file ReplayerImpl.h.

Referenced by add_association(), cleanup(), data_delivered(), enable(), get_instance_handle(), get_repo_id(), remove_associations(), set_qos(), and write().

DDS::PublicationMatchedStatus OpenDDS::DCPS::ReplayerImpl::publication_match_status_ [private]

Definition at line 258 of file ReplayerImpl.h.

Referenced by association_complete_i(), remove_associations(), and ReplayerImpl().

DDS::PublisherQos OpenDDS::DCPS::ReplayerImpl::publisher_qos_ [private]

Definition at line 235 of file ReplayerImpl.h.

Referenced by enable(), get_qos(), init(), and set_qos().

PublisherImpl* OpenDDS::DCPS::ReplayerImpl::publisher_servant_ [private]

The publisher servant which creates this datawriter.

Definition at line 234 of file ReplayerImpl.h.

DDS::DataWriterQos OpenDDS::DCPS::ReplayerImpl::qos_ [private]

The qos policy list of this datawriter.

Definition at line 196 of file ReplayerImpl.h.

Referenced by add_association(), create_sample_data_message(), enable(), get_priority_value(), get_qos(), init(), and set_qos().

RepoIdToReaderInfoMap OpenDDS::DCPS::ReplayerImpl::reader_info_ [private]

Definition at line 211 of file ReplayerImpl.h.

Referenced by add_association(), need_sequence_repair(), remove_associations(), and write().

RepoIdSet OpenDDS::DCPS::ReplayerImpl::readers_ [private]

Definition at line 252 of file ReplayerImpl.h.

Referenced by association_complete_i(), remove_all_associations(), and remove_associations().

std::auto_ptr<DataSampleElementAllocator> OpenDDS::DCPS::ReplayerImpl::sample_list_element_allocator_ [private]

The cached allocator to allocate DataSampleElement objects.

Definition at line 285 of file ReplayerImpl.h.

Referenced by data_delivered(), data_dropped(), enable(), and write().

SequenceNumber OpenDDS::DCPS::ReplayerImpl::sequence_number_ [private]

The sequence number unique in DataWriter scope.

Definition at line 240 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), need_sequence_repair(), and write().

RepoId OpenDDS::DCPS::ReplayerImpl::topic_id_ [private]

The associated topic repository id.

Definition at line 220 of file ReplayerImpl.h.

Referenced by init().

CORBA::String_var OpenDDS::DCPS::ReplayerImpl::topic_name_ [private]

The name of associated topic.

Definition at line 218 of file ReplayerImpl.h.

Referenced by init().

DDS::Topic_var OpenDDS::DCPS::ReplayerImpl::topic_objref_ [private]

The object reference of the associated topic.

Definition at line 222 of file ReplayerImpl.h.

Referenced by cleanup(), and init().

TopicImpl* OpenDDS::DCPS::ReplayerImpl::topic_servant_ [private]

The topic servant.

Definition at line 224 of file ReplayerImpl.h.

Referenced by cleanup(), enable(), inconsistent_topic(), and init().

std::auto_ptr<TransportCustomizedElementAllocator> OpenDDS::DCPS::ReplayerImpl::transport_customized_element_allocator_ [private]

Definition at line 293 of file ReplayerImpl.h.

Referenced by enable(), and write().

std::auto_ptr<TransportSendElementAllocator> OpenDDS::DCPS::ReplayerImpl::transport_send_element_allocator_ [private]

The allocator for TransportSendElement. The TransportSendElement allocator is put here because it needs the number of chunks information that WriteDataContainer has.

Definition at line 291 of file ReplayerImpl.h.

Referenced by enable(), and write().

CORBA::String_var OpenDDS::DCPS::ReplayerImpl::type_name_ [private]

The type name of associated topic.

Definition at line 193 of file ReplayerImpl.h.

Referenced by init().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:23 2016 for OpenDDS by  doxygen 1.4.7