LCOV - code coverage report
Current view: top level - DCPS - DataReaderImpl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 1872 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 146 0.0 %

          Line data    Source code
       1             : /*
       2             :  * Distributed under the OpenDDS License.
       3             :  * See: http://www.opendds.org/license.html
       4             :  */
       5             : 
       6             : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
       7             : 
       8             : #include "DataReaderImpl.h"
       9             : 
      10             : #include "SubscriptionInstance.h"
      11             : #include "ReceivedDataElementList.h"
      12             : #include "DomainParticipantImpl.h"
      13             : #include "Service_Participant.h"
      14             : #include "Qos_Helper.h"
      15             : #include "FeatureDisabledQosCheck.h"
      16             : #include "GuidConverter.h"
      17             : #include "TopicImpl.h"
      18             : #include "Serializer.h"
      19             : #include "SubscriberImpl.h"
      20             : #include "Transient_Kludge.h"
      21             : #include "Util.h"
      22             : #include "DCPS_Utils.h"
      23             : #include "QueryConditionImpl.h"
      24             : #include "ReadConditionImpl.h"
      25             : #include "MonitorFactory.h"
      26             : #include "transport/framework/EntryExit.h"
      27             : #include "transport/framework/TransportExceptions.h"
      28             : #include "SafetyProfileStreams.h"
      29             : #include "TypeSupportImpl.h"
      30             : #include "XTypes/TypeObject.h"
      31             : #ifndef DDS_HAS_MINIMUM_BIT
      32             : #  include "BuiltInTopicUtils.h"
      33             : #endif
      34             : 
      35             : #ifndef DDS_HAS_MINIMUM_BIT
      36             : #  include <dds/DdsDcpsCoreTypeSupportC.h>
      37             : #endif
      38             : #include <dds/DdsDcpsCoreC.h>
      39             : #include <dds/DdsDcpsGuidTypeSupportImpl.h>
      40             : 
      41             : #include <ace/Reactor.h>
      42             : #include <ace/Auto_Ptr.h>
      43             : #include <ace/OS_NS_sys_time.h>
      44             : 
      45             : #include <cstdio>
      46             : #include <stdexcept>
      47             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
      48             : #  include <sstream>
      49             : #endif
      50             : 
      51             : #ifndef __ACE_INLINE__
      52             : #  include "DataReaderImpl.inl"
      53             : #endif
      54             : 
      55             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      56             : 
      57             : namespace OpenDDS {
      58             : namespace DCPS {
      59             : 
      60           0 : DataReaderImpl::DataReaderImpl()
      61           0 :   : has_subscription_id_(false)
      62           0 :   , subscription_id_mutex_()
      63           0 :   , subscription_id_condition_(subscription_id_mutex_)
      64           0 :   , qos_(TheServiceParticipant->initial_DataReaderQos())
      65           0 :   , reverse_sample_lock_(sample_lock_)
      66           0 :   , topic_servant_(0)
      67           0 :   , type_support_(0)
      68           0 :   , topic_id_(GUID_UNKNOWN)
      69             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
      70           0 :   , is_exclusive_ownership_(false)
      71             : #endif
      72           0 :   , coherent_(false)
      73           0 :   , subqos_(TheServiceParticipant->initial_SubscriberQos())
      74           0 :   , topic_desc_(0)
      75           0 :   , listener_mask_(DEFAULT_STATUS_MASK)
      76           0 :   , domain_id_(0)
      77           0 :   , end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
      78           0 :   , n_chunks_(TheServiceParticipant->n_chunks())
      79           0 :   , reactor_(0)
      80           0 :   , liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
      81           0 :   , last_deadline_missed_total_count_(0)
      82           0 :   , deadline_queue_enabled_(false)
      83           0 :   , deadline_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl::deadline_task))
      84           0 :   , is_bit_(false)
      85           0 :   , always_get_history_(false)
      86           0 :   , statistics_enabled_(false)
      87           0 :   , raw_latency_buffer_size_(0)
      88           0 :   , raw_latency_buffer_type_(DataCollector<double>::KeepOldest)
      89           0 :   , transport_disabled_(false)
      90           0 :   , mb_alloc_(DEFAULT_TRANSPORT_RECEIVE_BUFFERS)
      91             : {
      92           0 :   reactor_ = TheServiceParticipant->timer();
      93             : 
      94           0 :   liveliness_changed_status_.alive_count = 0;
      95           0 :   liveliness_changed_status_.not_alive_count = 0;
      96           0 :   liveliness_changed_status_.alive_count_change = 0;
      97           0 :   liveliness_changed_status_.not_alive_count_change = 0;
      98           0 :   liveliness_changed_status_.last_publication_handle =
      99             :       DDS::HANDLE_NIL;
     100             : 
     101           0 :   requested_deadline_missed_status_.total_count = 0;
     102           0 :   requested_deadline_missed_status_.total_count_change = 0;
     103           0 :   requested_deadline_missed_status_.last_instance_handle =
     104             :       DDS::HANDLE_NIL;
     105             : 
     106           0 :   requested_incompatible_qos_status_.total_count = 0;
     107           0 :   requested_incompatible_qos_status_.total_count_change = 0;
     108           0 :   requested_incompatible_qos_status_.last_policy_id = 0;
     109           0 :   requested_incompatible_qos_status_.policies.length(0);
     110             : 
     111           0 :   subscription_match_status_.total_count = 0;
     112           0 :   subscription_match_status_.total_count_change = 0;
     113           0 :   subscription_match_status_.current_count = 0;
     114           0 :   subscription_match_status_.current_count_change = 0;
     115           0 :   subscription_match_status_.last_publication_handle =
     116             :       DDS::HANDLE_NIL;
     117             : 
     118           0 :   sample_lost_status_.total_count = 0;
     119           0 :   sample_lost_status_.total_count_change = 0;
     120             : 
     121           0 :   sample_rejected_status_.total_count = 0;
     122           0 :   sample_rejected_status_.total_count_change = 0;
     123           0 :   sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
     124           0 :   sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
     125             : 
     126           0 :   this->budget_exceeded_status_.total_count = 0;
     127           0 :   this->budget_exceeded_status_.total_count_change = 0;
     128           0 :   this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
     129             : 
     130           0 :   monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this));
     131           0 :   periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this));
     132           0 : }
     133             : 
     134             : // This method is called when there are no longer any references to
     135             : // the servant.
     136           0 : DataReaderImpl::~DataReaderImpl()
     137             : {
     138             :   DBG_ENTRY_LVL("DataReaderImpl", "~DataReaderImpl", 6);
     139             : 
     140           0 :   deadline_task_->cancel();
     141             : 
     142             : #ifndef OPENDDS_SAFETY_PROFILE
     143           0 :   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
     144           0 :   if (participant) {
     145           0 :     XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
     146           0 :     if (type_lookup_service) {
     147           0 :       type_lookup_service->remove_guid_from_dynamic_map(subscription_id_);
     148             :     }
     149           0 :   }
     150             : #endif
     151           0 : }
     152             : 
     153             : // this method is called when delete_datareader is called.
     154             : void
     155           0 : DataReaderImpl::cleanup()
     156             : {
     157             :   // As a first step, set our listener to nil, which prevents us from calling
     158             :   // back onto the listener after the related DDS entity has been
     159             :   // deleted.
     160           0 :   set_listener(0, NO_STATUS_MASK);
     161             : 
     162             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     163           0 :   OwnershipManagerPtr owner_manager = this->ownership_manager();
     164           0 :   if (owner_manager) {
     165           0 :     owner_manager->unregister_reader(topic_servant_->type_name(), this);
     166             :   }
     167             : #endif
     168             : 
     169           0 :   topic_servant_ = 0;
     170             : 
     171             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     172             :   {
     173           0 :     ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
     174           0 :     content_filtered_topic_ = 0;
     175           0 :   }
     176             : #endif
     177             : 
     178             : #ifndef OPENDDS_NO_MULTI_TOPIC
     179           0 :   multi_topic_ = 0;
     180             : #endif
     181             : 
     182           0 : }
     183             : 
     184           0 : void DataReaderImpl::init(
     185             :     TopicDescriptionImpl* topic_desc,
     186             :     const DDS::DataReaderQos &qos,
     187             :     DDS::DataReaderListener_ptr listener,
     188             :     const DDS::StatusMask & mask,
     189             :     DomainParticipantImpl* participant,
     190             :     SubscriberImpl* subscriber)
     191             : {
     192           0 :   topic_desc_ = DDS::TopicDescription::_duplicate(topic_desc);
     193           0 :   if (TopicImpl* topic = dynamic_cast<TopicImpl*>(topic_desc)) {
     194           0 :     topic_servant_ = topic;
     195           0 :     type_support_ = dynamic_cast<TypeSupportImpl*>(topic->get_type_support());
     196           0 :     topic_id_ = topic->get_id();
     197             :   }
     198             : 
     199             : #ifndef DDS_HAS_MINIMUM_BIT
     200           0 :   CORBA::String_var topic_name = topic_desc->get_name();
     201           0 :   CORBA::String_var topic_type_name = topic_desc->get_type_name();
     202           0 :   is_bit_ = topicIsBIT(topic_name, topic_type_name);
     203             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
     204             : 
     205           0 :   qos_ = qos;
     206           0 :   passed_qos_ = qos;
     207             : 
     208             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     209           0 :   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
     210             : #endif
     211             : 
     212           0 :   set_listener(listener, mask);
     213             : 
     214             :   // Only store the participant pointer, since it is our "grand"
     215             :   // parent, we will exist as long as it does
     216           0 :   participant_servant_ = *participant;
     217             : 
     218           0 :   domain_id_ = participant->get_domain_id();
     219             : 
     220           0 :   subscriber_servant_ = rchandle_from(subscriber);
     221             : 
     222           0 :   if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
     223           0 :     ACE_DEBUG((LM_WARNING,
     224             :         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
     225             :         ACE_TEXT("failed to get SubscriberQos\n")));
     226             :   }
     227           0 : }
     228             : 
     229             : DDS::InstanceHandle_t
     230           0 : DataReaderImpl::get_instance_handle()
     231             : {
     232           0 :   const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
     233           0 :   return get_entity_instance_handle(subscription_id_, participant);
     234           0 : }
     235             : 
     236             : void
     237           0 : DataReaderImpl::add_association(const GUID_t& yourId,
     238             :     const WriterAssociation& writer,
     239             :     bool active)
     240             : {
     241           0 :   if (DCPS_debug_level) {
     242           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
     243             :         ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
     244             :         LogGuid(yourId).c_str(),
     245             :         LogGuid(writer.writerId).c_str()));
     246             :   }
     247             : 
     248           0 :   if (get_deleted()) {
     249           0 :     if (DCPS_debug_level) {
     250           0 :       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
     251             :           ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
     252             :     }
     253           0 :     return;
     254             :   }
     255             : 
     256             :   // We are being called back from the repository before we are done
     257             :   // processing after our call to the repository that caused this call
     258             :   // (from the repository) to be made.
     259             :   {
     260           0 :     ACE_Guard<ACE_Thread_Mutex> guard(subscription_id_mutex_);
     261           0 :     if (GUID_UNKNOWN == subscription_id_) {
     262           0 :       subscription_id_ = yourId;
     263           0 :       has_subscription_id_ = true;
     264           0 :       subscription_id_condition_.notify_all();
     265             :     }
     266           0 :   }
     267             : 
     268             :   // For the writer being associated, we create a WriterInfo and a
     269             :   // WriterStats object and store them in our internal maps
     270             :   // (writers_ and statistics_).
     271             :   //
     272             :   {
     273             : 
     274           0 :     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
     275             : 
     276           0 :     const GUID_t& writer_id = writer.writerId;
     277           0 :     WriterInfo_rch info = make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos);
     278           0 :     std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
     279             :         // This insertion is idempotent.
     280           0 :         WriterMapType::value_type(
     281             :           writer_id,
     282             :           info));
     283             : 
     284             :     // Schedule timer if necessary
     285             :     //   - only need to check reader qos - we know the writer must be >= reader
     286           0 :     if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
     287           0 :       info->waiting_for_end_historic_samples(true);
     288             :     }
     289             : 
     290             :     {
     291           0 :       ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
     292           0 :       statistics_.insert(
     293           0 :         StatsMapType::value_type(
     294             :           writer_id,
     295           0 :           WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
     296           0 :     }
     297             : 
     298             :     // If this is a durable reader
     299           0 :     if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
     300             :       // TODO schedule timer for removing flag from writers
     301             :     }
     302             : 
     303           0 :     if (DCPS_debug_level > 4) {
     304           0 :       ACE_DEBUG((LM_DEBUG,
     305             :           "(%P|%t) DataReaderImpl::add_association: "
     306             :           "inserted writer %C.return %d\n",
     307             :           LogGuid(writer_id).c_str(), bpair.second));
     308             : 
     309           0 :       WriterMapType::iterator iter = writers_.find(writer_id);
     310           0 :       if (iter != writers_.end()) {
     311             :         // This may not be an error since it could happen that the sample
     312             :         // is delivered to the datareader after the writer is disassociated
     313             :         // from this datareader.
     314           0 :         ACE_DEBUG((LM_DEBUG,
     315             :             ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
     316             :             ACE_TEXT("reader %C is associated with writer %C.\n"),
     317             :             LogGuid(get_guid()).c_str(),
     318             :             LogGuid(writer_id).c_str()));
     319             :       }
     320             :     }
     321           0 :   }
     322             : 
     323             :   // Propagate the add_associations processing down into the Transport
     324             :   // layer here.  This will establish the transport support and reserve
     325             :   // usage of an existing connection or initiate creation of a new
     326             :   // connection if no suitable connection is available.
     327           0 :   AssociationData data;
     328           0 :   data.remote_id_ = writer.writerId;
     329           0 :   data.remote_data_ = writer.writerTransInfo;
     330           0 :   data.discovery_locator_ = writer.writerDiscInfo;
     331           0 :   data.participant_discovered_at_ = writer.participantDiscoveredAt;
     332           0 :   data.remote_transport_context_ = writer.transportContext;
     333           0 :   data.publication_transport_priority_ =
     334           0 :       writer.writerQos.transport_priority.value;
     335           0 :   data.remote_reliable_ =
     336           0 :       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
     337           0 :   data.remote_durable_ =
     338           0 :       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
     339             : 
     340           0 :   if (associate(data, active)) {
     341           0 :     const Observer_rch observer = get_observer(Observer::e_ASSOCIATED);
     342           0 :     if (observer) {
     343           0 :       observer->on_associated(this, data.remote_id_);
     344             :     }
     345           0 :   } else {
     346           0 :     if (DCPS_debug_level) {
     347           0 :       ACE_ERROR((LM_ERROR,
     348             :           ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
     349             :           ACE_TEXT("ERROR: transport layer failed to associate.\n")));
     350             :     }
     351             :   }
     352           0 : }
     353             : 
     354             : void
     355           0 : DataReaderImpl::transport_assoc_done(int flags, const GUID_t& remote_id)
     356             : {
     357           0 :   if (!(flags & ASSOC_OK)) {
     358           0 :     if (DCPS_debug_level) {
     359           0 :       ACE_ERROR((LM_ERROR,
     360             :           ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
     361             :           ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
     362             :           LogGuid(remote_id).c_str()));
     363             :     }
     364           0 :     return;
     365             :   }
     366             : 
     367             :   // LIVELINESS policy timers are managed here.
     368           0 :   if (!liveliness_lease_duration_.is_zero()) {
     369           0 :     if (DCPS_debug_level >= 5) {
     370           0 :       ACE_DEBUG((LM_DEBUG,
     371             :           ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
     372             :           ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
     373             :           LogGuid(get_guid()).c_str()));
     374             :     }
     375             :     // this call will start the timer if it is not already set
     376           0 :     liveliness_timer_->check_liveliness();
     377             :   }
     378             : 
     379           0 :   const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
     380             : 
     381           0 :   if (!participant)
     382           0 :     return;
     383             : 
     384           0 :   const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
     385             : 
     386           0 :   if (!is_bit_) {
     387             :     // We hold the publication_handle_lock_ while updating the match
     388             :     // status and notifying the listener below.
     389             :     {
     390           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
     391             : 
     392             :       // This insertion is idempotent.
     393           0 :       publication_id_to_handle_map_.insert(RepoIdToHandleMap::value_type(remote_id, handle));
     394             : 
     395           0 :       if (DCPS_debug_level > 4) {
     396           0 :         ACE_DEBUG((LM_DEBUG,
     397             :             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
     398             :             ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
     399             :             LogGuid(remote_id).c_str(),
     400             :             handle));
     401             :       }
     402             : 
     403             :       // We need to adjust these after the insertions have all completed
     404             :       // since insertions are not guaranteed to increase the number of
     405             :       // currently matched publications.
     406           0 :       const int matchedPublications = static_cast<int>(publication_id_to_handle_map_.size());
     407           0 :       subscription_match_status_.current_count_change =
     408           0 :           matchedPublications - subscription_match_status_.current_count;
     409           0 :       subscription_match_status_.current_count = matchedPublications;
     410             : 
     411           0 :       ++subscription_match_status_.total_count;
     412           0 :       ++subscription_match_status_.total_count_change;
     413             : 
     414           0 :       subscription_match_status_.last_publication_handle = handle;
     415             : 
     416           0 :       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
     417             : 
     418             :       DDS::DataReaderListener_var listener =
     419           0 :           listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
     420             : 
     421           0 :       if (!CORBA::is_nil(listener)) {
     422           0 :         listener->on_subscription_matched(this, subscription_match_status_);
     423             : 
     424             :         // TBD - why does the spec say to change this but not change
     425             :         //       the ChangeFlagStatus after a listener call?
     426             : 
     427             :         // Client will look at it so next time it looks the change should be 0
     428           0 :         subscription_match_status_.total_count_change = 0;
     429           0 :         subscription_match_status_.current_count_change = 0;
     430             :       }
     431             : 
     432           0 :       notify_status_condition();
     433           0 :     }
     434             : 
     435             :     {
     436           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
     437           0 :       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
     438             : 
     439           0 :       if (!writers_.count(remote_id)) {
     440           0 :         return;
     441             :       }
     442           0 :       writers_[remote_id]->handle(handle);
     443           0 :     }
     444             :   }
     445             : 
     446           0 :   if (monitor_) {
     447           0 :     monitor_->report();
     448             :   }
     449           0 : }
     450             : 
     451             : void
     452           0 : DataReaderImpl::remove_associations(const WriterIdSeq& writers,
     453             :     bool notify_lost)
     454             : {
     455             :   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
     456             : 
     457           0 :   if (writers.length() == 0) {
     458           0 :     return;
     459             :   }
     460             : 
     461           0 :   const Observer_rch observer = get_observer(Observer::e_DISASSOCIATED);
     462           0 :   if (observer) {
     463           0 :     for (CORBA::ULong i = 0; i < writers.length(); ++i) {
     464           0 :       observer->on_disassociated(this, writers[i]);
     465             :     }
     466             :   }
     467             : 
     468           0 :   if (DCPS_debug_level >= 1) {
     469           0 :     ACE_DEBUG((LM_DEBUG,
     470             :         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
     471             :         ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
     472             :         is_bit_,
     473             :         LogGuid(get_guid()).c_str(),
     474             :         LogGuid(writers[0]).c_str(),
     475             :         writers.length()));
     476             :   }
     477           0 :   if (!get_deleted()) {
     478             :     // stop pending associations for these writer ids
     479           0 :     this->stop_associating(writers.get_buffer(), writers.length());
     480             : 
     481             :     {
     482           0 :       CORBA::ULong wr_len = writers.length();
     483           0 :       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
     484             : 
     485           0 :       for (CORBA::ULong i = 0; i < wr_len; i++) {
     486           0 :         const GUID_t writer_id = writers[i];
     487             :         {
     488           0 :           ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
     489           0 :           statistics_.erase(writer_id);
     490           0 :         }
     491             :       }
     492           0 :     }
     493             :   }
     494             : 
     495           0 :   remove_associations_i(writers, notify_lost);
     496           0 : }
     497             : 
     498             : void
     499           0 : DataReaderImpl::remove_associations_i(const WriterIdSeq& writers,
     500             :     bool notify_lost)
     501             : {
     502             :   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
     503             : 
     504           0 :   if (writers.length() == 0) {
     505           0 :     return;
     506             :   }
     507             : 
     508           0 :   if (DCPS_debug_level >= 1) {
     509           0 :     ACE_DEBUG((LM_DEBUG,
     510             :         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
     511             :         ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
     512             :         is_bit_,
     513             :         LogGuid(get_guid()).c_str(),
     514             :         LogGuid(writers[0]).c_str(),
     515             :         writers.length()));
     516             :   }
     517           0 :   DDS::InstanceHandleSeq handles;
     518             : 
     519           0 :   CORBA::ULong wr_len = writers.length();
     520             : 
     521             :   // Flush historic samples and/or allow in-progress delivery of historic samples to complete
     522           0 :   for (CORBA::ULong i = 0; i < wr_len; i++) {
     523           0 :     resume_sample_processing(writers[i]);
     524             :   }
     525             : 
     526             :   // This is used to hold the list of writers which were actually
     527             :   // removed, which is a subset of the writers which were
     528             :   // requested to be removed.
     529           0 :   WriterIdSeq updated_writers;
     530           0 :   WriterMapType removed_writers;
     531             : 
     532             :   // Remove the writers from the writer list. If a supplied writer
     533             :   // is not in the cached writers list then it has already been removed.
     534             :   // We only need to remove the writers in the list that have not yet
     535             :   // been removed.
     536             :   {
     537           0 :     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
     538             : 
     539           0 :     for (CORBA::ULong i = 0; i < wr_len; i++) {
     540           0 :       const GUID_t writer_id = writers[i];
     541             : 
     542           0 :       WriterMapType::iterator it = this->writers_.find(writer_id);
     543             : 
     544           0 :       if (it != this->writers_.end()) {
     545           0 :         removed_writers.insert(*it);
     546           0 :         end_historic_sweeper_->cancel_timer(it->second);
     547             :       }
     548             : 
     549           0 :       if (this->writers_.erase(writer_id) == 0) {
     550           0 :         if (DCPS_debug_level >= 1) {
     551           0 :           ACE_DEBUG((LM_DEBUG,
     552             :               ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
     553             :               ACE_TEXT("the writer local %C was already removed.\n"),
     554             :               LogGuid(writer_id).c_str()));
     555             :         }
     556             : 
     557             :       } else {
     558           0 :         push_back(updated_writers, writer_id);
     559             :       }
     560             :     }
     561           0 :   }
     562             : 
     563           0 :   for (WriterMapType::iterator it = removed_writers.begin(); it != removed_writers.end(); ++it) {
     564           0 :     it->second->removed();
     565             :   }
     566           0 :   removed_writers.clear();
     567             : 
     568           0 :   wr_len = updated_writers.length();
     569             : 
     570             :   // Return now if the supplied writers have been removed already.
     571           0 :   if (wr_len == 0) {
     572           0 :     return;
     573             :   }
     574             : 
     575           0 :   if (!is_bit_) {
     576             :     // The writer should be in the id_to_handle map at this time.
     577           0 :     this->lookup_instance_handles(updated_writers, handles);
     578             : 
     579           0 :     ACE_Guard<ACE_Recursive_Thread_Mutex> guard(publication_handle_lock_);
     580             : 
     581           0 :     for (CORBA::ULong i = 0; i < wr_len; ++i) {
     582           0 :       publication_id_to_handle_map_.erase(updated_writers[i]);
     583             :     }
     584           0 :   }
     585             : 
     586           0 :   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
     587             :     {
     588           0 :       this->disassociate(updated_writers[i]);
     589             :     }
     590             :   }
     591             : 
     592             :   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
     593           0 :   if (!this->is_bit_) {
     594           0 :     ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(publication_handle_lock_);
     595             : 
     596             :     // Derive the change in the number of publications writing to this reader.
     597           0 :     int matchedPublications = static_cast<int>(this->publication_id_to_handle_map_.size());
     598             :     this->subscription_match_status_.current_count_change
     599           0 :     = matchedPublications - this->subscription_match_status_.current_count;
     600             : 
     601             :     // Only process status if the number of publications has changed.
     602           0 :     if (this->subscription_match_status_.current_count_change != 0) {
     603           0 :       this->subscription_match_status_.current_count = matchedPublications;
     604             : 
     605             :       /// Section 7.1.4.1: total_count will not decrement.
     606             : 
     607             :       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
     608             :       this->subscription_match_status_.last_publication_handle
     609           0 :       = handles[ wr_len - 1];
     610             : 
     611           0 :       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
     612             : 
     613             :       DDS::DataReaderListener_var listener
     614           0 :       = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
     615             : 
     616           0 :       if (!CORBA::is_nil(listener.in())) {
     617           0 :         listener->on_subscription_matched(this, this->subscription_match_status_);
     618             : 
     619             :         // Client will look at it so next time it looks the change should be 0
     620           0 :         this->subscription_match_status_.total_count_change = 0;
     621           0 :         this->subscription_match_status_.current_count_change = 0;
     622             :       }
     623           0 :       notify_status_condition();
     624           0 :     }
     625           0 :   }
     626             : 
     627             :   // If this remove_association is invoked when the InfoRepo
     628             :   // detects a lost writer then make a callback to notify
     629             :   // subscription lost.
     630           0 :   if (notify_lost) {
     631           0 :     this->notify_subscription_lost(handles);
     632             :   }
     633             : 
     634           0 :   if (this->monitor_) {
     635           0 :     this->monitor_->report();
     636             :   }
     637           0 : }
     638             : 
     639             : void
     640           0 : DataReaderImpl::remove_all_associations()
     641             : {
     642             :   DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
     643           0 :   stop_associating();
     644             : 
     645           0 :   OpenDDS::DCPS::WriterIdSeq writers;
     646             :   int size;
     647             : 
     648             :   {
     649           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
     650             : 
     651           0 :     size = static_cast<int>(writers_.size());
     652           0 :     writers.length(size);
     653             : 
     654           0 :     WriterMapType::iterator curr_writer = writers_.begin();
     655           0 :     WriterMapType::iterator end_writer = writers_.end();
     656             : 
     657           0 :     int i = 0;
     658             : 
     659           0 :     while (curr_writer != end_writer) {
     660           0 :       writers[i++] = curr_writer->first;
     661           0 :       ++curr_writer;
     662             :     }
     663           0 :   }
     664             : 
     665             :   try {
     666           0 :     if (0 < size) {
     667           0 :       remove_associations(writers, false);
     668             :     }
     669           0 :   } catch (const CORBA::Exception&) {
     670           0 :     ACE_DEBUG((LM_WARNING,
     671             :                ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::remove_all_associations() - ")
     672             :                ACE_TEXT("caught exception from remove_associations.\n")));
     673           0 :   }
     674             : 
     675           0 :   transport_stop();
     676           0 : }
     677             : 
     678             : void
     679           0 : DataReaderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
     680             : {
     681             :   DDS::DataReaderListener_var listener =
     682           0 :       listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
     683             : 
     684           0 :   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
     685             :     // This test should make the method idempotent.
     686           0 :     return;
     687             :   }
     688             : 
     689           0 :   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
     690             :       true);
     691             : 
     692             :   // copy status and increment change
     693           0 :   requested_incompatible_qos_status_.total_count = status.total_count;
     694           0 :   requested_incompatible_qos_status_.total_count_change +=
     695           0 :       status.count_since_last_send;
     696           0 :   requested_incompatible_qos_status_.last_policy_id =
     697           0 :       status.last_policy_id;
     698           0 :   requested_incompatible_qos_status_.policies = status.policies;
     699             : 
     700           0 :   if (!CORBA::is_nil(listener.in())) {
     701           0 :     listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_);
     702             : 
     703             :     // TBD - why does the spec say to change total_count_change but not
     704             :     // change the ChangeFlagStatus after a listener call?
     705             : 
     706             :     // client just looked at it so next time it looks the
     707             :     // change should be 0
     708           0 :     requested_incompatible_qos_status_.total_count_change = 0;
     709             :   }
     710             : 
     711           0 :   notify_status_condition();
     712           0 : }
     713             : 
     714             : void
     715           0 : DataReaderImpl::signal_liveliness(const GUID_t& remote_participant)
     716             : {
     717           0 :   GUID_t prefix = remote_participant;
     718           0 :   prefix.entityId = EntityId_t();
     719             : 
     720           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
     721             : 
     722             :   typedef std::pair<GUID_t, WriterInfo_rch> RepoWriterPair;
     723             :   typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
     724           0 :   WriterSet writers;
     725             : 
     726             :   {
     727           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
     728           0 :     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
     729           0 :            limit = writers_.end();
     730           0 :          pos != limit && equal_guid_prefixes(pos->first, prefix);
     731           0 :          ++pos) {
     732           0 :       writers.push_back(std::make_pair(pos->first, pos->second));
     733             :     }
     734           0 :   }
     735             : 
     736           0 :   const MonotonicTimePoint when = MonotonicTimePoint::now();
     737           0 :   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
     738           0 :        pos != limit;
     739           0 :        ++pos) {
     740           0 :     pos->second->received_activity(when);
     741             :   }
     742             : 
     743           0 :   if (!writers.empty()) {
     744           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
     745           0 :     for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
     746           0 :          pos != limit;
     747           0 :          ++pos) {
     748           0 :       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
     749           0 :            iter != instances_.end();
     750           0 :            ++iter) {
     751           0 :         SubscriptionInstance_rch ptr = iter->second;
     752           0 :         ptr->instance_state_->lively(pos->first);
     753           0 :       }
     754             :     }
     755           0 :   }
     756           0 : }
     757             : 
     758           0 : DDS::ReadCondition_ptr DataReaderImpl::create_readcondition(
     759             :     DDS::SampleStateMask sample_states,
     760             :     DDS::ViewStateMask view_states,
     761             :     DDS::InstanceStateMask instance_states)
     762             : {
     763           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
     764           0 :   DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
     765           0 :       view_states, instance_states);
     766           0 :   read_conditions_.insert(rc);
     767           0 :   return rc._retn();
     768           0 : }
     769             : 
     770             : #ifndef OPENDDS_NO_QUERY_CONDITION
     771           0 : DDS::QueryCondition_ptr DataReaderImpl::create_querycondition(
     772             :     DDS::SampleStateMask sample_states,
     773             :     DDS::ViewStateMask view_states,
     774             :     DDS::InstanceStateMask instance_states,
     775             :     const char* query_expression,
     776             :     const DDS::StringSeq& query_parameters)
     777             : {
     778           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
     779             :   try {
     780           0 :     DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
     781           0 :         view_states, instance_states, query_expression);
     782           0 :     if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) {
     783           0 :       return 0;
     784             :     }
     785           0 :     DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
     786           0 :     read_conditions_.insert(rc);
     787           0 :     return qc._retn();
     788           0 :   } catch (const std::exception& e) {
     789           0 :     if (DCPS_debug_level) {
     790           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ")
     791             :           ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
     792             :           e.what()));
     793             :     }
     794           0 :   }
     795           0 :   return 0;
     796           0 : }
     797             : #endif
     798             : 
     799           0 : bool DataReaderImpl::has_readcondition(DDS::ReadCondition_ptr a_condition)
     800             : {
     801             :   //sample lock already held
     802           0 :   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
     803           0 :   return read_conditions_.find(rc) != read_conditions_.end();
     804           0 : }
     805             : 
     806           0 : DDS::ReturnCode_t DataReaderImpl::delete_readcondition(
     807             :     DDS::ReadCondition_ptr a_condition)
     808             : {
     809           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
     810             :       DDS::RETCODE_OUT_OF_RESOURCES);
     811           0 :   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
     812           0 :   return read_conditions_.erase(rc)
     813           0 :       ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
     814           0 : }
     815             : 
     816           0 : DDS::ReturnCode_t DataReaderImpl::delete_contained_entities()
     817             : {
     818           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
     819             :       DDS::RETCODE_OUT_OF_RESOURCES);
     820           0 :   read_conditions_.clear();
     821           0 :   return DDS::RETCODE_OK;
     822           0 : }
     823             : 
     824           0 : DDS::ReturnCode_t DataReaderImpl::set_qos(const DDS::DataReaderQos& qos)
     825             : {
     826             :   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     827             :   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     828             :   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     829             : 
     830           0 :   DDS::DataReaderQos new_qos = qos;
     831           0 :   new_qos.representation.value = qos_.representation.value;
     832           0 :   if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
     833             : 
     834           0 :     if (qos_ == new_qos)
     835           0 :       return DDS::RETCODE_OK;
     836             : 
     837           0 :     if (enabled_) {
     838           0 :       if (!Qos_Helper::changeable(qos_, new_qos)) {
     839           0 :         return DDS::RETCODE_IMMUTABLE_POLICY;
     840             : 
     841             :       } else {
     842           0 :         Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
     843           0 :         DDS::SubscriberQos subscriberQos;
     844             : 
     845           0 :         RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
     846           0 :         bool status = false;
     847           0 :         if (subscriber) {
     848           0 :           subscriber->get_qos(subscriberQos);
     849             :           status =
     850           0 :             disco->update_subscription_qos(
     851             :               domain_id_,
     852           0 :               dp_id_,
     853           0 :               subscription_id_,
     854             :               new_qos,
     855             :               subscriberQos);
     856             :         }
     857           0 :         if (!status) {
     858           0 :           ACE_ERROR_RETURN((LM_ERROR,
     859             :                             ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
     860             :                             ACE_TEXT("qos not updated.\n")),
     861             :                             DDS::RETCODE_ERROR);
     862             :         }
     863           0 :       }
     864             :     }
     865             : 
     866           0 :     qos_change(new_qos);
     867           0 :     qos_ = new_qos;
     868           0 :     passed_qos_ = qos;
     869             : 
     870           0 :     const Observer_rch observer = get_observer(Observer::e_QOS_CHANGED);
     871           0 :     if (observer) {
     872           0 :       observer->on_qos_changed(this);
     873             :     }
     874             : 
     875           0 :     return DDS::RETCODE_OK;
     876             : 
     877           0 :   } else {
     878           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     879             :   }
     880           0 : }
     881             : 
     882           0 : void DataReaderImpl::qos_change(const DDS::DataReaderQos & qos)
     883             : {
     884             :   // Reset the deadline timer if the period has changed.
     885           0 :   if (qos_.deadline.period.sec != qos.deadline.period.sec ||
     886           0 :       qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
     887           0 :     if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
     888           0 :         qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
     889           0 :       deadline_period_ = TimeDuration(qos.deadline.period);
     890           0 :       deadline_queue_enabled_ = true;
     891           0 :     } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
     892           0 :                qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
     893           0 :       cancel_all_deadlines();
     894           0 :       deadline_queue_enabled_ = false;
     895             :     } else {
     896           0 :       reset_deadline_period(TimeDuration(qos.deadline.period));
     897             :     }
     898             :   }
     899           0 : }
     900             : 
     901             : DDS::ReturnCode_t
     902           0 : DataReaderImpl::get_qos(
     903             :     DDS::DataReaderQos & qos)
     904             : {
     905           0 :   qos = passed_qos_;
     906           0 :   return DDS::RETCODE_OK;
     907             : }
     908             : 
     909           0 : DDS::ReturnCode_t DataReaderImpl::set_listener(
     910             :     DDS::DataReaderListener_ptr a_listener,
     911             :     DDS::StatusMask mask)
     912             : {
     913           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     914           0 :   listener_mask_ = mask;
     915             :   //note: OK to duplicate  a nil object ref
     916           0 :   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
     917           0 :   return DDS::RETCODE_OK;
     918           0 : }
     919             : 
     920           0 : DDS::DataReaderListener_ptr DataReaderImpl::get_listener()
     921             : {
     922           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     923           0 :   return DDS::DataReaderListener::_duplicate(listener_.in());
     924           0 : }
     925             : 
     926           0 : DataReaderListener_ptr DataReaderImpl::get_ext_listener()
     927             : {
     928           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     929           0 :   return DataReaderListener::_narrow(listener_.in());
     930           0 : }
     931             : 
     932           0 : DDS::TopicDescription_ptr DataReaderImpl::get_topicdescription()
     933             : {
     934             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     935             :   {
     936           0 :     ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
     937           0 :     if (content_filtered_topic_) {
     938           0 :       return DDS::TopicDescription::_duplicate(content_filtered_topic_.get());
     939             :     }
     940           0 :   }
     941             : #endif
     942           0 :   return DDS::TopicDescription::_duplicate(topic_desc_.in());
     943             : }
     944             : 
     945           0 : DDS::Subscriber_ptr DataReaderImpl::get_subscriber()
     946             : {
     947           0 :   return get_subscriber_servant()._retn();
     948             : }
     949             : 
     950             : DDS::ReturnCode_t
     951           0 : DataReaderImpl::get_sample_rejected_status(
     952             :     DDS::SampleRejectedStatus & status)
     953             : {
     954           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
     955             : 
     956           0 :   set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false);
     957           0 :   status = sample_rejected_status_;
     958           0 :   sample_rejected_status_.total_count_change = 0;
     959           0 :   return DDS::RETCODE_OK;
     960           0 : }
     961             : 
     962             : DDS::ReturnCode_t
     963           0 : DataReaderImpl::get_liveliness_changed_status(
     964             :     DDS::LivelinessChangedStatus & status)
     965             : {
     966           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
     967             : 
     968           0 :   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS,
     969             :       false);
     970           0 :   status = liveliness_changed_status_;
     971             : 
     972           0 :   liveliness_changed_status_.alive_count_change = 0;
     973           0 :   liveliness_changed_status_.not_alive_count_change = 0;
     974             : 
     975           0 :   return DDS::RETCODE_OK;
     976           0 : }
     977             : 
     978             : DDS::ReturnCode_t
     979           0 : DataReaderImpl::get_requested_deadline_missed_status(
     980             :     DDS::RequestedDeadlineMissedStatus & status)
     981             : {
     982           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
     983             : 
     984           0 :   set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
     985             :       false);
     986             : 
     987           0 :   this->requested_deadline_missed_status_.total_count_change =
     988           0 :       this->requested_deadline_missed_status_.total_count
     989           0 :       - this->last_deadline_missed_total_count_;
     990             : 
     991             :   // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
     992             :   // is updated by the RequestedDeadlineWatchdog.
     993             : 
     994             :   // Update for next status check.
     995           0 :   this->last_deadline_missed_total_count_ =
     996           0 :       this->requested_deadline_missed_status_.total_count;
     997             : 
     998           0 :   status = requested_deadline_missed_status_;
     999             : 
    1000           0 :   return DDS::RETCODE_OK;
    1001           0 : }
    1002             : 
    1003             : DDS::ReturnCode_t
    1004           0 : DataReaderImpl::get_requested_incompatible_qos_status(
    1005             :     DDS::RequestedIncompatibleQosStatus & status)
    1006             : {
    1007           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(publication_handle_lock_);
    1008             : 
    1009           0 :   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, false);
    1010           0 :   status = requested_incompatible_qos_status_;
    1011           0 :   requested_incompatible_qos_status_.total_count_change = 0;
    1012             : 
    1013           0 :   return DDS::RETCODE_OK;
    1014           0 : }
    1015             : 
    1016             : DDS::ReturnCode_t
    1017           0 : DataReaderImpl::get_subscription_matched_status(
    1018             :     DDS::SubscriptionMatchedStatus & status)
    1019             : {
    1020           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(publication_handle_lock_);
    1021             : 
    1022           0 :   set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false);
    1023           0 :   status = subscription_match_status_;
    1024           0 :   subscription_match_status_.total_count_change = 0;
    1025           0 :   subscription_match_status_.current_count_change = 0;
    1026             : 
    1027           0 :   return DDS::RETCODE_OK;
    1028           0 : }
    1029             : 
    1030             : DDS::ReturnCode_t
    1031           0 : DataReaderImpl::get_sample_lost_status(
    1032             :     DDS::SampleLostStatus & status)
    1033             : {
    1034           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
    1035             : 
    1036           0 :   set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false);
    1037           0 :   status = sample_lost_status_;
    1038           0 :   sample_lost_status_.total_count_change = 0;
    1039           0 :   return DDS::RETCODE_OK;
    1040           0 : }
    1041             : 
    1042             : DDS::ReturnCode_t
    1043           0 : DataReaderImpl::wait_for_historical_data(
    1044             :     const DDS::Duration_t & /* max_wait */)
    1045             : {
    1046             :   // Add your implementation here
    1047           0 :   return DDS::RETCODE_OK;
    1048             : }
    1049             : 
    1050             : DDS::ReturnCode_t
    1051           0 : DataReaderImpl::get_matched_publications(
    1052             :     DDS::InstanceHandleSeq & publication_handles)
    1053             : {
    1054           0 :   if (!enabled_) {
    1055           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1056             :         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
    1057             :         ACE_TEXT(" Entity is not enabled.\n")),
    1058             :         DDS::RETCODE_NOT_ENABLED);
    1059             :   }
    1060             : 
    1061           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1062             :       guard,
    1063             :       publication_handle_lock_,
    1064             :       DDS::RETCODE_ERROR);
    1065             : 
    1066             :   // Copy out the handles for the current set of publications.
    1067           0 :   int index = 0;
    1068           0 :   publication_handles.length(static_cast<CORBA::ULong>(this->publication_id_to_handle_map_.size()));
    1069             : 
    1070           0 :   for (RepoIdToHandleMap::iterator
    1071           0 :       current = this->publication_id_to_handle_map_.begin();
    1072           0 :       current != this->publication_id_to_handle_map_.end();
    1073           0 :       ++current, ++index) {
    1074           0 :     publication_handles[index] = current->second;
    1075             :   }
    1076             : 
    1077           0 :   return DDS::RETCODE_OK;
    1078           0 : }
    1079             : 
    1080             : #if !defined (DDS_HAS_MINIMUM_BIT)
    1081             : DDS::ReturnCode_t
    1082           0 : DataReaderImpl::get_matched_publication_data(
    1083             :     DDS::PublicationBuiltinTopicData & publication_data,
    1084             :     DDS::InstanceHandle_t publication_handle)
    1085             : {
    1086           0 :   if (!enabled_) {
    1087           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1088             :         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
    1089             :         ACE_TEXT("get_matched_publication_data: ")
    1090             :         ACE_TEXT("Entity is not enabled.\n")),
    1091             :         DDS::RETCODE_NOT_ENABLED);
    1092             :   }
    1093             : 
    1094           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
    1095             : 
    1096           0 :   if (!participant)
    1097           0 :     return DDS::RETCODE_ERROR;
    1098             : 
    1099           0 :   DDS::PublicationBuiltinTopicDataSeq data;
    1100           0 :   const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
    1101             :                                   participant.in(),
    1102             :                                   BUILT_IN_PUBLICATION_TOPIC,
    1103             :                                   publication_handle,
    1104             :                                   data);
    1105             : 
    1106           0 :   if (ret == DDS::RETCODE_OK) {
    1107           0 :     publication_data = data[0];
    1108             :   }
    1109             : 
    1110           0 :   return ret;
    1111           0 : }
    1112             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
    1113             : 
    1114             : DDS::ReturnCode_t
    1115           0 : DataReaderImpl::enable()
    1116             : {
    1117             :   // According to spec:
    1118             :   // - Calling enable on an already enabled Entity has no effect and returns OK.
    1119             :   // - Calling enable on an Entity whose factory is not enabled will fail
    1120             :   //   and return PRECONDITION_NOT_MET.
    1121             : 
    1122           0 :   if (this->is_enabled()) {
    1123           0 :     return DDS::RETCODE_OK;
    1124             :   }
    1125             : 
    1126           0 :   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
    1127           0 :   if (!subscriber) {
    1128           0 :     return DDS::RETCODE_ERROR;
    1129             :   }
    1130             : 
    1131           0 :   if (!subscriber->is_enabled()) {
    1132           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    1133             :   }
    1134             : 
    1135           0 :   if (topic_servant_ && !topic_servant_->is_enabled()) {
    1136           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    1137             :   }
    1138             : 
    1139           0 :   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
    1140           0 :   if (participant) {
    1141           0 :     dp_id_ = participant->get_id();
    1142             :   }
    1143             : 
    1144           0 :   if (topic_servant_) {
    1145           0 :     set_reader_effective_data_rep_qos(qos_.representation.value);
    1146           0 :     if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
    1147           0 :       return DDS::RETCODE_ERROR;
    1148             :     }
    1149             :   }
    1150             : 
    1151           0 :   if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
    1152             :     // The spec says qos_.history.depth is "has no effect"
    1153             :     // when history.kind = KEEP_ALL so use max_samples_per_instance
    1154           0 :     depth_ = qos_.resource_limits.max_samples_per_instance;
    1155             : 
    1156             :   } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
    1157           0 :     depth_ = qos_.history.depth;
    1158             :   }
    1159             : 
    1160           0 :   if (depth_ == DDS::LENGTH_UNLIMITED) {
    1161             :     // DDS::LENGTH_UNLIMITED is negative so make it a positive
    1162             :     // value that is, for all intents and purposes, unlimited
    1163             :     // and we can use it for comparisons.
    1164             :     // WARNING: The client risks running out of memory in this case.
    1165           0 :     depth_ = ACE_INT32_MAX;
    1166             :   }
    1167             : 
    1168           0 :   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
    1169           0 :     n_chunks_ = qos_.resource_limits.max_samples;
    1170             :   }
    1171             : 
    1172             :   //else using value from Service_Participant
    1173             : 
    1174             :   // enable the type specific part of this DataReader
    1175           0 :   this->enable_specific();
    1176             : 
    1177             :   //Note: the QoS used to set n_chunks_ is Changeable=No so
    1178             :   // it is OK that we cannot change the size of our allocators.
    1179           0 :   rd_allocator_.reset(new ReceivedDataAllocator(n_chunks_));
    1180             : 
    1181           0 :   if (DCPS_debug_level >= 2)
    1182           0 :     ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
    1183             :         " Cached_Allocator_With_Overflow %x with %d chunks\n",
    1184             :         rd_allocator_.get(), n_chunks_));
    1185             : 
    1186           0 :   if ((qos_.liveliness.lease_duration.sec !=
    1187           0 :       DDS::DURATION_INFINITE_SEC) &&
    1188           0 :       (qos_.liveliness.lease_duration.nanosec !=
    1189             :           DDS::DURATION_INFINITE_NSEC)) {
    1190           0 :     liveliness_lease_duration_ = TimeDuration(qos_.liveliness.lease_duration);
    1191             :   }
    1192             : 
    1193             :   // Setup the requested deadline watchdog if the configured deadline
    1194             :   // period is not the default (infinite).
    1195           0 :   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
    1196             : 
    1197           0 :   if (!deadline_queue_enabled_
    1198           0 :       && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
    1199           0 :           || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
    1200           0 :     deadline_period_ = TimeDuration(qos_.deadline.period);
    1201           0 :     deadline_queue_enabled_ = true;
    1202             :   }
    1203             : 
    1204           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    1205           0 :   disco->pre_reader(this);
    1206             : 
    1207           0 :   this->set_enabled();
    1208             : 
    1209           0 :   if (topic_servant_ && !transport_disabled_) {
    1210             :     try {
    1211           0 :       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
    1212           0 :           this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
    1213           0 :     } catch (const Transport::Exception&) {
    1214           0 :       ACE_ERROR((LM_ERROR,
    1215             :           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
    1216             :           ACE_TEXT("Transport Exception.\n")));
    1217           0 :       return DDS::RETCODE_ERROR;
    1218           0 :     }
    1219             : 
    1220           0 :     const DDS::ReturnCode_t setup_deserialization_result = setup_deserialization();
    1221           0 :     if (setup_deserialization_result != DDS::RETCODE_OK) {
    1222           0 :       return setup_deserialization_result;
    1223             :     }
    1224             : 
    1225           0 :     const TransportLocatorSeq& trans_conf_info = connection_info();
    1226             : 
    1227           0 :     CORBA::String_var filterClassName = "";
    1228           0 :     CORBA::String_var filterExpression = "";
    1229           0 :     DDS::StringSeq exprParams;
    1230             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    1231             :     {
    1232           0 :       ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
    1233           0 :       if (content_filtered_topic_) {
    1234           0 :         filterClassName = content_filtered_topic_->get_filter_class_name();
    1235           0 :         filterExpression = content_filtered_topic_->get_filter_expression();
    1236           0 :         content_filtered_topic_->get_expression_parameters(exprParams);
    1237             :       }
    1238           0 :     }
    1239             : #endif
    1240             : 
    1241           0 :     DDS::SubscriberQos sub_qos;
    1242           0 :     subscriber->get_qos(sub_qos);
    1243             : 
    1244             :     TypeSupportImpl* const typesupport =
    1245           0 :       dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
    1246           0 :     if (!typesupport) {
    1247           0 :       return DDS::RETCODE_ERROR;
    1248             :     }
    1249             : 
    1250           0 :     XTypes::TypeInformation type_info;
    1251           0 :     typesupport->to_type_info(type_info);
    1252             : 
    1253           0 :     XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
    1254           0 :     typesupport->add_types(type_lookup_service);
    1255             : 
    1256           0 :     install_type_support(typesupport);
    1257             : 
    1258             :     const GUID_t subscription_id =
    1259           0 :       disco->add_subscription(domain_id_,
    1260           0 :         dp_id_,
    1261           0 :         topic_servant_->get_id(),
    1262           0 :         rchandle_from(this),
    1263           0 :         qos_,
    1264             :         trans_conf_info,
    1265             :         sub_qos,
    1266             :         filterClassName,
    1267             :         filterExpression,
    1268             :         exprParams,
    1269             :         type_info);
    1270             : 
    1271             : #if defined(OPENDDS_SECURITY)
    1272             :     {
    1273           0 :       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
    1274           0 :       security_config_ = participant->get_security_config();
    1275           0 :       dynamic_type_ = typesupport->get_type();
    1276           0 :     }
    1277             : #endif
    1278             : 
    1279             :     {
    1280           0 :       ACE_Guard<ACE_Thread_Mutex> guard(subscription_id_mutex_);
    1281           0 :       subscription_id_ = subscription_id;
    1282           0 :       has_subscription_id_ = true;
    1283           0 :       subscription_id_condition_.notify_all();
    1284           0 :     }
    1285             : 
    1286           0 :     if (subscription_id == GUID_UNKNOWN) {
    1287           0 :       if (DCPS_debug_level >= 1) {
    1288           0 :         ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataReaderImpl::enable: "
    1289             :           "add_subscription failed\n"));
    1290             :       }
    1291           0 :       return DDS::RETCODE_ERROR;
    1292             :     }
    1293             : 
    1294           0 :     if (DCPS_debug_level >= 2) {
    1295           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::enable: "
    1296             :         "got GUID %C, subscribed to topic name \"%C\" type \"%C\"\n",
    1297             :         LogGuid(get_guid()).c_str(),
    1298             :         topic_servant_->topic_name(), topic_servant_->type_name()));
    1299             :     }
    1300           0 :   }
    1301             : 
    1302           0 :   DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
    1303           0 :   if (topic_servant_) {
    1304           0 :     const CORBA::String_var name = topic_servant_->get_name();
    1305           0 :     return_value = subscriber->reader_enabled(name.in(), this);
    1306             : 
    1307           0 :     if (this->monitor_) {
    1308           0 :       this->monitor_->report();
    1309             :     }
    1310           0 :   }
    1311             : 
    1312           0 :   if (return_value == DDS::RETCODE_OK) {
    1313           0 :     const Observer_rch observer = get_observer(Observer::e_ENABLED);
    1314           0 :     if (observer) {
    1315           0 :       observer->on_enabled(this);
    1316             :     }
    1317           0 :   }
    1318             : 
    1319           0 :   return return_value;
    1320           0 : }
    1321             : 
    1322             : void
    1323           0 : DataReaderImpl::writer_activity(const DataSampleHeader& header)
    1324             : {
    1325             :   // caller should have the sample_lock_ !!!
    1326             : 
    1327           0 :   WriterInfo_rch writer;
    1328             : 
    1329             :   // The received_activity() has to be called outside the writers_lock_
    1330             :   // because it probably acquire writers_lock_ read lock recursively
    1331             :   // (in handle_timeout). This could cause deadlock when there are writers
    1332             :   // waiting.
    1333             :   {
    1334           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
    1335           0 :     WriterMapType::iterator iter = writers_.find(header.publication_id_);
    1336             : 
    1337           0 :     if (iter != writers_.end()) {
    1338           0 :       writer = iter->second;
    1339             : 
    1340           0 :     } else if (DCPS_debug_level > 4) {
    1341             :       // This may not be an error since it could happen that the sample
    1342             :       // is delivered to the datareader after the write is dis-associated
    1343             :       // with this datareader.
    1344           0 :       ACE_DEBUG((LM_DEBUG,
    1345             :           ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
    1346             :           ACE_TEXT("reader %C is not associated with writer %C.\n"),
    1347             :           LogGuid(get_guid()).c_str(),
    1348             :           LogGuid(header.publication_id_).c_str()));
    1349             :     }
    1350           0 :   }
    1351             : 
    1352           0 :   if (!writer.is_nil()) {
    1353           0 :     writer->received_activity(MonotonicTimePoint::now());
    1354             : 
    1355           0 :     if ((header.message_id_ == SAMPLE_DATA) ||
    1356           0 :         (header.message_id_ == INSTANCE_REGISTRATION) ||
    1357           0 :         (header.message_id_ == UNREGISTER_INSTANCE) ||
    1358           0 :         (header.message_id_ == DISPOSE_INSTANCE) ||
    1359           0 :         (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
    1360             : 
    1361             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1362           0 :       if (header.coherent_change_) {
    1363           0 :         writer->add_coherent_samples(header.sequence_);
    1364             :       }
    1365             : #endif
    1366             :     }
    1367             :   }
    1368           0 : }
    1369             : 
    1370             : void
    1371           0 : DataReaderImpl::data_received(const ReceivedDataSample& sample)
    1372             : {
    1373             :   DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
    1374             : 
    1375           0 :   DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
    1376             :   {
    1377           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
    1378           0 :     RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(sample.header_.publication_id_);
    1379           0 :     if (pos != publication_id_to_handle_map_.end()) {
    1380           0 :       publication_handle = pos->second;
    1381             :     }
    1382           0 :   }
    1383             : 
    1384             :   // ensure some other thread is not changing the sample container
    1385             :   // or statuses related to samples.
    1386           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
    1387             : 
    1388           0 :   if (get_deleted()) return;
    1389             : 
    1390           0 :   if (DCPS_debug_level > 9) {
    1391           0 :     ACE_DEBUG((LM_DEBUG,
    1392             :         ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
    1393             :         ACE_TEXT("%C received sample: %C.\n"),
    1394             :         LogGuid(get_guid()).c_str(),
    1395             :         to_string(sample.header_).c_str()));
    1396             :   }
    1397             : 
    1398           0 :   const ValueDispatcher* vd = get_value_dispatcher();
    1399           0 :   const Observer_rch observer = get_observer(Observer::e_SAMPLE_RECEIVED);
    1400             : 
    1401           0 :   RcHandle<MessageHolder> real_data;
    1402           0 :   SubscriptionInstance_rch instance;
    1403           0 :   switch (sample.header_.message_id_) {
    1404           0 :   case SAMPLE_DATA:
    1405             :   case INSTANCE_REGISTRATION: {
    1406           0 :     if (!check_historic(sample)) break;
    1407             : 
    1408           0 :     DataSampleHeader const & header = sample.header_;
    1409             : 
    1410           0 :     this->writer_activity(header);
    1411             : 
    1412             :     // Verify data has not exceeded its lifespan.
    1413           0 :     if (this->filter_sample(header)) break;
    1414             : 
    1415             :     // This adds the reader to the set/list of readers with data.
    1416           0 :     RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
    1417           0 :     if (subscriber) {
    1418           0 :       subscriber->data_received(this);
    1419             :     }
    1420             : 
    1421             :     // Only gather statistics about real samples, not registration data, etc.
    1422           0 :     if (header.message_id_ == SAMPLE_DATA) {
    1423           0 :       this->process_latency(sample);
    1424             :     }
    1425             : 
    1426             :     // This also adds to the sample container and makes any callbacks
    1427             :     // and condition modifications.
    1428             : 
    1429           0 :     bool is_new_instance = false;
    1430           0 :     bool filtered = false;
    1431           0 :     if (sample.header_.key_fields_only_) {
    1432           0 :       dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING, false);
    1433             :     } else {
    1434           0 :       real_data = dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, FULL_MARSHALING, observer && vd);
    1435             :     }
    1436             : 
    1437             :     // Per sample logging
    1438           0 :     if (DCPS_debug_level >= 8) {
    1439           0 :       ACE_DEBUG((LM_DEBUG,
    1440             :           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
    1441             :           ACE_TEXT("instance %d is_new_instance %d filtered %d\n"),
    1442             :           LogGuid(get_guid()).c_str(),
    1443             :           LogGuid(header.publication_id_).c_str(),
    1444             :           instance ? instance->instance_handle_ : 0,
    1445             :           is_new_instance, filtered));
    1446             :     }
    1447             : 
    1448           0 :     if (filtered) break; // sample filtered from instance
    1449             : 
    1450           0 :     if (instance) accept_sample_processing(instance, header, is_new_instance);
    1451           0 :   }
    1452           0 :   break;
    1453             : 
    1454             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1455           0 :   case END_COHERENT_CHANGES: {
    1456           0 :     CoherentChangeControl control;
    1457             : 
    1458           0 :     this->writer_activity(sample.header_);
    1459             : 
    1460           0 :     Message_Block_Ptr payload(sample.data(&mb_alloc_));
    1461             :     Serializer serializer(
    1462             :         payload.get(), Encoding::KIND_UNALIGNED_CDR,
    1463           0 :         sample.header_.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
    1464           0 :     if (!(serializer >> control)) {
    1465           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
    1466             :           ACE_TEXT("deserialization coherent change control failed.\n")));
    1467           0 :       return;
    1468             :     }
    1469             : 
    1470           0 :     if (DCPS_debug_level > 0) {
    1471           0 :       std::stringstream buffer;
    1472           0 :       buffer << control << std::endl;
    1473             : 
    1474           0 :       ACE_DEBUG((LM_DEBUG,
    1475             :           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
    1476             :           ACE_TEXT("END_COHERENT_CHANGES %C\n"),
    1477             :           buffer.str().c_str()));
    1478           0 :     }
    1479             : 
    1480           0 :     WriterInfo_rch writer;
    1481             :     {
    1482           0 :       ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
    1483             : 
    1484             :       WriterMapType::iterator it =
    1485           0 :           this->writers_.find(sample.header_.publication_id_);
    1486             : 
    1487           0 :       if (it == this->writers_.end()) {
    1488           0 :         ACE_DEBUG((LM_WARNING,
    1489             :             ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
    1490             :             ACE_TEXT(" subscription %C failed to find ")
    1491             :             ACE_TEXT(" publication data for %C!\n"),
    1492             :             LogGuid(get_guid()).c_str(),
    1493             :             LogGuid(sample.header_.publication_id_).c_str()));
    1494           0 :         return;
    1495             :       }
    1496             :       else {
    1497           0 :         writer = it->second;
    1498             :       }
    1499           0 :       it->second->set_group_info(control);
    1500           0 :     }
    1501             : 
    1502           0 :     if (this->verify_coherent_changes_completion(writer.in())) {
    1503           0 :       this->notify_read_conditions();
    1504             :     }
    1505           0 :   }
    1506           0 :   break;
    1507             : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
    1508             : 
    1509           0 :   case DATAWRITER_LIVELINESS: {
    1510           0 :     if (DCPS_debug_level >= 4) {
    1511           0 :       ACE_DEBUG((LM_DEBUG,
    1512             :                  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
    1513             :                  ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
    1514             :                  LogGuid(get_guid()).c_str(),
    1515             :                  LogGuid(sample.header_.publication_id_).c_str()));
    1516             :     }
    1517           0 :     this->writer_activity(sample.header_);
    1518             : 
    1519             :     // tell all instances they got a liveliness message
    1520             :     {
    1521           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    1522           0 :       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
    1523           0 :           iter != instances_.end();
    1524           0 :           ++iter) {
    1525           0 :         if (iter->second->instance_state_->writes_instance(sample.header_.publication_id_)) {
    1526           0 :           iter->second->instance_state_->lively(sample.header_.publication_id_);
    1527             :         }
    1528             :       }
    1529           0 :     }
    1530             : 
    1531             :   }
    1532           0 :   break;
    1533             : 
    1534           0 :   case DISPOSE_INSTANCE: {
    1535           0 :     if (!check_historic(sample)) break;
    1536           0 :     this->writer_activity(sample.header_);
    1537           0 :     SubscriptionInstance_rch instance;
    1538             : 
    1539           0 :     if (deadline_queue_enabled_) {
    1540             :       // Find the instance first for timer cancellation since
    1541             :       // the instance may be deleted during dispose and can
    1542             :       // not be accessed.
    1543           0 :       ReceivedDataSample dup(sample);
    1544           0 :       this->lookup_instance(dup, instance);
    1545             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1546           0 :       OwnershipManagerPtr owner_manager = this->ownership_manager();
    1547             : 
    1548           0 :       if (! this->is_exclusive_ownership_
    1549           0 :           || (owner_manager
    1550           0 :               && (instance)
    1551           0 :               && (owner_manager->is_owner(instance->instance_handle_,
    1552           0 :                   sample.header_.publication_id_)))) {
    1553             : #endif
    1554           0 :         cancel_deadline(instance);
    1555             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1556             :       }
    1557             : #endif
    1558           0 :     }
    1559           0 :     instance.reset();
    1560           0 :     this->dispose_unregister(sample, publication_handle, instance);
    1561           0 :   }
    1562           0 :   this->notify_read_conditions();
    1563           0 :   break;
    1564             : 
    1565           0 :   case UNREGISTER_INSTANCE: {
    1566           0 :     if (!check_historic(sample)) break;
    1567           0 :     this->writer_activity(sample.header_);
    1568           0 :     SubscriptionInstance_rch instance;
    1569             : 
    1570           0 :     if (deadline_queue_enabled_) {
    1571             :       // Find the instance first for timer cancellation since
    1572             :       // the instance may be deleted during dispose and can
    1573             :       // not be accessed.
    1574           0 :       ReceivedDataSample dup(sample);
    1575           0 :       this->lookup_instance(dup, instance);
    1576           0 :       if (instance) {
    1577             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1578           0 :         if (! this->is_exclusive_ownership_
    1579           0 :             || (this->is_exclusive_ownership_
    1580           0 :                 && instance->instance_state_->is_last(sample.header_.publication_id_))) {
    1581             : #endif
    1582           0 :           cancel_deadline(instance);
    1583             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1584             :         }
    1585             : #endif
    1586             :       }
    1587           0 :     }
    1588           0 :     instance.reset();
    1589           0 :     this->dispose_unregister(sample, publication_handle, instance);
    1590           0 :   }
    1591           0 :   this->notify_read_conditions();
    1592           0 :   break;
    1593             : 
    1594           0 :   case DISPOSE_UNREGISTER_INSTANCE: {
    1595           0 :     if (!check_historic(sample)) break;
    1596           0 :     this->writer_activity(sample.header_);
    1597           0 :     SubscriptionInstance_rch instance;
    1598             : 
    1599           0 :     if (deadline_queue_enabled_) {
    1600             :       // Find the instance first for timer cancellation since
    1601             :       // the instance may be deleted during dispose and can
    1602             :       // not be accessed.
    1603           0 :       ReceivedDataSample dup(sample);
    1604           0 :       this->lookup_instance(dup, instance);
    1605             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1606           0 :       OwnershipManagerPtr owner_manager = this->ownership_manager();
    1607           0 :       if (! this->is_exclusive_ownership_
    1608           0 :           || (owner_manager
    1609           0 :               && (instance)
    1610           0 :               && (owner_manager->is_owner (instance->instance_handle_,
    1611           0 :                   sample.header_.publication_id_)))
    1612           0 :           || (is_exclusive_ownership_
    1613           0 :               && (instance)
    1614           0 :               && instance->instance_state_->is_last(sample.header_.publication_id_))) {
    1615             : #endif
    1616           0 :         if (instance) {
    1617           0 :           cancel_deadline(instance);
    1618             :         }
    1619             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1620             :       }
    1621             : #endif
    1622           0 :     }
    1623           0 :     instance.reset();
    1624           0 :     this->dispose_unregister(sample, publication_handle, instance);
    1625           0 :   }
    1626           0 :   this->notify_read_conditions();
    1627           0 :   break;
    1628             : 
    1629           0 :   case END_HISTORIC_SAMPLES: {
    1630           0 :     if (sample.header_.message_length_ >= sizeof(GUID_t)) {
    1631           0 :       Message_Block_Ptr payload(sample.data(&mb_alloc_));
    1632           0 :       Serializer ser(payload.get(), Encoding::KIND_UNALIGNED_CDR);
    1633           0 :       GUID_t readerId = GUID_UNKNOWN;
    1634           0 :       if (!(ser >> readerId)) {
    1635           0 :         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
    1636             :             ACE_TEXT("deserialization reader failed.\n")));
    1637           0 :         return;
    1638             :       }
    1639           0 :       const GUID_t repo_id(get_guid());
    1640           0 :       if (readerId != GUID_UNKNOWN && readerId != repo_id) {
    1641           0 :         break; // not our message
    1642             :       }
    1643           0 :     }
    1644           0 :     if (DCPS_debug_level > 4) {
    1645           0 :       ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
    1646             :     }
    1647             :     // Going to acquire writers lock, release samples lock
    1648           0 :     guard.release();
    1649           0 :     resume_sample_processing(sample.header_.publication_id_);
    1650           0 :     if (DCPS_debug_level > 4) {
    1651           0 :       ACE_DEBUG((
    1652             :           LM_INFO,
    1653             :           "(%P|%t) Resumed sample processing for durable writer %C\n",
    1654             :           LogGuid(sample.header_.publication_id_).c_str()));
    1655             :     }
    1656           0 :     break;
    1657             :   }
    1658             : 
    1659           0 :   default:
    1660           0 :     ACE_ERROR((LM_ERROR,
    1661             :         "(%P|%t) ERROR: DataReaderImpl::data_received"
    1662             :         "unexpected message_id = %d\n",
    1663             :         sample.header_.message_id_));
    1664           0 :     break;
    1665             :   }
    1666             : 
    1667           0 :   if (observer && real_data && vd) {
    1668             :     const DDS::Time_t timestamp = {
    1669           0 :       sample.header_.source_timestamp_sec_,
    1670           0 :       sample.header_.source_timestamp_nanosec_
    1671           0 :     };
    1672           0 :     Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, sample.header_.instance_state(), timestamp, sample.header_.sequence_, real_data->get(), *vd);
    1673           0 :     observer->on_sample_received(this, s);
    1674             :   }
    1675           0 : }
    1676             : 
    1677             : RcHandle<EntityImpl>
    1678           0 : DataReaderImpl::parent() const
    1679             : {
    1680           0 :   return subscriber_servant_.lock();
    1681             : }
    1682             : 
    1683             : bool
    1684           0 : DataReaderImpl::check_transport_qos(const TransportInst& ti)
    1685             : {
    1686           0 :   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
    1687           0 :     return ti.is_reliable();
    1688             :   }
    1689           0 :   return true;
    1690             : }
    1691             : 
    1692           0 : void DataReaderImpl::notify_read_conditions()
    1693             : {
    1694             :   //sample lock is already held
    1695           0 :   ReadConditionSet local_read_conditions = read_conditions_;
    1696           0 :   ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    1697             : 
    1698           0 :   for (ReadConditionSet::iterator it = local_read_conditions.begin(),
    1699           0 :       end = local_read_conditions.end(); it != end; ++it) {
    1700           0 :     ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in());
    1701           0 :     if (ci) {
    1702           0 :       ci->signal_all();
    1703             :     } else {
    1704           0 :       ACE_ERROR((LM_ERROR,
    1705             :         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
    1706             :         ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n")));
    1707             :     }
    1708             :   }
    1709           0 : }
    1710             : 
    1711             : RcHandle<SubscriberImpl>
    1712           0 : DataReaderImpl::get_subscriber_servant()
    1713             : {
    1714           0 :   return subscriber_servant_.lock();
    1715             : }
    1716             : 
    1717           0 : bool DataReaderImpl::have_sample_states(
    1718             :     DDS::SampleStateMask sample_states) const
    1719             : {
    1720             :   //!!!caller should have acquired sample_lock_
    1721           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
    1722           0 :   return lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE).size();
    1723           0 : }
    1724             : 
    1725             : bool
    1726           0 : DataReaderImpl::have_view_states(DDS::ViewStateMask view_states) const
    1727             : {
    1728             :   //!!!caller should have acquired sample_lock_
    1729           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
    1730           0 :   return lookup_matching_instances(DDS::ANY_SAMPLE_STATE, view_states, DDS::ANY_INSTANCE_STATE).size();
    1731           0 : }
    1732             : 
    1733           0 : bool DataReaderImpl::have_instance_states(
    1734             :     DDS::InstanceStateMask instance_states) const
    1735             : {
    1736             :   //!!!caller should have acquired sample_lock_
    1737           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
    1738           0 :   return lookup_matching_instances(DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, instance_states).size();
    1739           0 : }
    1740             : 
    1741             : /// Fold-in the three separate loops of have_sample_states(),
    1742             : /// have_view_states(), and have_instance_states().  Takes the sample_lock_.
    1743           0 : bool DataReaderImpl::contains_sample(DDS::SampleStateMask sample_states,
    1744             :     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
    1745             : {
    1746           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> sample_guard(sample_lock_);
    1747           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
    1748             : 
    1749           0 :   return lookup_matching_instances(sample_states, view_states, instance_states).size();
    1750           0 : }
    1751             : 
    1752             : DDS::DataReaderListener_ptr
    1753           0 : DataReaderImpl::listener_for(DDS::StatusKind kind)
    1754             : {
    1755             :   // per 2.1.4.3.1 Listener Access to Plain Communication Status
    1756             :   // use this entities factory if listener is mask not enabled
    1757             :   // for this kind.
    1758           0 :   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
    1759           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
    1760           0 :   if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) {
    1761           0 :     g.release();
    1762           0 :     return subscriber->listener_for(kind);
    1763             : 
    1764             :   } else {
    1765           0 :     return DDS::DataReaderListener::_duplicate(listener_.in());
    1766             :   }
    1767           0 : }
    1768             : 
    1769           0 : void DataReaderImpl::sample_info(DDS::SampleInfo & sample_info,
    1770             :     const ReceivedDataElement *ptr)
    1771             : {
    1772             : 
    1773           0 :   sample_info.sample_rank = 0;
    1774             : 
    1775             :   // generation_rank =
    1776             :   //    (MRSIC.disposed_generation_count +
    1777             :   //     MRSIC.no_writers_generation_count)
    1778             :   //  - (S.disposed_generation_count +
    1779             :   //     S.no_writers_generation_count)
    1780             :   //
    1781           0 :   sample_info.generation_rank =
    1782           0 :       (sample_info.disposed_generation_count +
    1783           0 :           sample_info.no_writers_generation_count) -
    1784           0 :           sample_info.generation_rank;
    1785             : 
    1786             :   // absolute_generation_rank =
    1787             :   //     (MRS.disposed_generation_count +
    1788             :   //      MRS.no_writers_generation_count)
    1789             :   //   - (S.disposed_generation_count +
    1790             :   //      S.no_writers_generation_count)
    1791             :   //
    1792           0 :   sample_info.absolute_generation_rank =
    1793           0 :       (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
    1794           0 :           static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
    1795           0 :           sample_info.absolute_generation_rank;
    1796             : 
    1797           0 :   sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
    1798           0 : }
    1799             : 
    1800           0 : CORBA::Long DataReaderImpl::total_samples() const
    1801             : {
    1802             :   //!!!caller should have acquired sample_lock_
    1803           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
    1804             : 
    1805           0 :   CORBA::Long count(0);
    1806             : 
    1807           0 :   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
    1808           0 :       iter != instances_.end();
    1809           0 :       ++iter) {
    1810           0 :     SubscriptionInstance_rch ptr = iter->second;
    1811             : 
    1812           0 :     count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size());
    1813           0 :   }
    1814             : 
    1815           0 :   return count;
    1816           0 : }
    1817             : 
    1818             : void
    1819           0 : DataReaderImpl::LivelinessTimer::check_liveliness()
    1820             : {
    1821           0 :   execute_or_enqueue(make_rch<CheckLivelinessCommand>(this));
    1822           0 : }
    1823             : 
    1824             : int
    1825           0 : DataReaderImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value& tv,
    1826             :                                                 const void * /*arg*/)
    1827             : {
    1828           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    1829             : 
    1830           0 :   check_liveliness_i(false, MonotonicTimePoint(tv));
    1831           0 :   return 0;
    1832           0 : }
    1833             : 
    1834             : void
    1835           0 : DataReaderImpl::LivelinessTimer::check_liveliness_i(bool cancel,
    1836             :                                                     const MonotonicTimePoint& now)
    1837             : {
    1838             :   // Working copy of the active timer Id.
    1839             : 
    1840           0 :   RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
    1841           0 :   if (! data_reader) {
    1842           0 :     this->reactor()->purge_pending_notifications(this);
    1843           0 :     return;
    1844             :   }
    1845             : 
    1846           0 :   long local_timer_id = liveliness_timer_id_;
    1847           0 :   bool timer_was_reset = false;
    1848             : 
    1849           0 :   if (local_timer_id != -1 && cancel) {
    1850           0 :     if (DCPS_debug_level >= 5) {
    1851           0 :       ACE_DEBUG((LM_DEBUG,
    1852             :                  ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
    1853             :                  ACE_TEXT(" canceling timer for reader %C.\n"),
    1854             :                  LogGuid(data_reader->get_guid()).c_str()));
    1855             :     }
    1856             : 
    1857             :     // called from add_associations and there is already a timer
    1858             :     // so cancel the existing timer.
    1859           0 :     if (this->reactor()->cancel_timer(local_timer_id) == -1) {
    1860             :       // this could fail because the reactor's call and
    1861             :       // the add_associations' call to this could overlap
    1862             :       // so it is not a failure.
    1863           0 :       ACE_DEBUG((LM_DEBUG,
    1864             :                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
    1865             :                  ACE_TEXT(" %p.\n"), ACE_TEXT("cancel_timer")));
    1866             :     }
    1867             : 
    1868           0 :     timer_was_reset = true;
    1869             :   }
    1870             : 
    1871             :   // Used after the lock scope ends.
    1872           0 :   MonotonicTimePoint smallest(MonotonicTimePoint::max_value);
    1873           0 :   int alive_writers = 0;
    1874             : 
    1875             :   // This is a bit convoluted.  The reasoning goes as follows:
    1876             :   // 1) We grab the current timer Id value when we enter the method.
    1877             :   // 2) We *might* cancel the timer if it is active.
    1878             :   // 3) The timer *might* be rescheduled while we do not hold the sample lock.
    1879             :   // 4) If we (or another thread) canceled the timer that we can tell, then
    1880             :   // 5) we should clear the Id value,
    1881             :   // 6) unless it has been rescheduled.
    1882             :   // We are using a changed timer Id value as a proxy for having been
    1883             :   // rescheduled.
    1884           0 :   if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
    1885           0 :     liveliness_timer_id_ = -1;
    1886             :   }
    1887             : 
    1888             :   // Iterate over each writer to this reader
    1889             :   {
    1890           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
    1891             :         read_guard,
    1892             :         data_reader->writers_lock_);
    1893           0 :     WriterMapType writers = data_reader->writers_;
    1894           0 :     read_guard.release();
    1895             : 
    1896           0 :     for (WriterMapType::iterator iter = writers.begin();
    1897           0 :         iter != writers.end();
    1898           0 :         ++iter) {
    1899             :       // deal with possibly not being alive or
    1900             :       // tell when it will not be alive next (if no activity)
    1901           0 :       const MonotonicTimePoint next_absolute(iter->second->check_activity(now));
    1902           0 :       if (!next_absolute.is_max()) {
    1903           0 :         alive_writers++;
    1904           0 :         smallest = std::min(smallest, next_absolute);
    1905             :       }
    1906           0 :     }
    1907           0 :   }
    1908             : 
    1909           0 :   if (!alive_writers) {
    1910             :     // no live writers so no need to schedule a timer
    1911             :     // but be sure we don't try to cancel the timer later.
    1912           0 :     liveliness_timer_id_ = -1;
    1913             :   }
    1914             : 
    1915           0 :   if (DCPS_debug_level >= 5) {
    1916           0 :     ACE_DEBUG((LM_DEBUG,
    1917             :         ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
    1918             :         ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
    1919             :         LogGuid(data_reader->get_guid()).c_str(),
    1920             :         alive_writers,
    1921             :         !cancel));
    1922             :   }
    1923             : 
    1924             :   // Call into the reactor after releasing the sample lock.
    1925           0 :   if (alive_writers) {
    1926             :     // compare the time now with the earliest(smallest) deadline we found
    1927           0 :     TimeDuration relative;
    1928           0 :     if (now < smallest) {
    1929           0 :       relative = smallest - now;
    1930             :     } else {
    1931           0 :       relative = TimeDuration(0, 1); // ASAP
    1932             :     }
    1933           0 :     liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative.value());
    1934             : 
    1935           0 :     if (liveliness_timer_id_ == -1) {
    1936           0 :       ACE_ERROR((LM_ERROR,
    1937             :           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
    1938             :           ACE_TEXT(" %p.\n"), ACE_TEXT("schedule_timer")));
    1939             :     }
    1940           0 :   }
    1941           0 : }
    1942             : 
    1943             : void
    1944           0 : DataReaderImpl::release_instance(DDS::InstanceHandle_t handle)
    1945             : {
    1946             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1947           0 :   OwnershipManagerPtr owner_manager = this->ownership_manager();
    1948           0 :   if (owner_manager) {
    1949           0 :     owner_manager->remove_writers(handle);
    1950             :   }
    1951             : #endif
    1952             : 
    1953           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
    1954           0 :   SubscriptionInstance_rch instance = this->get_handle_instance(handle);
    1955             : 
    1956           0 :   if (!instance) {
    1957           0 :     ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
    1958             :         "could not find the instance by handle 0x%x\n", handle));
    1959           0 :     return;
    1960             :   }
    1961             : 
    1962           0 :   this->purge_data(instance);
    1963             : 
    1964             :   {
    1965           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    1966           0 :     instances_.erase(handle);
    1967           0 :   }
    1968             : 
    1969           0 :   this->release_instance_i(handle);
    1970           0 :   if (this->monitor_) {
    1971           0 :     this->monitor_->report();
    1972             :   }
    1973           0 : }
    1974             : 
    1975             : void
    1976           0 : DataReaderImpl::state_updated(DDS::InstanceHandle_t handle)
    1977             : {
    1978           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    1979           0 :   state_updated_i(handle);
    1980           0 : }
    1981             : 
    1982           0 : OpenDDS::DCPS::WriterStats::WriterStats(
    1983             :     int amount,
    1984           0 :     DataCollector<double>::OnFull type) : stats_(amount, type)
    1985             : {
    1986           0 : }
    1987             : 
    1988           0 : void OpenDDS::DCPS::WriterStats::add_stat(const TimeDuration& delay)
    1989             : {
    1990           0 :   double datum = static_cast<double>(delay.value().sec());
    1991           0 :   datum += delay.value().usec() / 1000000.0;
    1992           0 :   this->stats_.add(datum);
    1993           0 : }
    1994             : 
    1995           0 : OpenDDS::DCPS::LatencyStatistics OpenDDS::DCPS::WriterStats::get_stats() const
    1996             : {
    1997             :   LatencyStatistics value;
    1998             : 
    1999           0 :   value.publication = GUID_UNKNOWN;
    2000           0 :   value.n           = this->stats_.n();
    2001           0 :   value.maximum     = this->stats_.maximum();
    2002           0 :   value.minimum     = this->stats_.minimum();
    2003           0 :   value.mean        = this->stats_.mean();
    2004           0 :   value.variance    = this->stats_.var();
    2005             : 
    2006           0 :   return value;
    2007             : }
    2008             : 
    2009           0 : void OpenDDS::DCPS::WriterStats::reset_stats()
    2010             : {
    2011           0 :   this->stats_.reset();
    2012           0 : }
    2013             : 
    2014             : #ifndef OPENDDS_SAFETY_PROFILE
    2015           0 : std::ostream& OpenDDS::DCPS::WriterStats::raw_data(std::ostream& str) const
    2016             : {
    2017           0 :   str << std::dec << this->stats_.size()
    2018           0 :                               << " samples out of " << this->stats_.n() << std::endl;
    2019           0 :   return str << this->stats_;
    2020             : }
    2021             : #endif //OPENDDS_SAFETY_PROFILE
    2022             : 
    2023             : void
    2024           0 : DataReaderImpl::writer_removed(WriterInfo& info)
    2025             : {
    2026           0 :   const GUID_t info_writer_id = info.writer_id();
    2027             : 
    2028           0 :   if (DCPS_debug_level >= 5) {
    2029           0 :     ACE_DEBUG((LM_DEBUG,
    2030             :         ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
    2031             :         ACE_TEXT("reader %C from writer %C.\n"),
    2032             :         LogGuid(get_guid()).c_str(),
    2033             :         LogGuid(info_writer_id).c_str()));
    2034             :   }
    2035             : 
    2036             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    2037           0 :   OwnershipManagerPtr owner_manager = this->ownership_manager();
    2038           0 :   if (owner_manager) {
    2039           0 :     owner_manager->remove_writer(info_writer_id);
    2040           0 :     info.clear_owner_evaluated();
    2041             :   }
    2042             : #endif
    2043             : 
    2044             :   {
    2045           0 :     DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
    2046             :     {
    2047           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
    2048           0 :       RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
    2049           0 :       if (pos != publication_id_to_handle_map_.end()) {
    2050           0 :         publication_handle = pos->second;
    2051             :       }
    2052           0 :     }
    2053             : 
    2054           0 :     bool liveliness_changed = false;
    2055             : 
    2056           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    2057             : 
    2058           0 :     const WriterInfo::WriterState info_state = info.state();
    2059             : 
    2060           0 :     if (info_state == WriterInfo::ALIVE) {
    2061           0 :       --liveliness_changed_status_.alive_count;
    2062           0 :       --liveliness_changed_status_.alive_count_change;
    2063           0 :       liveliness_changed = true;
    2064             :     }
    2065             : 
    2066           0 :     if (info_state == WriterInfo::DEAD) {
    2067           0 :       --liveliness_changed_status_.not_alive_count;
    2068           0 :       --liveliness_changed_status_.not_alive_count_change;
    2069           0 :       liveliness_changed = true;
    2070             :     }
    2071             : 
    2072           0 :     liveliness_changed_status_.last_publication_handle = info.handle();
    2073           0 :     instances_liveliness_update(info_writer_id, publication_handle);
    2074             : 
    2075           0 :     if (liveliness_changed) {
    2076           0 :       set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
    2077           0 :       this->notify_liveliness_change();
    2078             :     }
    2079           0 :   }
    2080           0 : }
    2081             : 
    2082             : void
    2083           0 : DataReaderImpl::writer_became_alive(WriterInfo& info, const MonotonicTimePoint& /* when */)
    2084             : {
    2085           0 :   const GUID_t info_writer_id = info.writer_id();
    2086             : 
    2087           0 :   if (DCPS_debug_level >= 5) {
    2088           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
    2089             :                ACE_TEXT("reader %C from writer %C previous state %C.\n"),
    2090             :                LogGuid(get_guid()).c_str(),
    2091             :                LogGuid(info_writer_id).c_str(),
    2092             :                info.get_state_str()));
    2093             :   }
    2094             : 
    2095             :   // NOTE: each instance will change to ALIVE_STATE when they receive a sample
    2096             : 
    2097           0 :   const WriterInfo::WriterState info_state = info.state();
    2098             : 
    2099             :   {
    2100           0 :     bool liveliness_changed = false;
    2101             : 
    2102           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    2103             : 
    2104           0 :     if (info_state != WriterInfo::ALIVE) {
    2105           0 :       liveliness_changed_status_.alive_count++;
    2106           0 :       liveliness_changed_status_.alive_count_change++;
    2107           0 :       liveliness_changed = true;
    2108             :     }
    2109             : 
    2110           0 :     if (info_state == WriterInfo::DEAD) {
    2111           0 :       liveliness_changed_status_.not_alive_count--;
    2112           0 :       liveliness_changed_status_.not_alive_count_change--;
    2113             :     }
    2114             : 
    2115           0 :     if (liveliness_changed_status_.alive_count < 0) {
    2116           0 :       ACE_ERROR((LM_ERROR,
    2117             :                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
    2118             :                  ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
    2119             :                  liveliness_changed_status_.alive_count));
    2120           0 :       return;
    2121             :     }
    2122             : 
    2123           0 :     if (liveliness_changed_status_.not_alive_count < 0) {
    2124           0 :       ACE_ERROR((LM_ERROR,
    2125             :                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
    2126             :                  ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
    2127             :                  liveliness_changed_status_.not_alive_count));
    2128           0 :       return;
    2129             :     }
    2130             : 
    2131           0 :     liveliness_changed_status_.last_publication_handle = info.handle();
    2132             : 
    2133             :     // Change the state to ALIVE since handle_timeout may call writer_became_dead
    2134             :     // which need the current state info.
    2135           0 :     info.state(WriterInfo::ALIVE);
    2136             : 
    2137           0 :     if (this->monitor_) {
    2138           0 :       this->monitor_->report();
    2139             :     }
    2140             : 
    2141             :     // Call listener only when there are liveliness status changes.
    2142           0 :     if (liveliness_changed) {
    2143           0 :       set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
    2144           0 :       this->notify_liveliness_change();
    2145             :     }
    2146           0 :   }
    2147             : 
    2148             :   // this call will start the liveliness timer if it is not already set
    2149           0 :   liveliness_timer_->check_liveliness();
    2150             : }
    2151             : 
    2152             : void
    2153           0 : DataReaderImpl::writer_became_dead(WriterInfo& info)
    2154             : {
    2155           0 :   const GUID_t info_writer_id = info.writer_id();
    2156             : 
    2157           0 :   if (DCPS_debug_level >= 5) {
    2158           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
    2159             :                ACE_TEXT("reader %C from writer %C previous state %C.\n"),
    2160             :                LogGuid(get_guid()).c_str(),
    2161             :                LogGuid(info_writer_id).c_str(),
    2162             :                info.get_state_str()));
    2163             :   }
    2164             : 
    2165             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    2166           0 :   OwnershipManagerPtr owner_manager = this->ownership_manager();
    2167           0 :   if (owner_manager) {
    2168           0 :     owner_manager->remove_writer(info_writer_id);
    2169           0 :     info.clear_owner_evaluated();
    2170             :   }
    2171             : #endif
    2172             : 
    2173           0 :   bool liveliness_changed = false;
    2174             : 
    2175           0 :   const WriterInfo::WriterState info_state = info.state();
    2176             : 
    2177           0 :   DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
    2178             :   {
    2179           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
    2180           0 :     RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
    2181           0 :     if (pos != publication_id_to_handle_map_.end()) {
    2182           0 :       publication_handle = pos->second;
    2183             :     }
    2184           0 :   }
    2185             : 
    2186             :   {
    2187           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    2188             : 
    2189           0 :     if (info_state != WriterInfo::DEAD) {
    2190           0 :       ++liveliness_changed_status_.not_alive_count;
    2191           0 :       ++liveliness_changed_status_.not_alive_count_change;
    2192           0 :       liveliness_changed = true;
    2193             :     }
    2194             : 
    2195           0 :     if (info_state == WriterInfo::ALIVE) {
    2196           0 :       --liveliness_changed_status_.alive_count;
    2197           0 :       --liveliness_changed_status_.alive_count_change;
    2198             :     }
    2199             : 
    2200           0 :     if (liveliness_changed_status_.alive_count < 0) {
    2201           0 :       ACE_ERROR((LM_ERROR,
    2202             :                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
    2203             :                  ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
    2204             :                  liveliness_changed_status_.alive_count));
    2205           0 :       return;
    2206             :     }
    2207             : 
    2208           0 :     if (liveliness_changed_status_.not_alive_count < 0) {
    2209           0 :       ACE_ERROR((LM_ERROR,
    2210             :                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
    2211             :                  ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
    2212             :                  liveliness_changed_status_.not_alive_count));
    2213           0 :       return;
    2214             :     }
    2215             : 
    2216           0 :     liveliness_changed_status_.last_publication_handle = info.handle();
    2217             : 
    2218           0 :     info.state(WriterInfo::DEAD);
    2219             : 
    2220           0 :     if (this->monitor_) {
    2221           0 :       this->monitor_->report();
    2222             :     }
    2223             : 
    2224           0 :     instances_liveliness_update(info_writer_id, publication_handle);
    2225             : 
    2226             :     // Call listener only when there are liveliness status changes.
    2227           0 :     if (liveliness_changed) {
    2228           0 :       set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
    2229           0 :       this->notify_liveliness_change();
    2230             :     }
    2231           0 :   }
    2232           0 : }
    2233             : 
    2234             : void
    2235           0 : DataReaderImpl::instances_liveliness_update(const GUID_t& writer,
    2236             :                                             DDS::InstanceHandle_t publication_handle)
    2237             : {
    2238             :   // sample_lock_ must be held.
    2239           0 :   InstanceSet localinsts;
    2240             :   {
    2241           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    2242           0 :     if (instances_.size() == 0) {
    2243           0 :       return;
    2244             :     }
    2245           0 :     for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
    2246           0 :          iter != instances_.end(); ++iter) {
    2247           0 :       if (iter->second->instance_state_->writes_instance(writer)) {
    2248           0 :         localinsts.insert(iter->first);
    2249             :       }
    2250             :     }
    2251           0 :   }
    2252             : 
    2253           0 :   for (InstanceSet::iterator iter = localinsts.begin(); iter != localinsts.end(); ++iter) {
    2254           0 :     set_instance_state_i(*iter, publication_handle, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, SystemTimePoint::now(), writer);
    2255             :   }
    2256           0 : }
    2257             : 
    2258             : 
    2259             : void
    2260           0 : DataReaderImpl::set_sample_lost_status(
    2261             :     const DDS::SampleLostStatus& status)
    2262             : {
    2263             :   //!!!caller should have acquired sample_lock_
    2264           0 :   sample_lost_status_ = status;
    2265           0 : }
    2266             : 
    2267             : void
    2268           0 : DataReaderImpl::set_sample_rejected_status(
    2269             :     const DDS::SampleRejectedStatus& status)
    2270             : {
    2271             :   //!!!caller should have acquired sample_lock_
    2272           0 :   sample_rejected_status_ = status;
    2273           0 : }
    2274             : 
    2275           0 : void DataReaderImpl::dispose_unregister(const ReceivedDataSample&,
    2276             :                                         DDS::InstanceHandle_t,
    2277             :                                         SubscriptionInstance_rch&)
    2278             : {
    2279           0 :   if (DCPS_debug_level > 0) {
    2280           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
    2281             :   }
    2282           0 : }
    2283             : 
    2284           0 : void DataReaderImpl::process_latency(const ReceivedDataSample& sample)
    2285             : {
    2286           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
    2287           0 :   StatsMapType::iterator location = this->statistics_.find(sample.header_.publication_id_);
    2288             : 
    2289           0 :   if (location != this->statistics_.end()) {
    2290           0 :     const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
    2291             : 
    2292             :     // Only when the user has specified a latency budget or statistics
    2293             :     // are enabled we need to calculate our latency
    2294           0 :     if ((this->statistics_enabled()) ||
    2295           0 :         (this->qos_.latency_budget.duration > zero)) {
    2296             :       const DDS::Time_t timestamp = {
    2297           0 :         sample.header_.source_timestamp_sec_,
    2298           0 :         sample.header_.source_timestamp_nanosec_
    2299           0 :       };
    2300           0 :       const TimeDuration latency = SystemTimePoint::now() - SystemTimePoint(timestamp);
    2301             : 
    2302           0 :       if (this->statistics_enabled()) {
    2303           0 :         location->second.add_stat(latency);
    2304             :       }
    2305             : 
    2306           0 :       if (DCPS_debug_level > 9) {
    2307           0 :         ACE_DEBUG((LM_DEBUG,
    2308             :             ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
    2309             :             ACE_TEXT("measured latency of %C for current sample.\n"),
    2310             :             latency.str().c_str()));
    2311             :       }
    2312             : 
    2313           0 :       if (this->qos_.latency_budget.duration > zero) {
    2314             :         // Check latency against the budget.
    2315           0 :         if (latency > TimeDuration(this->qos_.latency_budget.duration)) {
    2316           0 :           this->notify_latency(sample.header_.publication_id_);
    2317             :         }
    2318             :       }
    2319           0 :     }
    2320           0 :   } else if (DCPS_debug_level > 0) {
    2321             :     /// NB: This message is generated contemporaneously with a similar
    2322             :     ///     message from writer_activity().  That message is not marked
    2323             :     ///     as an error, so we follow that lead and leave this as an
    2324             :     ///     informational message, guarded by debug level.  This seems
    2325             :     ///     to be due to late samples (samples delivered after an
    2326             :     ///     association has been torn down).  We may want to promote this
    2327             :     ///     to a warning if other conditions causing this symptom are
    2328             :     ///     discovered.
    2329           0 :     ACE_DEBUG((LM_DEBUG,
    2330             :         ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
    2331             :         ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
    2332             :         LogGuid(get_guid()).c_str(),
    2333             :         LogGuid(sample.header_.publication_id_).c_str()));
    2334             :   }
    2335           0 : }
    2336             : 
    2337           0 : void DataReaderImpl::notify_latency(GUID_t writer)
    2338             : {
    2339             :   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
    2340             :   // is given to this DataReader then narrow() fails.
    2341           0 :   DataReaderListener_var listener = get_ext_listener();
    2342             : 
    2343           0 :   if (!CORBA::is_nil(listener.in())) {
    2344           0 :     WriterIdSeq writerIds;
    2345           0 :     writerIds.length(1);
    2346           0 :     writerIds[ 0] = writer;
    2347             : 
    2348           0 :     DDS::InstanceHandleSeq handles;
    2349           0 :     this->lookup_instance_handles(writerIds, handles);
    2350             : 
    2351           0 :     if (handles.length() >= 1) {
    2352           0 :       this->budget_exceeded_status_.last_instance_handle = handles[ 0];
    2353             : 
    2354             :     } else {
    2355           0 :       this->budget_exceeded_status_.last_instance_handle = -1;
    2356             :     }
    2357             : 
    2358           0 :     ++this->budget_exceeded_status_.total_count;
    2359           0 :     ++this->budget_exceeded_status_.total_count_change;
    2360             : 
    2361           0 :     listener->on_budget_exceeded(this, this->budget_exceeded_status_);
    2362             : 
    2363           0 :     this->budget_exceeded_status_.total_count_change = 0;
    2364           0 :   }
    2365           0 : }
    2366             : 
    2367             : #ifndef OPENDDS_SAFETY_PROFILE
    2368             : void
    2369           0 : DataReaderImpl::get_latency_stats(
    2370             :     OpenDDS::DCPS::LatencyStatisticsSeq & stats)
    2371             : {
    2372           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
    2373           0 :   stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
    2374           0 :   int index = 0;
    2375             : 
    2376           0 :   for (StatsMapType::const_iterator current = this->statistics_.begin();
    2377           0 :       current != this->statistics_.end();
    2378           0 :       ++current, ++index) {
    2379           0 :     stats[ index] = current->second.get_stats();
    2380           0 :     stats[ index].publication = current->first;
    2381             :   }
    2382           0 : }
    2383             : #endif
    2384             : 
    2385             : void
    2386           0 : DataReaderImpl::reset_latency_stats()
    2387             : {
    2388           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
    2389           0 :   for (StatsMapType::iterator current = this->statistics_.begin();
    2390           0 :       current != this->statistics_.end();
    2391           0 :       ++current) {
    2392           0 :     current->second.reset_stats();
    2393             :   }
    2394           0 : }
    2395             : 
    2396             : CORBA::Boolean
    2397           0 : DataReaderImpl::statistics_enabled()
    2398             : {
    2399           0 :   return statistics_enabled_;
    2400             : }
    2401             : 
    2402             : void
    2403           0 : DataReaderImpl::statistics_enabled(
    2404             :     CORBA::Boolean statistics_enabled)
    2405             : {
    2406           0 :   statistics_enabled_ = statistics_enabled;
    2407           0 : }
    2408             : 
    2409             : void
    2410           0 : DataReaderImpl::prepare_to_delete()
    2411             : {
    2412           0 :   const Observer_rch observer = get_observer(Observer::e_DELETED);
    2413           0 :   if (observer) {
    2414           0 :     observer->on_deleted(this);
    2415             :   }
    2416             : 
    2417           0 :   this->set_deleted(true);
    2418           0 :   this->stop_associating();
    2419           0 :   this->send_final_acks();
    2420           0 :   subscription_id_condition_.notify_all();
    2421           0 : }
    2422             : 
    2423             : SubscriptionInstance_rch
    2424           0 : DataReaderImpl::get_handle_instance(DDS::InstanceHandle_t handle)
    2425             : {
    2426           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, SubscriptionInstance_rch());
    2427             : 
    2428           0 :   SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
    2429           0 :   if (iter == instances_.end()) {
    2430           0 :     ACE_DEBUG((LM_WARNING,
    2431             :         ACE_TEXT("(%P|%t) WARNING: ")
    2432             :         ACE_TEXT("DataReaderImpl::get_handle_instance: ")
    2433             :         ACE_TEXT("lookup for 0x%x failed\n"),
    2434             :         handle));
    2435           0 :     return SubscriptionInstance_rch();
    2436             :   } // if (0 != instances_.find(handle, instance))
    2437             : 
    2438           0 :   return iter->second;
    2439           0 : }
    2440             : 
    2441             : DDS::InstanceHandle_t
    2442           0 : DataReaderImpl::get_next_handle(const DDS::BuiltinTopicKey_t& key)
    2443             : {
    2444           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
    2445           0 :   if (!participant)
    2446           0 :     return DDS::HANDLE_NIL;
    2447             : 
    2448           0 :   if (is_bit()) {
    2449           0 :     const GUID_t id = bit_key_to_guid(key);
    2450           0 :     return participant->assign_handle(id);
    2451             : 
    2452             :   } else {
    2453           0 :     return participant->assign_handle();
    2454             :   }
    2455           0 : }
    2456             : 
    2457           0 : void DataReaderImpl::return_handle(DDS::InstanceHandle_t handle)
    2458             : {
    2459           0 :   const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
    2460           0 :   if (participant) {
    2461           0 :     participant->return_handle(handle);
    2462             :   }
    2463           0 : }
    2464             : 
    2465             : void
    2466           0 : DataReaderImpl::notify_subscription_disconnected(const WriterIdSeq& pubids)
    2467             : {
    2468             :   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
    2469             : 
    2470             :   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
    2471             :   // is given to this DataReader then narrow() fails.
    2472           0 :   DataReaderListener_var the_listener = get_ext_listener();
    2473             : 
    2474           0 :   if (!CORBA::is_nil(the_listener.in())) {
    2475           0 :     SubscriptionLostStatus status;
    2476             : 
    2477             :     // Since this callback may come after remove_association which removes
    2478             :     // the writer from id_to_handle map, we can ignore this error.
    2479           0 :     this->lookup_instance_handles(pubids, status.publication_handles);
    2480           0 :     the_listener->on_subscription_disconnected(this, status);
    2481           0 :   }
    2482           0 : }
    2483             : 
    2484             : void
    2485           0 : DataReaderImpl::notify_subscription_reconnected(const WriterIdSeq& pubids)
    2486             : {
    2487             :   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
    2488             : 
    2489           0 :   if (!this->is_bit_) {
    2490             :     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
    2491             :     // is given to this DataReader then narrow() fails.
    2492           0 :     DataReaderListener_var the_listener = get_ext_listener();
    2493             : 
    2494           0 :     if (!CORBA::is_nil(the_listener.in())) {
    2495           0 :       SubscriptionLostStatus status;
    2496             : 
    2497             :       // If it's reconnected then the reader should be in id_to_handle
    2498           0 :       this->lookup_instance_handles(pubids, status.publication_handles);
    2499             : 
    2500           0 :       the_listener->on_subscription_reconnected(this,  status);
    2501           0 :     }
    2502           0 :   }
    2503           0 : }
    2504             : 
    2505             : void
    2506           0 : DataReaderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq& handles)
    2507             : {
    2508             :   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
    2509             : 
    2510           0 :   if (!this->is_bit_) {
    2511             :     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
    2512             :     // is given to this DataReader then narrow() fails.
    2513           0 :     DataReaderListener_var the_listener = get_ext_listener();
    2514             : 
    2515           0 :     if (!CORBA::is_nil(the_listener.in())) {
    2516           0 :       SubscriptionLostStatus status;
    2517             : 
    2518           0 :       CORBA::ULong len = handles.length();
    2519           0 :       status.publication_handles.length(len);
    2520             : 
    2521           0 :       for (CORBA::ULong i = 0; i < len; ++ i) {
    2522           0 :         status.publication_handles[i] = handles[i];
    2523             :       }
    2524             : 
    2525           0 :       the_listener->on_subscription_lost(this, status);
    2526           0 :     }
    2527           0 :   }
    2528           0 : }
    2529             : 
    2530             : void
    2531           0 : DataReaderImpl::notify_subscription_lost(const WriterIdSeq& pubids)
    2532             : {
    2533             :   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
    2534             : 
    2535             :   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
    2536             :   // is given to this DataReader then narrow() fails.
    2537           0 :   DataReaderListener_var the_listener = get_ext_listener();
    2538             : 
    2539           0 :   if (!CORBA::is_nil(the_listener.in())) {
    2540           0 :     SubscriptionLostStatus status;
    2541             : 
    2542             :     // Since this callback may come after remove_association which removes
    2543             :     // the writer from id_to_handle map, we can ignore this error.
    2544           0 :     this->lookup_instance_handles(pubids, status.publication_handles);
    2545           0 :     the_listener->on_subscription_lost(this, status);
    2546           0 :   }
    2547           0 : }
    2548             : 
    2549             : 
    2550             : void
    2551           0 : DataReaderImpl::lookup_instance_handles(const WriterIdSeq& ids,
    2552             :     DDS::InstanceHandleSeq & hdls)
    2553             : {
    2554           0 :   CORBA::ULong const num_wrts = ids.length();
    2555             : 
    2556           0 :   if (DCPS_debug_level > 9) {
    2557           0 :     const char* separator = "";
    2558           0 :     OPENDDS_STRING guids;
    2559             : 
    2560           0 :     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
    2561           0 :       guids += separator;
    2562           0 :       guids += LogGuid(ids[i]).conv_;
    2563           0 :       separator = ", ";
    2564             :     }
    2565             : 
    2566           0 :     ACE_DEBUG((LM_DEBUG,
    2567             :         ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
    2568             :         ACE_TEXT("searching for handles for writer Ids: %C.\n"),
    2569             :         guids.c_str()));
    2570           0 :   }
    2571             : 
    2572           0 :   hdls.length(num_wrts);
    2573             : 
    2574           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
    2575           0 :   if (participant) {
    2576           0 :     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
    2577           0 :       hdls[i] = participant->lookup_handle(ids[i]);
    2578             :     }
    2579             :   }
    2580           0 : }
    2581             : 
    2582             : bool
    2583           0 : DataReaderImpl::filter_sample(const DataSampleHeader& header)
    2584             : {
    2585           0 :   const SystemTimePoint now = SystemTimePoint::now();
    2586             : 
    2587             :   // Expire historic data if QoS indicates VOLATILE.
    2588           0 :   if (!always_get_history_ && header.historic_sample_
    2589           0 :       && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
    2590           0 :     if (DCPS_debug_level >= 8) {
    2591           0 :       ACE_DEBUG((LM_DEBUG,
    2592             :           ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
    2593             :           ACE_TEXT("Discarded historic data.\n")));
    2594             :     }
    2595             : 
    2596           0 :     return true; // Data filtered.
    2597             :   }
    2598             : 
    2599             :   // The LIFESPAN_DURATION_FLAG is set when sample data is sent
    2600             :   // with a non-default LIFESPAN duration value.
    2601           0 :   if (header.lifespan_duration_) {
    2602             :     // Finite lifespan.  Check if data has expired.
    2603             : 
    2604             :     const DDS::Time_t expiration_dds_time = {
    2605           0 :       header.source_timestamp_sec_ + header.lifespan_duration_sec_,
    2606           0 :       header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
    2607           0 :     };
    2608           0 :     const SystemTimePoint expiration_time(expiration_dds_time);
    2609             : 
    2610             :     // We assume that the publisher host's clock and subcriber host's
    2611             :     // clock are synchronized (allowed by the spec).
    2612           0 :     if (now >= expiration_time) {
    2613           0 :       if (DCPS_debug_level >= 8) {
    2614           0 :         const TimeDuration diff(now - expiration_time);
    2615           0 :         ACE_DEBUG((LM_DEBUG,
    2616             :           ACE_TEXT("(%P|%t) Received data ")
    2617             :           ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
    2618             :           diff.value().sec(),
    2619             :           diff.value().usec()));
    2620           0 :       }
    2621             : 
    2622           0 :       return true; // Data filtered.
    2623             :     }
    2624           0 :   }
    2625             : 
    2626           0 :   return false;
    2627           0 : }
    2628             : 
    2629             : bool
    2630           0 : DataReaderImpl::ownership_filter_instance(const SubscriptionInstance_rch& instance,
    2631             :   const GUID_t& pubid)
    2632             : {
    2633             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    2634           0 :   if (this->is_exclusive_ownership_) {
    2635             : 
    2636           0 :     ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
    2637           0 :     WriterMapType::iterator iter = writers_.find(pubid);
    2638             : 
    2639           0 :     if (iter == writers_.end()) {
    2640           0 :       if (DCPS_debug_level > 4) {
    2641             :         // This may not be an error since it could happen that the sample
    2642             :         // is delivered to the datareader after the write is dis-associated
    2643             :         // with this datareader.
    2644           0 :         ACE_DEBUG((LM_DEBUG,
    2645             :                    ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
    2646             :                    ACE_TEXT("reader %C is not associated with writer %C.\n"),
    2647             :                    LogGuid(get_guid()).c_str(),
    2648             :                    LogGuid(pubid).c_str()));
    2649             :       }
    2650           0 :       return true;
    2651             :     }
    2652             : 
    2653             : 
    2654             :     // Evaulate the owner of the instance if not selected and filter
    2655             :     // current message if it's not from owner writer.
    2656           0 :     if ( instance->instance_state_->get_owner() == GUID_UNKNOWN
    2657           0 :         || ! iter->second->is_owner_evaluated(instance->instance_handle_)) {
    2658           0 :       OwnershipManagerPtr owner_manager = this->ownership_manager();
    2659             : 
    2660           0 :       bool is_owner = owner_manager && owner_manager->select_owner (
    2661           0 :         instance->instance_handle_,
    2662           0 :         iter->second->writer_id(),
    2663           0 :         iter->second->writer_qos_ownership_strength(),
    2664           0 :         instance->instance_state_);
    2665           0 :       iter->second->set_owner_evaluated(instance->instance_handle_, true);
    2666             : 
    2667           0 :       if (! is_owner) {
    2668           0 :         if (DCPS_debug_level >= 1) {
    2669           0 :           ACE_DEBUG((LM_DEBUG,
    2670             :                      ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
    2671             :                      ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
    2672             :                      LogGuid(get_guid()).c_str(),
    2673             :                      LogGuid(pubid).c_str(),
    2674             :                      LogGuid(instance->instance_state_->get_owner()).c_str()));
    2675             :         }
    2676           0 :         return true;
    2677             :       }
    2678           0 :     }
    2679           0 :     else if (! (instance->instance_state_->get_owner() == pubid)) {
    2680           0 :       if (DCPS_debug_level >= 1) {
    2681           0 :         ACE_DEBUG((LM_DEBUG,
    2682             :                    ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
    2683             :                    ACE_TEXT("reader %C writer %C is not owner %C\n"),
    2684             :                    LogGuid(get_guid()).c_str(),
    2685             :                    LogGuid(pubid).c_str(),
    2686             :                    LogGuid(instance->instance_state_->get_owner()).c_str()));
    2687             :       }
    2688           0 :       return true;
    2689             :     }
    2690           0 :   }
    2691             : #else
    2692             :   ACE_UNUSED_ARG(pubid);
    2693             :   ACE_UNUSED_ARG(instance);
    2694             : #endif
    2695           0 :   return false;
    2696             : }
    2697             : 
    2698             : bool
    2699           0 : DataReaderImpl::time_based_filter_instance(const SubscriptionInstance_rch& instance,
    2700             :                                            MonotonicTimePoint& now,
    2701             :                                            MonotonicTimePoint& deadline)
    2702             : {
    2703           0 :   now = MonotonicTimePoint::now();
    2704           0 :   const TimeDuration minimum_separation(qos_.time_based_filter.minimum_separation);
    2705             : 
    2706             :   // TIME_BASED_FILTER processing; expire data samples
    2707             :   // if minimum separation is not met for instance.
    2708           0 :   if (!minimum_separation.is_zero()) {
    2709           0 :     if (now - instance->last_accepted_ < minimum_separation) {
    2710           0 :       deadline = now + minimum_separation;
    2711           0 :       return true; // Data filtered.
    2712             :     }
    2713             :   }
    2714             : 
    2715           0 :   instance->last_accepted_ = now;
    2716             : 
    2717           0 :   return false;
    2718           0 : }
    2719             : 
    2720           0 : bool DataReaderImpl::is_bit() const
    2721             : {
    2722           0 :   return this->is_bit_;
    2723             : }
    2724             : 
    2725             : bool
    2726           0 : DataReaderImpl::has_zero_copies()
    2727             : {
    2728           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2729             :       guard,
    2730             :       this->sample_lock_,
    2731             :       true /* assume we have loans */);
    2732           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true);
    2733             : 
    2734           0 :   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
    2735           0 :       iter != instances_.end();
    2736           0 :       ++iter) {
    2737           0 :     SubscriptionInstance_rch ptr = iter->second;
    2738             : 
    2739           0 :     if (ptr->rcvd_samples_.has_zero_copies()) {
    2740           0 :       return true;
    2741             :     }
    2742           0 :   }
    2743             : 
    2744           0 :   return false;
    2745           0 : }
    2746             : 
    2747           0 : void DataReaderImpl::notify_liveliness_change()
    2748             : {
    2749             :   // sample_lock_ must be held.
    2750             :   // N.B. writers_lock_ should already be acquired when
    2751             :   //      this method is called.
    2752             : 
    2753             :   DDS::DataReaderListener_var listener
    2754           0 :   = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
    2755             : 
    2756           0 :   if (!CORBA::is_nil(listener.in())) {
    2757           0 :     const DDS::LivelinessChangedStatus status = liveliness_changed_status_;
    2758           0 :     liveliness_changed_status_.alive_count_change = 0;
    2759           0 :     liveliness_changed_status_.not_alive_count_change = 0;
    2760           0 :     ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    2761           0 :     listener->on_liveliness_changed(this, status);
    2762           0 :   }
    2763           0 :   notify_status_condition();
    2764             : 
    2765           0 :   if (DCPS_debug_level > 9) {
    2766           0 :     ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
    2767           0 :     OPENDDS_STRING output_str;
    2768           0 :     output_str += "subscription ";
    2769           0 :     output_str += LogGuid(get_guid()).conv_;
    2770           0 :     output_str += ", listener at: 0x";
    2771           0 :     output_str += to_dds_string(this->listener_.in());
    2772             : 
    2773           0 :     for (WriterMapType::iterator current = this->writers_.begin();
    2774           0 :          current != this->writers_.end();
    2775           0 :          ++current) {
    2776           0 :       const GUID_t id = current->first;
    2777           0 :       output_str += "\n\tNOTIFY: writer[ ";
    2778           0 :       output_str += LogGuid(id).conv_;
    2779           0 :       output_str += "] == ";
    2780           0 :       output_str += current->second->get_state_str();
    2781             :     }
    2782             : 
    2783           0 :     ACE_DEBUG((LM_DEBUG,
    2784             :                ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
    2785             :                ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
    2786             :                ACE_TEXT("\tNOTIFY: %C\n"),
    2787             :                listener.in(),
    2788             :                listener_mask_,
    2789             :                output_str.c_str()));
    2790           0 :   }
    2791           0 : }
    2792             : 
    2793           0 : void DataReaderImpl::post_read_or_take()
    2794             : {
    2795           0 :   set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
    2796           0 :   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
    2797           0 :   if (subscriber) {
    2798           0 :     subscriber->set_status_changed_flag(
    2799             :       DDS::DATA_ON_READERS_STATUS, false);
    2800             :   }
    2801           0 : }
    2802             : 
    2803             : ACE_Reactor_Timer_Interface*
    2804           0 : DataReaderImpl::get_reactor()
    2805             : {
    2806           0 :   return this->reactor_;
    2807             : }
    2808             : 
    2809             : OpenDDS::DCPS::GUID_t
    2810           0 : DataReaderImpl::get_topic_id()
    2811             : {
    2812           0 :   return topic_id_;
    2813             : }
    2814             : 
    2815             : OpenDDS::DCPS::GUID_t
    2816           0 : DataReaderImpl::get_dp_id()
    2817             : {
    2818           0 :   return dp_id_;
    2819             : }
    2820             : 
    2821             : void
    2822           0 : DataReaderImpl::get_instance_handles(InstanceHandleVec& instance_handles)
    2823             : {
    2824           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    2825           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
    2826             : 
    2827           0 :   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
    2828           0 :       end = instances_.end(); iter != end; ++iter) {
    2829           0 :     instance_handles.push_back(iter->first);
    2830             :   }
    2831           0 : }
    2832             : 
    2833             : void
    2834           0 : DataReaderImpl::get_writer_states(WriterStatePairVec& writer_states)
    2835             : {
    2836           0 :   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
    2837             :       read_guard,
    2838             :       this->writers_lock_);
    2839           0 :   for (WriterMapType::iterator iter = writers_.begin();
    2840           0 :       iter != writers_.end();
    2841           0 :       ++iter) {
    2842           0 :     writer_states.push_back(WriterStatePair(iter->first,
    2843           0 :         iter->second->state()));
    2844             :   }
    2845           0 : }
    2846             : 
    2847             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    2848             : void
    2849           0 : DataReaderImpl::update_ownership_strength(const GUID_t& pub_id,
    2850             :     const CORBA::Long& ownership_strength)
    2851             : {
    2852           0 :   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
    2853             :       read_guard,
    2854             :       this->writers_lock_);
    2855           0 :   for (WriterMapType::iterator iter = writers_.begin();
    2856           0 :       iter != writers_.end();
    2857           0 :       ++iter) {
    2858           0 :     if (iter->second->writer_id() == pub_id) {
    2859           0 :       if (ownership_strength != iter->second->writer_qos_ownership_strength()) {
    2860           0 :         if (DCPS_debug_level >= 1) {
    2861           0 :           ACE_DEBUG((LM_DEBUG,
    2862             :               ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
    2863             :               ACE_TEXT("local %C update remote %C strength from %d to %d\n"),
    2864             :               LogGuid(get_guid()).c_str(),
    2865             :               LogGuid(pub_id).c_str(),
    2866             :               iter->second->writer_qos_ownership_strength(), ownership_strength));
    2867             :         }
    2868           0 :         iter->second->writer_qos_ownership_strength(ownership_strength);
    2869           0 :         iter->second->clear_owner_evaluated();
    2870             :       }
    2871           0 :       break;
    2872             :     }
    2873             :   }
    2874           0 : }
    2875             : #endif
    2876             : 
    2877             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    2878           0 : bool DataReaderImpl::verify_coherent_changes_completion(WriterInfo* writer)
    2879             : {
    2880           0 :   Coherent_State state = COMPLETED;
    2881           0 :   bool accept_here = true;
    2882             : 
    2883           0 :   const GUID_t writer_id = writer->writer_id();
    2884           0 :   const GUID_t publisher_id = writer->publisher_id();
    2885             : 
    2886           0 :   if (subqos_.presentation.access_scope != ::DDS::INSTANCE_PRESENTATION_QOS &&
    2887           0 :       subqos_.presentation.coherent_access) {
    2888             :     // verify current coherent changes from single writer
    2889           0 :     state = writer->coherent_change_received();
    2890           0 :     if (writer->group_coherent()) { // GROUP coherent any state
    2891           0 :       RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
    2892           0 :       if (subscriber && state != NOT_COMPLETED_YET) {
    2893             :         // verify if all readers received complete coherent changes in a group.
    2894           0 :         subscriber->coherent_change_received(publisher_id, this, state);
    2895           0 :         accept_here = false; // coherent_change_received does that itself
    2896             :       }
    2897           0 :     } else if (state != NOT_COMPLETED_YET) { // TOPIC coherent with final state
    2898           0 :       if (state == REJECTED) {
    2899           0 :         reject_coherent(writer_id, publisher_id);
    2900             :       }
    2901           0 :       writer->reset_coherent_info();
    2902             :     }
    2903             :   }
    2904             : 
    2905           0 :   if (state == COMPLETED && accept_here) {
    2906           0 :     accept_coherent(writer_id, publisher_id);
    2907           0 :     coherent_changes_completed(this);
    2908             :   }
    2909             : 
    2910           0 :   return state == COMPLETED;
    2911             : }
    2912             : 
    2913             : 
    2914           0 : void DataReaderImpl::accept_coherent(const GUID_t& writer_id,
    2915             :     const GUID_t& publisher_id)
    2916             : {
    2917           0 :   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
    2918           0 :     ACE_DEBUG((LM_DEBUG,
    2919             :         ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
    2920             :         ACE_TEXT(" reader %C writer %C publisher %C\n"),
    2921             :         LogGuid(get_guid()).c_str(),
    2922             :         LogGuid(writer_id).c_str(),
    2923             :         LogGuid(publisher_id).c_str()));
    2924             :   }
    2925           0 :   SubscriptionInstanceSet localsubs;
    2926             :   {
    2927           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
    2928           0 :     for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
    2929           0 :          iter != this->instances_.end(); ++iter) {
    2930           0 :       localsubs.insert(iter->second);
    2931             :     }
    2932           0 :   }
    2933           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    2934           0 :   for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
    2935           0 :        iter != localsubs.end(); iter++) {
    2936           0 :     (*iter)->rcvd_strategy_->accept_coherent(writer_id, publisher_id);
    2937             :   }
    2938           0 : }
    2939             : 
    2940             : 
    2941           0 : void DataReaderImpl::reject_coherent(const GUID_t& writer_id,
    2942             :     const GUID_t& publisher_id)
    2943             : {
    2944           0 :   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
    2945           0 :     ACE_DEBUG((LM_DEBUG,
    2946             :         ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
    2947             :         ACE_TEXT(" reader %C writer %C publisher %C\n"),
    2948             :         LogGuid(get_guid()).c_str(),
    2949             :         LogGuid(writer_id).c_str(),
    2950             :         LogGuid(publisher_id).c_str()));
    2951             :   }
    2952             : 
    2953           0 :   SubscriptionInstanceSet localsubs;
    2954             :   {
    2955           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
    2956           0 :     for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
    2957           0 :          iter != this->instances_.end(); ++iter) {
    2958           0 :       localsubs.insert(iter->second);
    2959             :     }
    2960           0 :   }
    2961           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    2962           0 :   for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
    2963           0 :        iter != localsubs.end(); iter++) {
    2964           0 :     (*iter)->rcvd_strategy_->reject_coherent(writer_id, publisher_id);
    2965             :   }
    2966           0 :   this->reset_coherent_info(writer_id, publisher_id);
    2967           0 : }
    2968             : 
    2969             : 
    2970           0 : void DataReaderImpl::reset_coherent_info(const GUID_t& writer_id,
    2971             :     const GUID_t& publisher_id)
    2972             : {
    2973           0 :   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
    2974             : 
    2975           0 :   WriterMapType::iterator itEnd = this->writers_.end();
    2976           0 :   for (WriterMapType::iterator it = this->writers_.begin();
    2977           0 :       it != itEnd; ++it) {
    2978           0 :     if (it->second->writer_id() == writer_id
    2979           0 :         && it->second->publisher_id() == publisher_id) {
    2980           0 :       it->second->reset_coherent_info();
    2981             :     }
    2982             :   }
    2983           0 : }
    2984             : 
    2985             : 
    2986             : void
    2987           0 : DataReaderImpl::coherent_change_received(const GUID_t& publisher_id, Coherent_State& result)
    2988             : {
    2989           0 :   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
    2990             : 
    2991           0 :   result = COMPLETED;
    2992           0 :   for (WriterMapType::iterator iter = writers_.begin();
    2993           0 :       iter != writers_.end();
    2994           0 :       ++iter) {
    2995             : 
    2996           0 :     if (iter->second->publisher_id() == publisher_id) {
    2997           0 :       const Coherent_State state = iter->second->coherent_change_received();
    2998           0 :       if (state == NOT_COMPLETED_YET) {
    2999           0 :         result = NOT_COMPLETED_YET;
    3000           0 :         break;
    3001             :       }
    3002           0 :       else if (state == REJECTED) {
    3003           0 :         result = REJECTED;
    3004             :       }
    3005             :     }
    3006             :   }
    3007           0 : }
    3008             : 
    3009             : 
    3010             : void
    3011           0 : DataReaderImpl::coherent_changes_completed(DataReaderImpl* reader)
    3012             : {
    3013           0 :   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
    3014           0 :   if (!subscriber) {
    3015           0 :     return;
    3016             :   }
    3017             : 
    3018           0 :   subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
    3019           0 :   this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
    3020             : 
    3021             :   ::DDS::SubscriberListener_var sub_listener =
    3022           0 :       subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS);
    3023           0 :   if (!CORBA::is_nil(sub_listener.in()))
    3024             :   {
    3025           0 :     if (!is_bit()) {
    3026           0 :       this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
    3027           0 :       subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
    3028           0 :       if (reader == this) {
    3029             :         // Release the sample_lock before listener callback.
    3030           0 :         ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    3031           0 :         sub_listener->on_data_on_readers(subscriber.in());
    3032           0 :       }
    3033             :     } else {
    3034           0 :       TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(subscriber, sub_listener, rchandle_from(this), reader == this, true));
    3035             :     }
    3036             :   }
    3037             :   else
    3038             :   {
    3039           0 :     subscriber->notify_status_condition();
    3040             : 
    3041             :     ::DDS::DataReaderListener_var listener =
    3042           0 :         this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
    3043             : 
    3044           0 :     if (!CORBA::is_nil(listener.in()))
    3045             :     {
    3046           0 :       if (!is_bit()) {
    3047           0 :         set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
    3048           0 :         subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
    3049           0 :         if (reader == this) {
    3050             :           // Release the sample_lock before listener callback.
    3051           0 :           ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    3052           0 :           listener->on_data_available(this);
    3053           0 :         } else {
    3054           0 :           listener->on_data_available(this);
    3055             :         }
    3056             :       } else {
    3057           0 :         TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(this), reader == this, true, true));
    3058             :       }
    3059             :     }
    3060             :     else
    3061             :     {
    3062           0 :       this->notify_status_condition();
    3063             :     }
    3064           0 :   }
    3065           0 : }
    3066             : 
    3067             : 
    3068           0 : void DataReaderImpl::begin_access()
    3069             : {
    3070           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    3071           0 :   this->coherent_ = true;
    3072           0 : }
    3073             : 
    3074             : 
    3075           0 : void DataReaderImpl::end_access()
    3076             : {
    3077           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    3078           0 :   this->coherent_ = false;
    3079           0 :   this->group_coherent_ordered_data_.reset();
    3080           0 :   this->post_read_or_take();
    3081           0 : }
    3082             : 
    3083             : 
    3084           0 : void DataReaderImpl::get_ordered_data(GroupRakeData& data,
    3085             :     DDS::SampleStateMask sample_states,
    3086             :     DDS::ViewStateMask view_states,
    3087             :     DDS::InstanceStateMask instance_states)
    3088             : {
    3089           0 :   SubscriptionInstanceSet localsubs;
    3090             :   {
    3091           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    3092           0 :     for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
    3093           0 :          iter != instances_.end(); ++iter) {
    3094           0 :       localsubs.insert(iter->second);
    3095             :     }
    3096           0 :   }
    3097             : 
    3098           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    3099             : 
    3100           0 :   for (SubscriptionInstanceSet::iterator iter = localsubs.begin(); iter != localsubs.end(); ++iter) {
    3101           0 :     const SubscriptionInstance_rch inst = *iter;
    3102           0 :     if (inst->instance_state_->match(view_states, instance_states)) {
    3103           0 :       size_t i(0);
    3104           0 :       for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
    3105           0 :            item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
    3106           0 :           data.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
    3107           0 :           group_coherent_ordered_data_.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
    3108             :       }
    3109             :     }
    3110           0 :   }
    3111           0 : }
    3112             : 
    3113             : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
    3114             : 
    3115             : void
    3116           0 : DataReaderImpl::set_subscriber_qos(
    3117             :     const DDS::SubscriberQos & qos)
    3118             : {
    3119           0 :   this->subqos_ = qos;
    3120           0 : }
    3121             : 
    3122             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    3123             : void
    3124           0 : DataReaderImpl::enable_filtering(ContentFilteredTopicImpl* cft)
    3125             : {
    3126           0 :   cft->add_reader(*this);
    3127             :   {
    3128           0 :     ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
    3129           0 :     content_filtered_topic_ = cft;
    3130           0 :   }
    3131           0 : }
    3132             : 
    3133             : DDS::ContentFilteredTopic_ptr
    3134           0 : DataReaderImpl::get_cf_topic() const
    3135             : {
    3136           0 :   ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
    3137           0 :   return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get());
    3138           0 : }
    3139             : #endif
    3140             : 
    3141             : #ifndef OPENDDS_NO_MULTI_TOPIC
    3142             : void
    3143           0 : DataReaderImpl::enable_multi_topic(MultiTopicImpl* mt)
    3144             : {
    3145           0 :   multi_topic_ = mt;
    3146           0 : }
    3147             : #endif
    3148             : 
    3149             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
    3150             : 
    3151             : void
    3152           0 : DataReaderImpl::update_subscription_params(const DDS::StringSeq& params) const
    3153             : {
    3154           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    3155           0 :   disco->update_subscription_params(domain_id_,
    3156           0 :       dp_id_,
    3157           0 :       subscription_id_,
    3158             :       params);
    3159           0 : }
    3160             : #endif
    3161             : 
    3162             : void
    3163           0 : DataReaderImpl::reset_ownership(::DDS::InstanceHandle_t instance)
    3164             : {
    3165           0 :   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
    3166           0 :   for (WriterMapType::iterator iter = writers_.begin();
    3167           0 :       iter != writers_.end();
    3168           0 :       ++iter) {
    3169           0 :     iter->second->set_owner_evaluated(instance, false);
    3170             :   }
    3171           0 : }
    3172             : 
    3173             : void
    3174           0 : DataReaderImpl::resume_sample_processing(const GUID_t& pub_id)
    3175             : {
    3176           0 :   WriterInfo_rch info;
    3177             :   {
    3178           0 :     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
    3179           0 :     WriterMapType::iterator where = writers_.find(pub_id);
    3180           0 :     if (writers_.end() != where) {
    3181           0 :       info = where->second;
    3182             :     }
    3183           0 :   }
    3184             : 
    3185           0 :   if (info) {
    3186           0 :     OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
    3187             :     // Stop filtering these
    3188           0 :     if (info->check_end_historic_samples(end_historic_sweeper_.in(), to_deliver)) {
    3189           0 :       deliver_historic(to_deliver);
    3190           0 :       info->finished_delivering_historic();
    3191             :     }
    3192           0 :   }
    3193           0 : }
    3194             : 
    3195           0 : bool DataReaderImpl::check_historic(const ReceivedDataSample& sample)
    3196             : {
    3197           0 :   ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
    3198           0 :   WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
    3199           0 :   if (iter != writers_.end()) {
    3200           0 :     const SequenceNumber& seq = sample.header_.sequence_;
    3201           0 :     SequenceNumber last_historic_seq;
    3202           0 :     if (iter->second->check_historic(seq, sample, last_historic_seq)) {
    3203           0 :       return false;
    3204             :     }
    3205           0 :     if (last_historic_seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
    3206           0 :         && !sample.header_.historic_sample_
    3207           0 :         && seq <= last_historic_seq) {
    3208             :       // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
    3209           0 :       return false;
    3210             :     }
    3211             :   }
    3212           0 :   return true;
    3213           0 : }
    3214             : 
    3215           0 : void DataReaderImpl::deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples)
    3216             : {
    3217             :   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
    3218           0 :   const iter_t end = samples.end();
    3219           0 :   for (iter_t iter = samples.begin(); iter != end; ++iter) {
    3220           0 :     iter->second.header_.historic_sample_ = true;
    3221           0 :     data_received(iter->second);
    3222             :   }
    3223           0 : }
    3224             : 
    3225             : void
    3226           0 : DataReaderImpl::add_link(const DataLink_rch& link, const GUID_t& peer)
    3227             : {
    3228           0 :   if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
    3229             : 
    3230           0 :     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
    3231             : 
    3232           0 :     WriterMapType::iterator it = writers_.find(peer);
    3233           0 :     if (it != writers_.end()) {
    3234             :       // Schedule timer if necessary
    3235             :       //   - only need to check reader qos - we know the writer must be >= reader
    3236           0 :       end_historic_sweeper_->schedule_timer(it->second);
    3237             :     }
    3238           0 :   }
    3239           0 :   TransportClient::add_link(link, peer);
    3240           0 :   OPENDDS_STRING type;
    3241             :   {
    3242           0 :     TransportImpl_rch impl = link->impl();
    3243           0 :     if (impl) {
    3244           0 :       type = impl->transport_type();
    3245             :     }
    3246           0 :   }
    3247             : 
    3248           0 :   if (type == "rtps_udp" || type == "multicast") {
    3249           0 :     resume_sample_processing(peer);
    3250             :   }
    3251           0 : }
    3252             : 
    3253             : void
    3254           0 : DataReaderImpl::register_for_writer(const GUID_t& participant,
    3255             :                                     const GUID_t& readerid,
    3256             :                                     const GUID_t& writerid,
    3257             :                                     const TransportLocatorSeq& locators,
    3258             :                                     DiscoveryListener* listener)
    3259             : {
    3260           0 :   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
    3261           0 : }
    3262             : 
    3263             : void
    3264           0 : DataReaderImpl::unregister_for_writer(const GUID_t& participant,
    3265             :                                       const GUID_t& readerid,
    3266             :                                       const GUID_t& writerid)
    3267             : {
    3268           0 :   TransportClient::unregister_for_writer(participant, readerid, writerid);
    3269           0 : }
    3270             : 
    3271             : void
    3272           0 : DataReaderImpl::update_locators(const GUID_t& writerId,
    3273             :                                 const TransportLocatorSeq& locators)
    3274             : {
    3275             :   {
    3276           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
    3277           0 :     WriterMapType::const_iterator iter = writers_.find(writerId);
    3278           0 :     if (iter == writers_.end()) {
    3279           0 :       return;
    3280             :     }
    3281           0 :   }
    3282           0 :   TransportClient::update_locators(writerId, locators);
    3283             : }
    3284             : 
    3285             : WeakRcHandle<ICE::Endpoint>
    3286           0 : DataReaderImpl::get_ice_endpoint()
    3287             : {
    3288           0 :   return TransportClient::get_ice_endpoint();
    3289             : }
    3290             : 
    3291           0 : DDS::ReturnCode_t DataReaderImpl::setup_deserialization()
    3292             : {
    3293           0 :   bool xcdr1_mutable = false;
    3294           0 :   bool illegal_unaligned = false;
    3295           0 :   for (CORBA::ULong i = 0; i < qos_.representation.value.length(); ++i) {
    3296             :     Encoding::Kind encoding_kind;
    3297           0 :     if (repr_to_encoding_kind(qos_.representation.value[i], encoding_kind)) {
    3298           0 :       if (encoding_kind == Encoding::KIND_XCDR1 && type_support_->max_extensibility() == MUTABLE) {
    3299           0 :         xcdr1_mutable = true;
    3300           0 :       } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR && cdr_encapsulation()) {
    3301           0 :         illegal_unaligned = true;
    3302             :       } else {
    3303           0 :         decoding_modes_.insert(encoding_kind);
    3304             :       }
    3305           0 :     } else if (DCPS_debug_level) {
    3306           0 :       ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: "
    3307             :                  "DataReaderImpl::setup_deserialization: "
    3308             :                  "Encountered unsupported or unknown data representation: %C\n",
    3309             :                  repr_to_string(qos_.representation.value[i]).c_str()));
    3310             :     }
    3311             :   }
    3312           0 :   if (decoding_modes_.empty()) {
    3313           0 :     if (DCPS_debug_level) {
    3314           0 :       DCPS::String error_message;
    3315           0 :       if (xcdr1_mutable) {
    3316           0 :         error_message = " Unsupported combination of XCDR1 and mutable";
    3317           0 :       } else if (illegal_unaligned) {
    3318           0 :         error_message = " Unaligned CDR is not allowed in rtps_udp transport";
    3319             :       }
    3320           0 :       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: "
    3321             :                  "DataReaderImpl::setup_deserialization: "
    3322             :                  "Could not find a valid data representation.%C\n",
    3323             :                  error_message.c_str()));
    3324           0 :     }
    3325           0 :     return DDS::RETCODE_ERROR;
    3326             :   }
    3327           0 :   if (DCPS_debug_level >= 2) {
    3328           0 :     OPENDDS_STRING encodings;
    3329           0 :     EncodingKinds::iterator it = decoding_modes_.begin();
    3330           0 :     for (; it != decoding_modes_.end(); ++it) {
    3331           0 :       if (!encodings.empty()) {
    3332           0 :         encodings += ", ";
    3333             :       }
    3334           0 :       encodings += Encoding::kind_to_string(*it);
    3335             :     }
    3336           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::setup_deserialization: "
    3337             :                "Setup successfully with the following data representation%C: %C\n",
    3338             :                encodings.size() != 1 ? "s" : "",
    3339             :                encodings.c_str()));
    3340           0 :   }
    3341             : 
    3342           0 :   return DDS::RETCODE_OK;
    3343             : }
    3344             : 
    3345           0 : void DataReaderImpl::accept_sample_processing(const SubscriptionInstance_rch& instance,
    3346             :                                               const DataSampleHeader& header,
    3347             :                                               bool is_new_instance)
    3348             : {
    3349           0 :   bool accepted = true;
    3350             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    3351           0 :   bool verify_coherent = false;
    3352             : #endif
    3353           0 :   WriterInfo_rch writer;
    3354             : 
    3355           0 :   if (header.publication_id_.entityId.entityKind != ENTITYKIND_OPENDDS_NIL_WRITER) {
    3356           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
    3357             : 
    3358           0 :     WriterMapType::iterator where = writers_.find(header.publication_id_);
    3359             : 
    3360           0 :     if (where != writers_.end()) {
    3361           0 :       if (header.coherent_change_) {
    3362             : 
    3363             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    3364             :         // Received coherent change
    3365           0 :         where->second->coherent_change(header.group_coherent_, header.publisher_id_);
    3366           0 :         verify_coherent = true;
    3367             : #endif
    3368           0 :         writer = where->second;
    3369             :       }
    3370             :     }
    3371             :     else {
    3372           0 :       ACE_DEBUG((LM_WARNING,
    3373             :         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::accept_sample_processing - ")
    3374             :         ACE_TEXT("subscription %C failed to find ")
    3375             :         ACE_TEXT("publication data for %C.\n"),
    3376             :         LogGuid(get_guid()).c_str(),
    3377             :         LogGuid(header.publication_id_).c_str()));
    3378             :     }
    3379           0 :   }
    3380             : 
    3381             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    3382           0 :   if (verify_coherent) {
    3383           0 :     accepted = verify_coherent_changes_completion(writer.in());
    3384             :   }
    3385             : #endif
    3386             : 
    3387           0 :   if (instance && deadline_queue_enabled_) {
    3388           0 :     instance->last_sample_tv_ = instance->cur_sample_tv_;
    3389           0 :     instance->cur_sample_tv_.set_to_now();
    3390             : 
    3391           0 :     if (is_new_instance) {
    3392           0 :       schedule_deadline(instance, false);
    3393             :     } else {
    3394           0 :       process_deadline(instance, MonotonicTimePoint::now(), false);
    3395             :     }
    3396             :   }
    3397             : 
    3398           0 :   if (accepted) {
    3399           0 :     notify_read_conditions();
    3400             :   }
    3401           0 : }
    3402             : 
    3403             : #if defined(OPENDDS_SECURITY)
    3404           0 : DDS::Security::ParticipantCryptoHandle DataReaderImpl::get_crypto_handle() const
    3405             : {
    3406           0 :   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
    3407           0 :   return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
    3408           0 : }
    3409             : #endif
    3410             : 
    3411           0 : EndHistoricSamplesMissedSweeper::EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
    3412             :                                                                  ACE_thread_t owner,
    3413           0 :                                                                  DataReaderImpl* reader)
    3414             :   : ReactorInterceptor (reactor, owner)
    3415           0 :   , reader_(*reader)
    3416           0 : { }
    3417             : 
    3418           0 : EndHistoricSamplesMissedSweeper::~EndHistoricSamplesMissedSweeper()
    3419           0 : { }
    3420             : 
    3421           0 : void EndHistoricSamplesMissedSweeper::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
    3422             : {
    3423           0 :   info->waiting_for_end_historic_samples(true);
    3424           0 :   execute_or_enqueue(make_rch<ScheduleCommand>(this, ref(info)));
    3425           0 : }
    3426             : 
    3427           0 : void EndHistoricSamplesMissedSweeper::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
    3428             : {
    3429           0 :   info->waiting_for_end_historic_samples(false);
    3430           0 :   execute_or_enqueue(make_rch<CancelCommand>(this, ref(info)));
    3431           0 : }
    3432             : 
    3433           0 : int EndHistoricSamplesMissedSweeper::handle_timeout(
    3434             :     const ACE_Time_Value& ,
    3435             :     const void* arg)
    3436             : {
    3437           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    3438             : 
    3439           0 :   WriterInfo* const info =
    3440             :     const_cast<WriterInfo*>(reinterpret_cast<const WriterInfo*>(arg));
    3441           0 :   const GUID_t pub_id = info->writer_id();
    3442             : 
    3443             :   {
    3444           0 :     ACE_Guard<ACE_Thread_Mutex> guard(this->mutex_);
    3445           0 :     info_set_.erase(rchandle_from(info));
    3446           0 :   }
    3447             : 
    3448           0 :   RcHandle<DataReaderImpl> reader = reader_.lock();
    3449           0 :   if (!reader)
    3450           0 :     return 0;
    3451             : 
    3452           0 :   if (DCPS_debug_level >= 1) {
    3453           0 :     ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
    3454             :                LogGuid(reader->get_guid()).c_str(),
    3455             :                LogGuid(pub_id).c_str()));
    3456             :   }
    3457             : 
    3458           0 :   reader->resume_sample_processing(pub_id);
    3459           0 :   return 0;
    3460           0 : }
    3461             : 
    3462           0 : void EndHistoricSamplesMissedSweeper::ScheduleCommand::execute()
    3463             : {
    3464           0 :   static const ACE_Time_Value ten_seconds(10);
    3465           0 :   info_->schedule_historic_samples_timer(sweeper_, ten_seconds);
    3466           0 :   const bool insert_result = sweeper_->info_set_.insert(info_).second;
    3467             : 
    3468           0 :   if (insert_result && DCPS_debug_level) {
    3469           0 :     ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - sweeper %@ is now scheduled\n", info_.in()));
    3470             :   }
    3471           0 : }
    3472             : 
    3473           0 : void EndHistoricSamplesMissedSweeper::CancelCommand::execute()
    3474             : {
    3475           0 :   info_->cancel_historic_samples_timer(sweeper_);
    3476           0 :   const bool erase_result = sweeper_->info_set_.erase(info_) > 0;
    3477             : 
    3478           0 :   if (erase_result && DCPS_debug_level) {
    3479           0 :     ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - sweeper %@ is no longer scheduled\n", info_.in()));
    3480             :   }
    3481           0 : }
    3482             : 
    3483           0 : void DataReaderImpl::transport_discovery_change()
    3484             : {
    3485           0 :   populate_connection_info();
    3486           0 :   const TransportLocatorSeq& trans_conf_info = connection_info();
    3487           0 :   const GUID_t dp_id_copy = dp_id_;
    3488           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    3489           0 :   disco->update_subscription_locators(domain_id_,
    3490             :                                       dp_id_copy,
    3491           0 :                                       get_guid(),
    3492             :                                       trans_conf_info);
    3493           0 : }
    3494             : 
    3495           0 : void DataReaderImpl::OnDataOnReaders::execute() // Sets status flags to false on the reader/subscriber and optionally calls on_data_on_readers().
    3496             : {
    3497           0 :   RcHandle<SubscriberImpl> subscriber = subscriber_.lock();
    3498           0 :   RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
    3499           0 :   if (!subscriber || !data_reader) { // do nothing unless both lock() calls produced non-empty handles
    3500           0 :     return;
    3501             :   }
    3502             : 
    3503           0 :   if (set_reader_status_) { // reader flag is only touched when requested
    3504           0 :     data_reader->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
    3505             :   }
    3506           0 :   subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false); // subscriber flag is updated unconditionally
    3507             : 
    3508           0 :   if (call_) {
    3509           0 :     sub_listener_->on_data_on_readers(subscriber.in()); // NOTE(review): sub_listener_ is used without a nil check; presumably call_ is only true when a listener exists -- confirm
    3510             :   }
    3511           0 : }
    3512             : 
    3513           0 : void DataReaderImpl::OnDataAvailable::execute() // Sets status flags to false as configured and optionally calls the listener's on_data_available().
    3514             : {
    3515           0 :   RcHandle<DataReaderImpl> data_reader = data_reader_.lock(); // every step below is skipped if this handle is empty
    3516             : 
    3517           0 :   if (data_reader && set_reader_status_) {
    3518           0 :     data_reader->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
    3519             :   }
    3520             : 
    3521           0 :   if (data_reader && set_subscriber_status_) {
    3522           0 :     RcHandle<SubscriberImpl> subscriber = data_reader->get_subscriber_servant();
    3523           0 :     if (subscriber) { // the subscriber handle may be empty; skip the flag update in that case
    3524           0 :       subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
    3525             :     }
    3526           0 :   }
    3527             : 
    3528           0 :   if (call_ && data_reader) {
    3529           0 :     listener_->on_data_available(data_reader.in()); // NOTE(review): listener_ is used without a nil check; presumably call_ is only true when a listener exists -- confirm
    3530             :   }
    3531           0 : }
    3532             : 
    3533           0 : void DataReaderImpl::initialize_lookup_maps() // Fills combined_state_lookup_ with an empty HandleSet for every combination of non-zero masks, plus key 0.
    3534             : {
    3535             :   // These all start at 1 (0 mask is bogus) and include the full mask (any)
    3536           0 :   for (CORBA::ULong is = 1; is <= MAX_SAMPLE_STATE_MASK; ++is) { // is: sample-state mask
    3537           0 :     for (CORBA::ULong iv = 1; iv <= MAX_VIEW_STATE_MASK; ++iv) { // iv: view-state mask
    3538           0 :       for (CORBA::ULong ii = 1; ii <= MAX_INSTANCE_STATE_MASK; ++ii) { // ii: instance-state mask
    3539           0 :         combined_state_lookup_[to_combined_states(is, iv, ii)] = HandleSet(); // key is the combined value of the three masks
    3540             :       }
    3541             :     }
    3542             :   }
    3543             :   // catch-all for "bogus" lookups
    3544           0 :   combined_state_lookup_[0] = HandleSet();
    3545           0 : }
    3546             : 
    3547           0 : void DataReaderImpl::update_lookup_maps(const SubscriptionInstanceMapType::iterator& input) // Re-evaluates one instance (input->second, handle input->first) against every non-zero key of combined_state_lookup_.
    3548             : {
    3549           0 :   for (LookupMap::iterator it = combined_state_lookup_.begin(); it != combined_state_lookup_.end(); ++it) {
    3550           0 :     if (it->first == 0) continue; // key 0 is the catch-all entry created in initialize_lookup_maps(); leave it untouched
    3551             :     CORBA::ULong sample_states, view_states, instance_states; // filled in by split_combined_states() below
    3552           0 :     split_combined_states(it->first, sample_states, view_states, instance_states);
    3553           0 :     if (input->second->matches(sample_states, view_states, instance_states)) {
    3554           0 :       it->second.insert(input->first); // instance matches this key's masks: make sure its handle is in the set
    3555             :     } else {
    3556           0 :       it->second.erase(input->first); // no match: make sure its handle is not in the set
    3557             :     }
    3558             :   }
    3559           0 : }
    3560             : 
    3561           0 : void DataReaderImpl::remove_from_lookup_maps(DDS::InstanceHandle_t handle) // Erases handle from every HandleSet in combined_state_lookup_ except the one under key 0.
    3562             : {
    3563           0 :   for (LookupMap::iterator it = combined_state_lookup_.begin(), the_end = combined_state_lookup_.end(); it != the_end; ++it) {
    3564           0 :     if (it->first == 0) continue; // key 0 is the catch-all entry; it is skipped here
    3565           0 :     it->second.erase(handle);
    3566             :   }
    3567           0 : }
    3568             : 
    3569           0 : const DataReaderImpl::HandleSet& DataReaderImpl::lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const // Returns a reference to the HandleSet stored under the key built from the three masks.
    3570             : {
    3571           0 :   const CORBA::ULong combined_states = to_combined_states(sample_states, view_states, instance_states);
    3572           0 :   LookupMap::const_iterator ci = combined_state_lookup_.find(combined_states);
    3573           0 :   OPENDDS_ASSERT(ci != combined_state_lookup_.end()); // the key is expected to exist
    3574           0 :   return ci->second; // NOTE(review): if OPENDDS_ASSERT compiles to nothing, a missing key would dereference end() -- confirm every reachable key is pre-populated
    3575             : }
    3576             : 
    3577           0 : void DataReaderImpl::schedule_deadline(SubscriptionInstance_rch instance,
    3578             :                                        bool timer_called)
    3579             : { // Gives instance a deadline of now + deadline_period_ and queues it, unless it already has one; timer_called == true skips the timer handling below.
    3580             :   // Should be called with sample_lock_.
    3581           0 :   if (instance->deadline_ == MonotonicTimePoint::zero_value) { // zero_value marks "no deadline set"; otherwise this call is a no-op
    3582           0 :     instance->deadline_ = MonotonicTimePoint::now() + deadline_period_;
    3583           0 :     const bool schedule = deadline_queue_.empty(); // captured before the insert below
    3584           0 :     deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
    3585           0 :     if (!timer_called) { // on the timer path the task is left alone here; deadline_task() schedules it after its own loop
    3586           0 :       if (schedule) {
    3587           0 :         deadline_task_->schedule(deadline_period_); // queue was empty before the insert
    3588           0 :       } else if (deadline_queue_.begin()->second == instance) {
    3589             :         // Moved to front.
    3590           0 :         deadline_task_->cancel();
    3591           0 :         deadline_task_->schedule(deadline_period_);
    3592             :       }
    3593             :     }
    3594             :   }
    3595           0 : }
    3596             : 
    3597           0 : void DataReaderImpl::cancel_deadline(SubscriptionInstance_rch instance) // Removes instance's entry from deadline_queue_ (if found) and resets its deadline_ to zero_value; the timer task is not touched.
    3598             : {
    3599             :   // Should be called with sample_lock_.
    3600           0 :   if (instance->deadline_ != MonotonicTimePoint::zero_value) {
    3601           0 :     for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) { // only entries whose key equals instance->deadline_ are examined
    3602           0 :       if (pos->second == instance) {
    3603           0 :         deadline_queue_.erase(pos);
    3604           0 :         break; // stop right after the erase so pos is not used again
    3605             :       }
    3606             :     }
    3607           0 :     instance->deadline_ = MonotonicTimePoint::zero_value; // reset even if no matching queue entry was found
    3608             :   }
    3609           0 : }
    3610             : 
    3611           0 : void DataReaderImpl::process_deadline(SubscriptionInstance_rch instance,
    3612             :                                       const MonotonicTimePoint& now,
    3613             :                                       bool timer_called)
    3614             : { // Decides whether instance missed its deadline, updates status and notifies the listener (timer path only), then re-queues the instance.
    3615             :   // Should be called with sample_lock_.
    3616             : 
    3617           0 :   if (instance->deadline_ != MonotonicTimePoint::zero_value) { // instances without a deadline are ignored entirely
    3618           0 :     bool missed = false;
    3619             : 
    3620           0 :     if (instance->cur_sample_tv_.is_zero()) { // not received any sample.
    3621           0 :       missed = true;
    3622             : 
    3623           0 :     } else if (timer_called) { // handle_timeout is called
    3624           0 :       missed = (now - instance->cur_sample_tv_) >= deadline_period_; // note: >= here, but strict > in the sample path below
    3625             : 
    3626             :     } else { // upon receiving sample.
    3627           0 :       missed = (instance->cur_sample_tv_ - instance->last_sample_tv_) > deadline_period_;
    3628             :     }
    3629             : 
    3630           0 :     if (missed) {
    3631           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, sample_lock_); // recursive mutex, so this is compatible with callers that already hold sample_lock_; ACE_GUARD returns from the function if acquisition fails
    3632             :       // Only update the status upon timer is called and not
    3633             :       // when receiving a sample after the interval.
    3634             :       // Otherwise the counter is doubled.
    3635           0 :       if (timer_called) { // when !timer_called, `missed` has no effect beyond taking the guard above
    3636           0 :         ++requested_deadline_missed_status_.total_count;
    3637           0 :         requested_deadline_missed_status_.total_count_change =
    3638           0 :           requested_deadline_missed_status_.total_count - last_deadline_missed_total_count_; // change relative to the count recorded after the last listener upcall
    3639           0 :         requested_deadline_missed_status_.last_instance_handle = instance->instance_handle_;
    3640             : 
    3641           0 :         set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS, true);
    3642             : 
    3643           0 :         DDS::DataReaderListener_var listener = listener_for(DDS::REQUESTED_DEADLINE_MISSED_STATUS);
    3644             : 
    3645             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    3646           0 :         if (instance->instance_state_->is_exclusive()) {
    3647           0 :           DataReaderImpl::OwnershipManagerPtr owner_manager = ownership_manager();
    3648           0 :           if (owner_manager)
    3649           0 :             owner_manager->remove_writers (instance->instance_handle_); // exclusive ownership: remove_writers() is called for the instance that missed its deadline
    3650           0 :         }
    3651             : #endif
    3652             : 
    3653           0 :         if (!CORBA::is_nil(listener.in())) {
    3654             :           // Copy before releasing the lock.
    3655           0 :           DDS::RequestedDeadlineMissedStatus const status = requested_deadline_missed_status_;
    3656             : 
    3657             :           // Release the lock during the upcall.
    3658           0 :           ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    3659             :           // @todo Will this operation ever throw?  If so we may want to
    3660             :           //       catch all exceptions, and act accordingly.
    3661           0 :           listener->on_requested_deadline_missed(this, status);
    3662             : 
    3663             :           // We need to update the last total count value to our current total
    3664             :           // so that the next time we will calculate the correct total_count_change;
    3665           0 :           last_deadline_missed_total_count_ = requested_deadline_missed_status_.total_count; // NOTE(review): unlock_guard is still in scope here, so this read/write appears to run with sample_lock_ released -- confirm intended; also only updated when a listener exists
    3666           0 :         }
    3667             : 
    3668           0 :         notify_status_condition(); // runs on the timer path whether or not a listener was found
    3669           0 :       }
    3670           0 :     }
    3671             : 
    3672             :     // This next part is without status_lock_ held to avoid reactor deadlock. NOTE(review): only sample_lock_ is taken in this function, never status_lock_ -- this comment may be stale.
    3673           0 :     if (timer_called) {
    3674           0 :       instance->deadline_ = MonotonicTimePoint::zero_value; // reset so schedule_deadline() treats the instance as unscheduled; deadline_task() erased the queue entry before calling here
    3675           0 :       schedule_deadline(instance, timer_called);
    3676             :     } else {
    3677           0 :       cancel_deadline(instance); // removes the old queue entry and zeroes deadline_ before re-queuing
    3678           0 :       schedule_deadline(instance, timer_called);
    3679             :     }
    3680             :   }
    3681             : }
    3682             : 
    3683           0 : void DataReaderImpl::cancel_all_deadlines() // Empties deadline_queue_ and cancels deadline_task_ while holding sample_lock_.
    3684             : {
    3685           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    3686           0 :   deadline_queue_.clear(); // NOTE(review): instances' deadline_ fields are not reset to zero_value here, so schedule_deadline() would skip them afterwards -- confirm callers do not expect to reschedule
    3687           0 :   deadline_task_->cancel();
    3688           0 : }
    3689             : 
    3690           0 : void DataReaderImpl::reset_deadline_period(const TimeDuration& deadline_period) // Stores a changed deadline period and, if the deadline queue is enabled, reschedules every instance that currently has a deadline.
    3691             : {
    3692           0 :   if (deadline_period_ != deadline_period) { // unchanged period: nothing to do
    3693           0 :     deadline_period_ = deadline_period; // assigned before any lock is taken in this function
    3694             : 
    3695           0 :     if (deadline_queue_enabled_) {
    3696           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); // held while iterating instances_; reschedule_deadline() additionally takes sample_lock_
    3697           0 :       const MonotonicTimePoint now = MonotonicTimePoint::now(); // one timestamp shared by all reschedules in this pass
    3698           0 :       for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
    3699           0 :            iter != this->instances_.end();
    3700           0 :            ++iter) {
    3701           0 :         if (iter->second->deadline_ != MonotonicTimePoint::zero_value) { // skip instances with no deadline set
    3702           0 :           reschedule_deadline(iter->second, now);
    3703             :         }
    3704             :       }
    3705           0 :     }
    3706             :   }
    3707             : }
    3708             : 
    3709           0 : void DataReaderImpl::reschedule_deadline(SubscriptionInstance_rch instance,
    3710             :                                          const MonotonicTimePoint& now)
    3711             : { // Re-queues an instance that already has a deadline, using the current deadline_period_, and (re)schedules deadline_task_ if needed.
    3712           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    3713             : 
    3714             :   // So the datareader can call back into us.
    3715           0 :   if (instance->deadline_ != MonotonicTimePoint::zero_value) {
    3716             : 
    3717             :     // Remove.
    3718           0 :     for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) { // only entries keyed by the old deadline_ are examined
    3719           0 :       if (pos->second == instance) {
    3720           0 :         deadline_queue_.erase(pos);
    3721           0 :         break; // stop right after the erase so pos is not used again
    3722             :       }
    3723             :     }
    3724             : 
    3725           0 :     instance->deadline_ = now + (deadline_period_ - (instance->deadline_ - now)); // (deadline_ - now) is the time left on the old deadline; when called from reset_deadline_period(), deadline_period_ is already the new period. NOTE(review): confirm this arithmetic yields the intended new deadline
    3726             : 
    3727           0 :     const bool schedule = deadline_queue_.empty(); // captured before the insert below
    3728           0 :     deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
    3729           0 :     if (schedule) {
    3730           0 :       deadline_task_->schedule(deadline_period_); // queue was empty before the insert
    3731           0 :     } else if (deadline_queue_.begin()->second == instance) {
    3732             :       // Moved to front.
    3733           0 :       deadline_task_->cancel();
    3734           0 :       deadline_task_->schedule(deadline_period_);
    3735             :     }
    3736             :   }
    3737           0 : }
    3738             : 
    3739           0 : void DataReaderImpl::deadline_task(const MonotonicTimePoint& now) // Processes every queued instance whose deadline is <= now, then schedules deadline_task_ for the earliest remaining deadline.
    3740             : {
    3741           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    3742             : 
    3743           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    3744           0 :   for (DeadlineQueue::iterator pos = deadline_queue_.begin(), limit = deadline_queue_.end(); pos != limit && pos->first <= now;) { // walks from the front and stops at the first entry whose key is after now
    3745           0 :     SubscriptionInstance_rch instance = pos->second; // copy the handle before the entry is erased
    3746           0 :     deadline_queue_.erase(pos++); // post-increment advances pos before the old entry is erased
    3747             :     // pos is no longer valid. NOTE(review): pos was advanced above and the loop keeps using it; process_deadline() re-inserts into deadline_queue_ and may release sample_lock_ during the listener upcall -- confirm pos stays valid.
    3748           0 :     process_deadline(instance, now, true); // timer_called == true: status is updated and the instance is re-queued without touching the timer
    3749           0 :   }
    3750             : 
    3751           0 :   if (!deadline_queue_.empty()) {
    3752           0 :     deadline_task_->schedule(deadline_queue_.begin()->first - now); // delay until the earliest remaining deadline, relative to the `now` passed in
    3753             :   }
    3754           0 : }
    3755             : 
    3756             : } // namespace DCPS
    3757             : } // namespace OpenDDS
    3758             : 
    3759             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16