LCOV - code coverage report
Current view: top level - DCPS - ReplayerImpl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 409 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 42 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : #include "ReplayerImpl.h"
      10             : #include "FeatureDisabledQosCheck.h"
      11             : #include "DomainParticipantImpl.h"
      12             : #include "PublisherImpl.h"
      13             : #include "Service_Participant.h"
      14             : #include "GuidConverter.h"
      15             : #include "TopicImpl.h"
      16             : #include "PublicationInstance.h"
      17             : #include "SendStateDataSampleList.h"
      18             : #include "DataSampleElement.h"
      19             : #include "Serializer.h"
      20             : #include "Transient_Kludge.h"
      21             : #include "DataDurabilityCache.h"
      22             : #include "MonitorFactory.h"
      23             : #include "TypeSupportImpl.h"
      24             : #include "DCPS_Utils.h"
      25             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
      26             : #include "CoherentChangeControl.h"
      27             : #endif
      28             : #include "AssociationData.h"
      29             : 
      30             : #if !defined (DDS_HAS_MINIMUM_BIT)
      31             : #include "BuiltInTopicUtils.h"
      32             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
      33             : 
      34             : #include "Util.h"
      35             : 
      36             : #include "transport/framework/EntryExit.h"
      37             : #include "transport/framework/TransportExceptions.h"
      38             : #include "transport/framework/TransportSendElement.h"
      39             : #include "transport/framework/TransportCustomizedElement.h"
      40             : 
      41             : #include "ace/Reactor.h"
      42             : #include "ace/Auto_Ptr.h"
      43             : 
      44             : #include <stdexcept>
      45             : 
      46             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      47             : 
      48             : namespace OpenDDS {
      49             : namespace DCPS {
      50             : 
      51             : 
      52           0 : ReplayerImpl::ReplayerImpl()
      53           0 :   : data_dropped_count_(0),
      54           0 :   data_delivered_count_(0),
      55           0 :   n_chunks_(TheServiceParticipant->n_chunks()),
      56           0 :   association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
      57           0 :   qos_(TheServiceParticipant->initial_DataWriterQos()),
      58           0 :   participant_servant_(0),
      59           0 :   topic_id_(GUID_UNKNOWN),
      60           0 :   topic_servant_(0),
      61           0 :   listener_mask_(DEFAULT_STATUS_MASK),
      62           0 :   domain_id_(0),
      63           0 :   publisher_servant_(0),
      64           0 :   publication_id_(GUID_UNKNOWN),
      65           0 :   sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
      66             :   // data_container_(0),
      67             :   // liveliness_lost_(false),
      68             :   // last_deadline_missed_total_count_(0),
      69           0 :   is_bit_(false),
      70           0 :   empty_condition_(lock_),
      71           0 :   pending_write_count_(0)
      72             : {
      73             :   // liveliness_lost_status_.total_count = 0;
      74             :   // liveliness_lost_status_.total_count_change = 0;
      75             :   //
      76             :   // offered_deadline_missed_status_.total_count = 0;
      77             :   // offered_deadline_missed_status_.total_count_change = 0;
      78             :   // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
      79             : 
      80           0 :   offered_incompatible_qos_status_.total_count = 0;
      81           0 :   offered_incompatible_qos_status_.total_count_change = 0;
      82           0 :   offered_incompatible_qos_status_.last_policy_id = 0;
      83           0 :   offered_incompatible_qos_status_.policies.length(0);
      84             : 
      85           0 :   publication_match_status_.total_count = 0;
      86           0 :   publication_match_status_.total_count_change = 0;
      87           0 :   publication_match_status_.current_count = 0;
      88           0 :   publication_match_status_.current_count_change = 0;
      89           0 :   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
      90             : 
      91           0 : }
      92             : 
      93             : // This method is called when there are no longer any reference to the
      94             : // the servant.
      95           0 : ReplayerImpl::~ReplayerImpl()
      96             : {
      97             :   DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
      98           0 : }
      99             : 
     100             : // this method is called when delete_datawriter is called.
     101             : DDS::ReturnCode_t
     102           0 : ReplayerImpl::cleanup()
     103             : {
     104             : 
     105             :   //     // Unregister all registered instances prior to deletion.
     106             :   //     // this->unregister_instances(SystemTimePoint::now().to_dds_time());
     107             :   //
     108             :   //     // CORBA::String_var topic_name = this->get_Atopic_name();
     109             :   {
     110           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
     111             : 
     112             :     // Wait for pending samples to drain prior to removing associations
     113             :     // and unregistering the publication.
     114           0 :     ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
     115           0 :     while (this->pending_write_count_) {
     116           0 :       this->empty_condition_.wait(thread_status_manager);
     117             :     }
     118             : 
     119             :     // Call remove association before unregistering the datawriter
     120             :     // with the transport, otherwise some callbacks resulted from
     121             :     // remove_association may lost.
     122           0 :     this->remove_all_associations();
     123             : 
     124             :     // release our Topic_var
     125           0 :     topic_objref_ = DDS::Topic::_nil();
     126           0 :     topic_servant_ = 0;
     127             : 
     128           0 :   }
     129             : 
     130             :   // not just unregister but remove any pending writes/sends.
     131             :   // this->unregister_all();
     132             : 
     133           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
     134           0 :   if (!disco->remove_publication(
     135             :         this->domain_id_,
     136           0 :         this->participant_servant_->get_id(),
     137           0 :         this->publication_id_)) {
     138           0 :     ACE_ERROR_RETURN((LM_ERROR,
     139             :                       ACE_TEXT("(%P|%t) ERROR: ")
     140             :                       ACE_TEXT("PublisherImpl::delete_datawriter, ")
     141             :                       ACE_TEXT("publication not removed from discovery.\n")),
     142             :                      DDS::RETCODE_ERROR);
     143             :   }
     144           0 :   return DDS::RETCODE_OK;
     145           0 : }
     146             : 
     147             : void
     148           0 : ReplayerImpl::init(
     149             :   DDS::Topic_ptr                         topic,
     150             :   TopicImpl *                            topic_servant,
     151             :   const DDS::DataWriterQos &             qos,
     152             :   ReplayerListener_rch                   a_listener,
     153             :   const DDS::StatusMask &                mask,
     154             :   OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
     155             :   const DDS::PublisherQos&               publisher_qos)
     156             : {
     157             :   DBG_ENTRY_LVL("ReplayerImpl","init",6);
     158           0 :   topic_objref_ = DDS::Topic::_duplicate(topic);
     159           0 :   topic_servant_ = topic_servant;
     160           0 :   topic_name_    = topic_servant_->get_name();
     161           0 :   topic_id_      = topic_servant_->get_id();
     162           0 :   type_name_     = topic_servant_->get_type_name();
     163             : 
     164             : #if !defined (DDS_HAS_MINIMUM_BIT)
     165           0 :   is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
     166             : #endif   // !defined (DDS_HAS_MINIMUM_BIT)
     167             : 
     168           0 :   qos_ = qos;
     169           0 :   passed_qos_ = qos;
     170             : 
     171             :   //Note: OK to _duplicate(nil).
     172           0 :   listener_ = a_listener;
     173           0 :   listener_mask_ = mask;
     174             : 
     175             :   // Only store the participant pointer, since it is our "grand"
     176             :   // parent, we will exist as long as it does.
     177           0 :   participant_servant_ = participant_servant;
     178           0 :   domain_id_ = participant_servant_->get_domain_id();
     179             : 
     180           0 :   publisher_qos_ = publisher_qos;
     181           0 : }
     182             : 
     183             : 
     184           0 : DDS::ReturnCode_t ReplayerImpl::set_qos (const DDS::PublisherQos &  publisher_qos,
     185             :                                          const DDS::DataWriterQos & qos)
     186             : {
     187             : 
     188             :   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
     189             : 
     190           0 :   if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
     191           0 :     if (publisher_qos_ == publisher_qos)
     192           0 :       return DDS::RETCODE_OK;
     193             : 
     194             :     // for the not changeable qos, it can be changed before enable
     195           0 :     if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_) {
     196           0 :       return DDS::RETCODE_IMMUTABLE_POLICY;
     197             : 
     198             :     } else {
     199           0 :       publisher_qos_ = publisher_qos;
     200             :     }
     201             :   } else {
     202           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     203             :   }
     204             : 
     205             :   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     206             :   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     207             :   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     208             :   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     209             :   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     210             : 
     211           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
     212           0 :     if (qos_ == qos)
     213           0 :       return DDS::RETCODE_OK;
     214             : 
     215           0 :     if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
     216           0 :       return DDS::RETCODE_IMMUTABLE_POLICY;
     217             : 
     218             :     } else {
     219           0 :       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
     220             :       // DDS::PublisherQos publisherQos;
     221             :       // this->publisher_servant_->get_qos(publisherQos);
     222           0 :       DDS::PublisherQos publisherQos = this->publisher_qos_;
     223             :       const bool status
     224           0 :         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
     225           0 :                                         this->participant_servant_->get_id(),
     226           0 :                                         this->publication_id_,
     227             :                                         qos,
     228             :                                         publisherQos);
     229             : 
     230           0 :       if (!status) {
     231           0 :         ACE_ERROR_RETURN((LM_ERROR,
     232             :                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
     233             :                           ACE_TEXT("qos not updated.\n")),
     234             :                          DDS::RETCODE_ERROR);
     235             :       }
     236           0 :     }
     237             : 
     238           0 :     if (!(qos_ == qos)) {
     239             :       // Reset the deadline timer if the period has changed.
     240             :       // if (qos_.deadline.period.sec != qos.deadline.period.sec
     241             :       //     || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
     242             :       //   if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
     243             :       //       && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
     244             :       //     ACE_auto_ptr_reset(this->watchdog_,
     245             :       //                        new OfferedDeadlineWatchdog(
     246             :       //                          this->reactor_,
     247             :       //                          this->lock_,
     248             :       //                          qos.deadline,
     249             :       //                          this,
     250             :       //                          this,
     251             :       //                          this->offered_deadline_missed_status_,
     252             :       //                          this->last_deadline_missed_total_count_));
     253             :       //
     254             :       //   } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
     255             :       //              && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
     256             :       //     this->watchdog_->cancel_all();
     257             :       //     this->watchdog_.reset();
     258             :       //
     259             :       //   } else {
     260             :       //     this->watchdog_->reset_interval(
     261             :       //       duration_to_time_value(qos.deadline.period));
     262             :       //   }
     263             :       // }
     264             : 
     265           0 :       qos_ = qos;
     266             :     }
     267             : 
     268           0 :     return DDS::RETCODE_OK;
     269             : 
     270             :   } else {
     271           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     272             :   }
     273             : }
     274             : 
     275           0 : DDS::ReturnCode_t ReplayerImpl::get_qos (DDS::PublisherQos &  publisher_qos,
     276             :                                          DDS::DataWriterQos & qos)
     277             : {
     278           0 :   qos = passed_qos_;
     279           0 :   publisher_qos = publisher_qos_;
     280           0 :   return DDS::RETCODE_OK;
     281             : }
     282             : 
     283             : 
     284           0 : DDS::ReturnCode_t ReplayerImpl::set_listener (const ReplayerListener_rch & a_listener,
     285             :                                               DDS::StatusMask              mask)
     286             : {
     287           0 :   listener_ = a_listener;
     288           0 :   listener_mask_ = mask;
     289           0 :   return DDS::RETCODE_OK;
     290             : }
     291             : 
     292           0 : ReplayerListener_rch ReplayerImpl::get_listener ()
     293             : {
     294           0 :   return listener_;
     295             : }
     296             : 
     297             : DDS::ReturnCode_t
     298           0 : ReplayerImpl::enable()
     299             : {
     300             :   //According spec:
     301             :   // - Calling enable on an already enabled Entity returns OK and has no
     302             :   // effect.
     303             :   // - Calling enable on an Entity whose factory is not enabled will fail
     304             :   // and return PRECONDITION_NOT_MET.
     305             : 
     306           0 :   if (this->is_enabled()) {
     307           0 :     return DDS::RETCODE_OK;
     308             :   }
     309             : 
     310             :   // if (!this->publisher_servant_->is_enabled()) {
     311             :   //   return DDS::RETCODE_PRECONDITION_NOT_MET;
     312             :   // }
     313             :   //
     314           0 :   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
     315             : 
     316           0 :   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
     317           0 :     n_chunks_ = qos_.resource_limits.max_samples;
     318             :   }
     319             :   // +1 because we might allocate one before releasing another
     320             :   // TBD - see if this +1 can be removed.
     321           0 :   mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
     322           0 :   db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
     323           0 :   header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
     324             : 
     325           0 :   sample_list_element_allocator_.reset(new DataSampleElementAllocator(2 * n_chunks_));
     326             : 
     327             : 
     328           0 :   if (DCPS_debug_level >= 2) {
     329           0 :     ACE_DEBUG((LM_DEBUG,
     330             :                "(%P|%t) ReplayerImpl::enable-mb"
     331             :                " Cached_Allocator_With_Overflow %x with %d chunks\n",
     332             :                mb_allocator_.get(),
     333             :                n_chunks_));
     334             : 
     335           0 :     ACE_DEBUG((LM_DEBUG,
     336             :                "(%P|%t) ReplayerImpl::enable-db"
     337             :                " Cached_Allocator_With_Overflow %x with %d chunks\n",
     338             :                db_allocator_.get(),
     339             :                n_chunks_));
     340             : 
     341           0 :     ACE_DEBUG((LM_DEBUG,
     342             :                "(%P|%t) ReplayerImpl::enable-header"
     343             :                " Cached_Allocator_With_Overflow %x with %d chunks\n",
     344             :                header_allocator_.get(),
     345             :                n_chunks_));
     346             :   }
     347             : 
     348           0 :   this->set_enabled();
     349             : 
     350             :   try {
     351           0 :     this->enable_transport(reliable,
     352           0 :                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
     353             : 
     354           0 :   } catch (const Transport::Exception&) {
     355           0 :     ACE_ERROR((LM_ERROR,
     356             :                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
     357             :                ACE_TEXT("Transport Exception.\n")));
     358           0 :     return DDS::RETCODE_ERROR;
     359             : 
     360           0 :   }
     361             : 
     362           0 :   const TransportLocatorSeq& trans_conf_info = connection_info();
     363             : 
     364             : 
     365           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
     366             : 
     367           0 :   set_writer_effective_data_rep_qos(qos_.representation.value, cdr_encapsulation());
     368           0 :   if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
     369           0 :     return DDS::RETCODE_ERROR;
     370             :   }
     371             : 
     372           0 :   XTypes::TypeInformation type_info;
     373           0 :   type_info.minimal.typeid_with_size.typeobject_serialized_size = 0;
     374           0 :   type_info.minimal.dependent_typeid_count = 0;
     375           0 :   type_info.complete.typeid_with_size.typeobject_serialized_size = 0;
     376           0 :   type_info.complete.dependent_typeid_count = 0;
     377             : 
     378             :   this->publication_id_ =
     379           0 :     disco->add_publication(this->domain_id_,
     380           0 :                            this->participant_servant_->get_id(),
     381           0 :                            this->topic_servant_->get_id(),
     382           0 :                            rchandle_from(this),
     383           0 :                            this->qos_,
     384             :                            trans_conf_info,
     385           0 :                            this->publisher_qos_,
     386             :                            type_info);
     387             : 
     388           0 :   if (this->publication_id_ == GUID_UNKNOWN) {
     389           0 :     ACE_ERROR((LM_ERROR,
     390             :                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
     391             :                ACE_TEXT("add_publication returned invalid id.\n")));
     392           0 :     return DDS::RETCODE_ERROR;
     393             :   }
     394             : 
     395           0 :   return DDS::RETCODE_OK;
     396           0 : }
     397             : 
     398             : 
     399             : 
     400             : void
     401           0 : ReplayerImpl::add_association(const GUID_t&            yourId,
     402             :                               const ReaderAssociation& reader,
     403             :                               bool                     active)
     404             : {
     405             :   DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
     406             : 
     407           0 :   if (DCPS_debug_level >= 1) {
     408           0 :     ACE_DEBUG((LM_DEBUG,
     409             :                ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
     410             :                ACE_TEXT("bit %d local %C remote %C\n"),
     411             :                is_bit_,
     412             :                LogGuid(yourId).c_str(),
     413             :                LogGuid(reader.readerId).c_str()));
     414             :   }
     415             : 
     416             :   // if (entity_deleted_) {
     417             :   //   if (DCPS_debug_level >= 1)
     418             :   //     ACE_DEBUG((LM_DEBUG,
     419             :   //                ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
     420             :   //                ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
     421             :   //
     422             :   //   return;
     423             :   // }
     424             : 
     425           0 :   if (GUID_UNKNOWN == publication_id_) {
     426           0 :     publication_id_ = yourId;
     427             :   }
     428             : 
     429             :   {
     430           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     431           0 :     reader_info_.insert(std::make_pair(reader.readerId,
     432           0 :                                        ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
     433           0 :                                                   reader.exprParams, participant_servant_,
     434           0 :                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
     435           0 :   }
     436             : 
     437           0 :   if (DCPS_debug_level > 4) {
     438           0 :     ACE_DEBUG((LM_DEBUG,
     439             :                ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
     440             :                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
     441             :                LogGuid(publication_id_).c_str(),
     442             :                qos_.transport_priority.value));
     443             :   }
     444             : 
     445           0 :   AssociationData data;
     446           0 :   data.remote_id_ = reader.readerId;
     447           0 :   data.remote_data_ = reader.readerTransInfo;
     448           0 :   data.discovery_locator_ = reader.readerDiscInfo;
     449           0 :   data.remote_transport_context_ = reader.transportContext;
     450           0 :   data.remote_reliable_ =
     451           0 :     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
     452           0 :   data.remote_durable_ =
     453           0 :     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
     454             : 
     455           0 :   if (!this->associate(data, active)) {
     456             :     //FUTURE: inform inforepo and try again as passive peer
     457           0 :     if (DCPS_debug_level) {
     458           0 :       ACE_ERROR((LM_ERROR,
     459             :                  ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
     460             :                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
     461             :     }
     462           0 :     return;
     463             :   }
     464             : 
     465           0 :   if (active) {
     466           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     467             : 
     468           0 :     association_complete_i(reader.readerId);
     469           0 :   }
     470           0 : }
     471             : 
     472             : 
     473           0 : ReplayerImpl::ReaderInfo::ReaderInfo(const char*            filter,
     474             :                                      const DDS::StringSeq&  params,
     475             :                                      DomainParticipantImpl* participant,
     476           0 :                                      bool                   durable)
     477           0 :   : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
     478           0 :   , durable_(durable)
     479             : {
     480             :   ACE_UNUSED_ARG(filter);
     481             :   ACE_UNUSED_ARG(params);
     482             :   ACE_UNUSED_ARG(participant);
     483           0 : }
     484             : 
     485             : 
     486           0 : ReplayerImpl::ReaderInfo::~ReaderInfo()
     487             : {
     488           0 : }
     489             : 
     490             : void
     491           0 : ReplayerImpl::association_complete_i(const GUID_t& remote_id)
     492             : {
     493             :   DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
     494             :   // bool reader_durable = false;
     495             :   {
     496           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     497           0 :     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
     498           0 :       ACE_ERROR((LM_ERROR,
     499             :                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
     500             :                  ACE_TEXT("insert %C from pending failed.\n"),
     501             :                  LogGuid(remote_id).c_str()));
     502             :     }
     503             :     // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
     504             :     // if (it != reader_info_.end()) {
     505             :     //   reader_durable = it->second.durable_;
     506             :     // }
     507           0 :   }
     508             : 
     509           0 :   if (!is_bit_) {
     510             : 
     511           0 :     const DDS::InstanceHandle_t handle = participant_servant_->assign_handle(remote_id);
     512             : 
     513             :     {
     514             :       // protect publication_match_status_ and status changed flags.
     515           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     516             : 
     517             :       // update the publication_match_status_
     518           0 :       ++publication_match_status_.total_count;
     519           0 :       ++publication_match_status_.total_count_change;
     520           0 :       ++publication_match_status_.current_count;
     521           0 :       ++publication_match_status_.current_count_change;
     522             : 
     523           0 :       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
     524           0 :         ACE_DEBUG((LM_WARNING,
     525             :                    ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
     526             :                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
     527             :                    LogGuid(remote_id).c_str(),
     528             :                    handle));
     529           0 :         return;
     530             : 
     531           0 :       } else if (DCPS_debug_level > 4) {
     532           0 :         ACE_DEBUG((LM_DEBUG,
     533             :                    ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
     534             :                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
     535             :                    LogGuid(remote_id).c_str(),
     536             :                    handle));
     537             :       }
     538             : 
     539           0 :       publication_match_status_.last_subscription_handle = handle;
     540             : 
     541           0 :     }
     542             : 
     543             : 
     544           0 :     if (listener_.in()) {
     545           0 :       listener_->on_replayer_matched(this,
     546           0 :                                      publication_match_status_);
     547             : 
     548             :       // TBD - why does the spec say to change this but not
     549             :       // change the ChangeFlagStatus after a listener call?
     550           0 :       publication_match_status_.total_count_change = 0;
     551           0 :       publication_match_status_.current_count_change = 0;
     552             :     }
     553             : 
     554             :   }
     555             : 
     556             : }
     557             : 
     558             : void
     559           0 : ReplayerImpl::remove_associations(const ReaderIdSeq & readers,
     560             :                                   CORBA::Boolean      notify_lost)
     561             : {
     562           0 :   if (DCPS_debug_level >= 1) {
     563           0 :     ACE_DEBUG((LM_DEBUG,
     564             :                ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
     565             :                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
     566             :                is_bit_,
     567             :                LogGuid(publication_id_).c_str(),
     568             :                LogGuid(readers[0]).c_str(),
     569             :                readers.length()));
     570             :   }
     571             : 
     572           0 :   this->stop_associating(readers.get_buffer(), readers.length());
     573             : 
     574           0 :   ReaderIdSeq fully_associated_readers;
     575           0 :   CORBA::ULong fully_associated_len = 0;
     576           0 :   ReaderIdSeq rds;
     577           0 :   CORBA::ULong rds_len = 0;
     578           0 :   DDS::InstanceHandleSeq handles;
     579             : 
     580             :   {
     581             :     // Ensure the same acquisition order as in wait_for_acknowledgments().
     582             :     // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
     583           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     584             : 
     585             :     //Remove the readers from fully associated reader list.
     586             :     //If the supplied reader is not in the cached reader list then it is
     587             :     //already removed. We just need remove the readers in the list that have
     588             :     //not been removed.
     589             : 
     590           0 :     CORBA::ULong len = readers.length();
     591             : 
     592           0 :     for (CORBA::ULong i = 0; i < len; ++i) {
     593             :       //Remove the readers from fully associated reader list. If it's not
     594             :       //in there, the association_complete() is not called yet and remove it
     595             :       //from pending list.
     596             : 
     597           0 :       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
     598           0 :         ++fully_associated_len;
     599           0 :         fully_associated_readers.length(fully_associated_len);
     600           0 :         fully_associated_readers [fully_associated_len - 1] = readers[i];
     601             : 
     602             :         // Remove this reader from the ACK sequence map if its there.
     603             :         // This is where we need to be holding the wfaLock_ obtained
     604             :         // above.
     605             :         RepoIdToSequenceMap::iterator where
     606           0 :           = this->idToSequence_.find(readers[i]);
     607             : 
     608           0 :         if (where != this->idToSequence_.end()) {
     609           0 :           this->idToSequence_.erase(where);
     610             : 
     611             :           // It is possible that this subscription was causing the wait
     612             :           // to continue, so give the opportunity to find out.
     613             :           // this->wfaCondition_.broadcast();
     614             :         }
     615             : 
     616           0 :         ++rds_len;
     617           0 :         rds.length(rds_len);
     618           0 :         rds [rds_len - 1] = readers[i];
     619             :       }
     620           0 :       reader_info_.erase(readers[i]);
     621             :       //else reader is already removed which indicates remove_association()
     622             :       //is called multiple times.
     623             :     }
     624             : 
     625           0 :     if (fully_associated_len > 0 && !is_bit_) {
     626             :       // The reader should be in the id_to_handle map at this time
     627           0 :       this->lookup_instance_handles(fully_associated_readers, handles);
     628             : 
     629           0 :       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
     630           0 :         id_to_handle_map_.erase(fully_associated_readers[i]);
     631             :       }
     632             :     }
     633             : 
     634             :     // wfaGuard.release();
     635             : 
     636             :     // Mirror the PUBLICATION_MATCHED_STATUS processing from
     637             :     // association_complete() here.
     638           0 :     if (!this->is_bit_) {
     639             : 
     640             :       // Derive the change in the number of subscriptions reading this writer.
     641             :       int matchedSubscriptions =
     642           0 :         static_cast<int>(this->id_to_handle_map_.size());
     643           0 :       this->publication_match_status_.current_count_change =
     644           0 :         matchedSubscriptions - this->publication_match_status_.current_count;
     645             : 
     646             :       // Only process status if the number of subscriptions has changed.
     647           0 :       if (this->publication_match_status_.current_count_change != 0) {
     648           0 :         this->publication_match_status_.current_count = matchedSubscriptions;
     649             : 
     650             :         /// Section 7.1.4.1: total_count will not decrement.
     651             : 
     652             :         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
     653             :         /// TODO: Should rds_len really be fully_associated_len here??
     654           0 :         this->publication_match_status_.last_subscription_handle =
     655           0 :           handles[rds_len - 1];
     656             : 
     657             : 
     658           0 :         if (listener_.in()) {
     659           0 :           listener_->on_replayer_matched(
     660             :             this,
     661           0 :             this->publication_match_status_);
     662             : 
     663             :           // Listener consumes the change.
     664           0 :           this->publication_match_status_.total_count_change = 0;
     665           0 :           this->publication_match_status_.current_count_change = 0;
     666             :         }
     667             : 
     668             :       }
     669             :     }
     670           0 :   }
     671             : 
     672           0 :   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
     673           0 :     this->disassociate(rds[i]);
     674             :   }
     675             : 
     676             :   // If this remove_association is invoked when the InfoRepo
     677             :   // detects a lost reader then make a callback to notify
     678             :   // subscription lost.
     679           0 :   if (notify_lost && handles.length() > 0) {
     680           0 :     this->notify_publication_lost(handles);
     681             :   }
     682             : 
     683           0 :   for (unsigned int i = 0; i < handles.length(); ++i) {
     684           0 :     participant_servant_->return_handle(handles[i]);
     685             :   }
     686           0 : }
     687             : 
     688           0 : void ReplayerImpl::remove_all_associations()
     689             : {
     690           0 :   this->stop_associating();
     691             : 
     692           0 :   OpenDDS::DCPS::ReaderIdSeq readers;
     693             :   CORBA::ULong size;
     694             :   {
     695           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
     696             : 
     697           0 :     size = static_cast<CORBA::ULong>(readers_.size());
     698           0 :     readers.length(size);
     699             : 
     700           0 :     RepoIdSet::iterator itEnd = readers_.end();
     701           0 :     int i = 0;
     702             : 
     703           0 :     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
     704           0 :       readers[i++] = *it;
     705             :     }
     706           0 :   }
     707             : 
     708             :   try {
     709           0 :     if (0 < size) {
     710           0 :       CORBA::Boolean dont_notify_lost = false;
     711           0 :       this->remove_associations(readers, dont_notify_lost);
     712             :     }
     713             : 
     714           0 :   } catch (const CORBA::Exception&) {
     715           0 :   }
     716             : 
     717           0 :   transport_stop();
     718           0 : }
     719             : 
     720             : void
     721           0 : ReplayerImpl::register_for_reader(const GUID_t& participant,
     722             :                                   const GUID_t& writerid,
     723             :                                   const GUID_t& readerid,
     724             :                                   const TransportLocatorSeq& locators,
     725             :                                   DiscoveryListener* listener)
     726             : {
     727           0 :   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
     728           0 : }
     729             : 
     730             : void
     731           0 : ReplayerImpl::unregister_for_reader(const GUID_t& participant,
     732             :                                     const GUID_t& writerid,
     733             :                                     const GUID_t& readerid)
     734             : {
     735           0 :   TransportClient::unregister_for_reader(participant, writerid, readerid);
     736           0 : }
     737             : 
     738             : void
     739           0 : ReplayerImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
     740             : {
     741             : 
     742             : 
     743           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     744             : 
     745             :   // copy status and increment change
     746           0 :   offered_incompatible_qos_status_.total_count = status.total_count;
     747           0 :   offered_incompatible_qos_status_.total_count_change +=
     748           0 :     status.count_since_last_send;
     749           0 :   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
     750           0 :   offered_incompatible_qos_status_.policies = status.policies;
     751             : 
     752           0 : }
     753             : 
     754             : void
     755           0 : ReplayerImpl::update_subscription_params(const GUID_t&         readerId,
     756             :                                          const DDS::StringSeq& params)
     757             : {
     758             :   ACE_UNUSED_ARG(readerId);
     759             :   ACE_UNUSED_ARG(params);
     760           0 : }
     761             : 
     762             : bool
     763           0 : ReplayerImpl::check_transport_qos(const TransportInst&)
     764             : {
     765             :   // DataWriter does not impose any constraints on which transports
     766             :   // may be used based on QoS.
     767           0 :   return true;
     768             : }
     769             : 
     770           0 : GUID_t ReplayerImpl::get_guid() const
     771             : {
     772           0 :   return this->publication_id_;
     773             : }
     774             : 
     775             : CORBA::Long
     776           0 : ReplayerImpl::get_priority_value(const AssociationData&) const
     777             : {
     778           0 :   return this->qos_.transport_priority.value;
     779             : }
     780             : 
     781             : void
     782           0 : ReplayerImpl::data_delivered(const DataSampleElement* sample)
     783             : {
     784             :   DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
     785           0 :   if (!(sample->get_pub_id() == this->publication_id_)) {
     786           0 :     ACE_ERROR((LM_ERROR,
     787             :                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
     788             :                ACE_TEXT(" The publication id %C from delivered element ")
     789             :                ACE_TEXT("does not match the datawriter's id %C\n"),
     790             :                LogGuid(sample->get_pub_id()).c_str(),
     791             :                LogGuid(publication_id_).c_str()));
     792           0 :     return;
     793             :   }
     794           0 :   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
     795             :   // this->data_container_->data_delivered(sample);
     796           0 :   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
     797           0 :   ++data_delivered_count_;
     798             : 
     799             :   {
     800           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     801           0 :     if (--pending_write_count_ == 0) {
     802           0 :       empty_condition_.notify_all();
     803             :     }
     804           0 :   }
     805             : }
     806             : 
     807             : void
     808           0 : ReplayerImpl::control_delivered(const Message_Block_Ptr& sample)
     809             : {
     810             :   ACE_UNUSED_ARG(sample);
     811           0 : }
     812             : 
     813             : void
     814           0 : ReplayerImpl::data_dropped(const DataSampleElement* sample,
     815             :                            bool                         dropped_by_transport)
     816             : {
     817             :   DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
     818             :   // this->data_container_->data_dropped(element, dropped_by_transport);
     819             :   ACE_UNUSED_ARG(dropped_by_transport);
     820           0 :   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
     821           0 :   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
     822           0 :   ++data_dropped_count_;
     823             :   {
     824           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     825           0 :     if ((--pending_write_count_) == 0) {
     826           0 :       empty_condition_.notify_all();
     827             :     }
     828           0 :   }
     829             : }
     830             : 
     831             : void
     832           0 : ReplayerImpl::control_dropped(const Message_Block_Ptr& sample,
     833             :                               bool /* dropped_by_transport */)
     834             : {
     835             :   ACE_UNUSED_ARG(sample);
     836           0 : }
     837             : 
     838             : void
     839           0 : ReplayerImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
     840             : {
     841             :   ACE_UNUSED_ARG(subids);
     842           0 : }
     843             : 
     844             : void
     845           0 : ReplayerImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
     846             : {
     847             :   ACE_UNUSED_ARG(subids);
     848           0 : }
     849             : 
     850             : void
     851           0 : ReplayerImpl::notify_publication_lost(const ReaderIdSeq& subids)
     852             : {
     853             :   ACE_UNUSED_ARG(subids);
     854           0 : }
     855             : 
     856             : void
     857           0 : ReplayerImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
     858             : {
     859             :   ACE_UNUSED_ARG(handles);
     860           0 : }
     861             : 
     862             : 
     863             : void
     864           0 : ReplayerImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
     865             : {
     866           0 :   qos_data.pub_qos = this->publisher_qos_;
     867           0 :   qos_data.dw_qos = this->qos_;
     868           0 :   qos_data.topic_name = this->topic_name_.in();
     869           0 : }
     870             : 
     871             : DDS::ReturnCode_t
     872           0 : ReplayerImpl::write (const RawDataSample*   samples,
     873             :                      int                    num_samples,
     874             :                      DDS::InstanceHandle_t* reader_ih_ptr)
     875             : {
     876             :   DBG_ENTRY_LVL("ReplayerImpl","write",6);
     877             : 
     878             :   OpenDDS::DCPS::GUID_t repo_id;
     879           0 :   if (reader_ih_ptr) {
     880           0 :     repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
     881           0 :     if (repo_id == GUID_UNKNOWN) {
     882           0 :       ACE_ERROR_RETURN((LM_ERROR,
     883             :                         ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
     884             :                         ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
     885             :                        DDS::RETCODE_ERROR);
     886             :     }
     887             :   }
     888             : 
     889           0 :   SendStateDataSampleList list;
     890             : 
     891           0 :   for (int i = 0; i < num_samples; ++i) {
     892           0 :     DataSampleElement* element = 0;
     893             : 
     894           0 :     ACE_NEW_MALLOC_RETURN(
     895             :       element,
     896             :       static_cast<DataSampleElement*>(
     897             :         sample_list_element_allocator_->malloc(
     898             :           sizeof(DataSampleElement))),
     899             :       DataSampleElement(publication_id_,
     900             :                             this,
     901             :                             PublicationInstance_rch()),
     902             :       DDS::RETCODE_ERROR);
     903             : 
     904           0 :     element->get_header().byte_order_ = samples[i].sample_byte_order_;
     905           0 :     element->get_header().publication_id_ = this->publication_id_;
     906           0 :     list.enqueue_tail(element);
     907           0 :     Message_Block_Ptr temp;
     908           0 :     Message_Block_Ptr sample(samples[i].sample_->duplicate());
     909           0 :     DDS::ReturnCode_t ret = create_sample_data_message(move(sample),
     910             :                                                        element->get_header(),
     911             :                                                        temp,
     912           0 :                                                        samples[i].source_timestamp_,
     913             :                                                        false);
     914           0 :     element->set_sample(move(temp));
     915           0 :     if (reader_ih_ptr) {
     916           0 :       element->set_num_subs(1);
     917           0 :       element->set_sub_id(0, repo_id);
     918             :     }
     919             : 
     920           0 :     if (ret != DDS::RETCODE_OK) {
     921             :       // we need to free the list
     922           0 :       while (list.dequeue(element)) {
     923           0 :         ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
     924             :       }
     925             : 
     926           0 :       return ret;
     927             :     }
     928           0 :   }
     929             : 
     930             :   {
     931           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
     932           0 :     ++pending_write_count_;
     933           0 :   }
     934             : 
     935           0 :   this->send(list);
     936             : 
     937           0 :   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
     938           0 :        end = reader_info_.end(); iter != end; ++iter) {
     939           0 :     iter->second.expected_sequence_ = sequence_number_;
     940             :   }
     941             : 
     942           0 :   return DDS::RETCODE_OK;
     943           0 : }
     944             : 
     945             : DDS::ReturnCode_t
     946           0 : ReplayerImpl::write(const RawDataSample& sample)
     947             : {
     948           0 :   return this->write(&sample, 1, 0);
     949             : }
     950             : 
     951             : DDS::ReturnCode_t
     952           0 : ReplayerImpl::create_sample_data_message(Message_Block_Ptr   data,
     953             :                                          DataSampleHeader&   header_data,
     954             :                                          Message_Block_Ptr&  message,
     955             :                                          const DDS::Time_t&  source_timestamp,
     956             :                                          bool                content_filter)
     957             : {
     958           0 :   header_data.message_id_ = SAMPLE_DATA;
     959           0 :   header_data.coherent_change_ = content_filter;
     960             : 
     961           0 :   header_data.content_filter_ = false;
     962           0 :   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
     963           0 :   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
     964           0 :   header_data.sequence_repair_ = need_sequence_repair();
     965           0 :   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
     966           0 :     this->sequence_number_ = SequenceNumber();
     967             :   } else {
     968           0 :     ++this->sequence_number_;
     969             :   }
     970           0 :   header_data.sequence_ = this->sequence_number_;
     971           0 :   header_data.source_timestamp_sec_ = source_timestamp.sec;
     972           0 :   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
     973             : 
     974           0 :   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
     975           0 :       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
     976           0 :     header_data.lifespan_duration_ = true;
     977           0 :     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
     978           0 :     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
     979             :   }
     980             : 
     981             :   // header_data.publication_id_ = publication_id_;
     982             :   // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
     983             :   ACE_Message_Block* tmp;
     984           0 :   ACE_NEW_MALLOC_RETURN(tmp,
     985             :                         static_cast<ACE_Message_Block*>(
     986             :                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
     987             :                         ACE_Message_Block(DataSampleHeader::get_max_serialized_size(),
     988             :                                           ACE_Message_Block::MB_DATA,
     989             :                                           data.release(),   //cont
     990             :                                           0,   //data
     991             :                                           header_allocator_.get(),   //alloc_strategy
     992             :                                           0,   //locking_strategy
     993             :                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
     994             :                                           ACE_Time_Value::zero,
     995             :                                           ACE_Time_Value::max_time,
     996             :                                           db_allocator_.get(),
     997             :                                           mb_allocator_.get()),
     998             :                         DDS::RETCODE_ERROR);
     999           0 :   message.reset(tmp);
    1000           0 :   *message << header_data;
    1001           0 :   return DDS::RETCODE_OK;
    1002             : }
    1003             : 
    1004             : void
    1005           0 : ReplayerImpl::lookup_instance_handles(const ReaderIdSeq&       ids,
    1006             :                                       DDS::InstanceHandleSeq & hdls)
    1007             : {
    1008           0 :   CORBA::ULong const num_rds = ids.length();
    1009             : 
    1010           0 :   if (DCPS_debug_level > 9) {
    1011           0 :     OPENDDS_STRING separator;
    1012           0 :     OPENDDS_STRING buffer;
    1013             : 
    1014           0 :     for (CORBA::ULong i = 0; i < num_rds; ++i) {
    1015           0 :       buffer += separator + LogGuid(ids[i]).conv_;
    1016           0 :       separator = ", ";
    1017             :     }
    1018             : 
    1019           0 :     ACE_DEBUG((LM_DEBUG,
    1020             :                ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ")
    1021             :                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
    1022             :                buffer.c_str()));
    1023           0 :   }
    1024             : 
    1025           0 :   hdls.length(num_rds);
    1026             : 
    1027           0 :   for (CORBA::ULong i = 0; i < num_rds; ++i) {
    1028           0 :     hdls[i] = participant_servant_->lookup_handle(ids[i]);
    1029             :   }
    1030           0 : }
    1031             : 
    1032             : bool
    1033           0 : ReplayerImpl::need_sequence_repair() const
    1034             : {
    1035           0 :   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
    1036           0 :        end = reader_info_.end(); it != end; ++it) {
    1037           0 :     if (it->second.expected_sequence_ != sequence_number_) {
    1038           0 :       return true;
    1039             :     }
    1040             :   }
    1041           0 :   return false;
    1042             : }
    1043             : 
    1044             : DDS::InstanceHandle_t
    1045           0 : ReplayerImpl::get_instance_handle()
    1046             : {
    1047           0 :   return get_entity_instance_handle(publication_id_, rchandle_from(participant_servant_));
    1048             : }
    1049             : 
    1050             : DDS::ReturnCode_t
    1051           0 : ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
    1052             :                                const RawDataSample&  sample )
    1053             : {
    1054           0 :   return write(&sample, 1, &subscription);
    1055             : }
    1056             : 
    1057             : DDS::ReturnCode_t
    1058           0 : ReplayerImpl::write_to_reader (DDS::InstanceHandle_t    subscription,
    1059             :                                const RawDataSampleList& samples )
    1060             : {
    1061           0 :   if (!samples.empty())
    1062           0 :     return write(&samples[0], static_cast<int>(samples.size()), &subscription);
    1063           0 :   return DDS::RETCODE_ERROR;
    1064             : }
    1065             : 
    1066             : } // namespace DCPS
    1067             : } // namespace
    1068             : 
    1069             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16