DataReaderImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataReaderImpl.h"
00010 #include "tao/ORB_Core.h"
00011 #include "SubscriptionInstance.h"
00012 #include "ReceivedDataElementList.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "FeatureDisabledQosCheck.h"
00017 #include "GuidConverter.h"
00018 #include "TopicImpl.h"
00019 #include "Serializer.h"
00020 #include "SubscriberImpl.h"
00021 #include "Transient_Kludge.h"
00022 #include "Util.h"
00023 #include "RequestedDeadlineWatchdog.h"
00024 #include "QueryConditionImpl.h"
00025 #include "ReadConditionImpl.h"
00026 #include "MonitorFactory.h"
00027 #include "dds/DCPS/transport/framework/EntryExit.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029 #include "dds/DdsDcpsCoreC.h"
00030 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00031 #include "dds/DCPS/SafetyProfileStreams.h"
00032 #if !defined (DDS_HAS_MINIMUM_BIT)
00033 #include "BuiltInTopicUtils.h"
00034 #include "dds/DdsDcpsCoreTypeSupportC.h"
00035 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00036 
00037 #include "ace/Reactor.h"
00038 #include "ace/Auto_Ptr.h"
00039 #include "ace/OS_NS_sys_time.h"
00040 
00041 #include <cstdio>
00042 #include <stdexcept>
00043 
00044 #if !defined (__ACE_INLINE__)
00045 # include "DataReaderImpl.inl"
00046 #endif /* !__ACE_INLINE__ */
00047 
00048 namespace OpenDDS {
00049 namespace DCPS {
00050 
00051 DataReaderImpl::DataReaderImpl()
00052 : rd_allocator_(0),
00053   qos_(TheServiceParticipant->initial_DataReaderQos()),
00054   reverse_sample_lock_(sample_lock_),
00055   participant_servant_(0),
00056   topic_servant_(0),
00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00058   is_exclusive_ownership_ (false),
00059 #endif
00060 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00061   owner_manager_ (0),
00062 #endif
00063     coherent_(false),
00064     subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00065     topic_desc_(0),
00066     listener_mask_(DEFAULT_STATUS_MASK),
00067     domain_id_(0),
00068     subscriber_servant_(0),
00069     end_historic_sweeper_(new EndHistoricSamplesMissedSweeper(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00070     remove_association_sweeper_(new RemoveAssociationSweeper<DataReaderImpl>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00071     n_chunks_(TheServiceParticipant->n_chunks()),
00072     reverse_pub_handle_lock_(publication_handle_lock_),
00073     reactor_(0),
00074     liveliness_timer_(new LivelinessTimer(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00075     last_deadline_missed_total_count_(0),
00076     watchdog_(),
00077     is_bit_(false),
00078     initialized_(false),
00079     always_get_history_(false),
00080     statistics_enabled_(false),
00081     raw_latency_buffer_size_(0),
00082     raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00083     monitor_(0),
00084     periodic_monitor_(0),
00085     transport_disabled_(false)
00086 {
00087   reactor_ = TheServiceParticipant->timer();
00088 
00089   liveliness_changed_status_.alive_count = 0;
00090   liveliness_changed_status_.not_alive_count = 0;
00091   liveliness_changed_status_.alive_count_change = 0;
00092   liveliness_changed_status_.not_alive_count_change = 0;
00093   liveliness_changed_status_.last_publication_handle =
00094       DDS::HANDLE_NIL;
00095 
00096   requested_deadline_missed_status_.total_count = 0;
00097   requested_deadline_missed_status_.total_count_change = 0;
00098   requested_deadline_missed_status_.last_instance_handle =
00099       DDS::HANDLE_NIL;
00100 
00101   requested_incompatible_qos_status_.total_count = 0;
00102   requested_incompatible_qos_status_.total_count_change = 0;
00103   requested_incompatible_qos_status_.last_policy_id = 0;
00104   requested_incompatible_qos_status_.policies.length(0);
00105 
00106   subscription_match_status_.total_count = 0;
00107   subscription_match_status_.total_count_change = 0;
00108   subscription_match_status_.current_count = 0;
00109   subscription_match_status_.current_count_change = 0;
00110   subscription_match_status_.last_publication_handle =
00111       DDS::HANDLE_NIL;
00112 
00113   sample_lost_status_.total_count = 0;
00114   sample_lost_status_.total_count_change = 0;
00115 
00116   sample_rejected_status_.total_count = 0;
00117   sample_rejected_status_.total_count_change = 0;
00118   sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
00119   sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
00120 
00121   this->budget_exceeded_status_.total_count = 0;
00122   this->budget_exceeded_status_.total_count_change = 0;
00123   this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
00124 
00125   monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this);
00126   periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this);
00127 }
00128 
00129 // This method is called when there are no longer any reference to the
00130 // the servant.
00131 DataReaderImpl::~DataReaderImpl()
00132 {
00133   DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6);
00134 
00135   {
00136     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00137                    read_guard,
00138                    this->writers_lock_);
00139     // Cancel any uncancelled sweeper timers to decrement reference count.
00140     WriterMapType::iterator writer;
00141     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00142       end_historic_sweeper_->cancel_timer(writer->second);
00143       remove_association_sweeper_->cancel_timer(writer->second);
00144     }
00145   }
00146 
00147   end_historic_sweeper_->wait();
00148   end_historic_sweeper_->destroy();
00149 
00150   remove_association_sweeper_->wait();
00151   remove_association_sweeper_->destroy();
00152 
00153   liveliness_timer_->cancel_timer();
00154   liveliness_timer_->wait();
00155   liveliness_timer_->destroy();
00156 
00157   if (initialized_) {
00158     delete rd_allocator_;
00159   }
00160 }
00161 
00162 // this method is called when delete_datareader is called.
00163 void
00164 DataReaderImpl::cleanup()
00165 {
00166   {
00167     // Is this lock necessary?
00168     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00169         guard,
00170         this->sample_lock_);
00171 
00172     liveliness_timer_->cancel_timer();
00173   }
00174   liveliness_timer_->wait();
00175 
00176   // Cancel any watchdog timers
00177   { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00178   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00179       iter != instances_.end();
00180       ++iter) {
00181     SubscriptionInstance *ptr = iter->second;
00182     if (this->watchdog_ && ptr->deadline_timer_id_ != -1) {
00183       this->watchdog_->cancel_timer(ptr);
00184     }
00185   }
00186   }
00187 
00188 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00189   if (owner_manager_) {
00190     owner_manager_->unregister_reader(topic_servant_->type_name(), this);
00191   }
00192 #endif
00193 
00194   if (topic_servant_) {
00195     topic_servant_->remove_entity_ref();
00196     topic_servant_->_remove_ref();
00197   }
00198 
00199   dr_local_objref_ = DDS::DataReader::_nil();
00200 
00201 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00202   if (!CORBA::is_nil(content_filtered_topic_.in())) {
00203     ContentFilteredTopicImpl* cft =
00204         dynamic_cast<ContentFilteredTopicImpl*>(content_filtered_topic_.in());
00205     cft->remove_reader(*this);
00206     cft->update_reader_count(false);
00207     content_filtered_topic_ = DDS::ContentFilteredTopic::_nil ();
00208   }
00209 #endif
00210 
00211   {
00212     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00213                    read_guard,
00214                    this->writers_lock_);
00215     // Cancel any uncancelled sweeper timers
00216     WriterMapType::iterator writer;
00217     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00218       end_historic_sweeper_->cancel_timer(writer->second);
00219       remove_association_sweeper_->cancel_timer(writer->second);
00220     }
00221   }
00222 
00223   end_historic_sweeper_->wait();
00224   remove_association_sweeper_->wait();
00225 }
00226 
00227 void DataReaderImpl::init(
00228     TopicDescriptionImpl* a_topic_desc,
00229     const DDS::DataReaderQos &  qos,
00230     DDS::DataReaderListener_ptr a_listener,
00231     const DDS::StatusMask &     mask,
00232     DomainParticipantImpl*        participant,
00233     SubscriberImpl*               subscriber,
00234     DDS::DataReader_ptr         dr_objref)
00235 {
00236   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00237   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00238     topic_servant_ = a_topic;
00239     topic_servant_->_add_ref();
00240 
00241     topic_servant_->add_entity_ref();
00242   }
00243 
00244   CORBA::String_var topic_name = a_topic_desc->get_name();
00245 
00246 #if !defined (DDS_HAS_MINIMUM_BIT)
00247   is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00248       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00249       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00250       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00251 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00252 
00253   qos_ = qos;
00254 
00255 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00256   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00257 #endif
00258 
00259   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00260   listener_mask_ = mask;
00261 
00262   // Only store the participant pointer, since it is our "grand"
00263   // parent, we will exist as long as it does
00264   participant_servant_ = participant;
00265 
00266 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00267   if (is_exclusive_ownership_) {
00268     owner_manager_ = participant_servant_->ownership_manager ();
00269   }
00270 #endif
00271 
00272   domain_id_ = participant_servant_->get_domain_id();
00273 
00274   // Only store the subscriber pointer, since it is our parent, we
00275   // will exist as long as it does.
00276   subscriber_servant_ = subscriber;
00277   dr_local_objref_    = DDS::DataReader::_duplicate(dr_objref);
00278 
00279   if (this->subscriber_servant_->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
00280     ACE_DEBUG((LM_WARNING,
00281         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
00282         ACE_TEXT("failed to get SubscriberQos\n")));
00283   }
00284 
00285   initialized_ = true;
00286 }
00287 
00288 DDS::InstanceHandle_t
00289 DataReaderImpl::get_instance_handle()
00290 {
00291   return this->participant_servant_->id_to_handle(subscription_id_);
00292 }
00293 
00294 void
00295 DataReaderImpl::add_association(const RepoId& yourId,
00296     const WriterAssociation& writer,
00297     bool active)
00298 {
00299   if (DCPS_debug_level) {
00300     GuidConverter reader_converter(yourId);
00301     GuidConverter writer_converter(writer.writerId);
00302     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
00303         ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00304         OPENDDS_STRING(reader_converter).c_str(),
00305         OPENDDS_STRING(writer_converter).c_str()));
00306   }
00307 
00308   if (entity_deleted_.value()) {
00309     if (DCPS_debug_level) {
00310       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
00311           ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00312     }
00313     return;
00314   }
00315 
00316   // We are being called back from the repository before we are done
00317   // processing after our call to the repository that caused this call
00318   // (from the repository) to be made.
00319   if (GUID_UNKNOWN == subscription_id_) {
00320     subscription_id_ = yourId;
00321   }
00322 
00323   //Why do we need the publication_handle_lock_ here?  No access to id_to_handle_map_...
00324   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00325 
00326 
00327   // For each writer in the list of writers to associate with, we
00328   // create a WriterInfo and a WriterStats object and store them in
00329   // our internal maps.
00330   //
00331   {
00332 
00333     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
00334 
00335     const PublicationId& writer_id = writer.writerId;
00336     RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos);
00337     std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
00338         // This insertion is idempotent.
00339         WriterMapType::value_type(
00340           writer_id,
00341           info));
00342 
00343       // Schedule timer if necessary
00344       //   - only need to check reader qos - we know the writer must be >= reader
00345       if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00346         info->waiting_for_end_historic_samples_ = true;
00347       }
00348 
00349       this->statistics_.insert(
00350         StatsMapType::value_type(
00351             writer_id,
00352             WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
00353 
00354     // If this is a durable reader
00355     if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00356       // TODO schedule timer for removing flag from writers
00357     }
00358 
00359     if (DCPS_debug_level > 4) {
00360       GuidConverter converter(writer_id);
00361       ACE_DEBUG((LM_DEBUG,
00362           "(%P|%t) DataReaderImpl::add_association: "
00363           "inserted writer %C.return %d \n",
00364           OPENDDS_STRING(converter).c_str(), bpair.second));
00365 
00366       WriterMapType::iterator iter = writers_.find(writer_id);
00367       if (iter != writers_.end()) {
00368         // This may not be an error since it could happen that the sample
00369         // is delivered to the datareader after the write is dis-associated
00370         // with this datareader.
00371         GuidConverter reader_converter(subscription_id_);
00372         GuidConverter writer_converter(writer_id);
00373         ACE_DEBUG((LM_DEBUG,
00374             ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00375             ACE_TEXT("reader %C is associated with writer %C.\n"),
00376             OPENDDS_STRING(reader_converter).c_str(),
00377             OPENDDS_STRING(writer_converter).c_str()));
00378       }
00379     }
00380   }
00381 
00382   // Propagate the add_associations processing down into the Transport
00383   // layer here.  This will establish the transport support and reserve
00384   // usage of an existing connection or initiate creation of a new
00385   // connection if no suitable connection is available.
00386   AssociationData data;
00387   data.remote_id_ = writer.writerId;
00388   data.remote_data_ = writer.writerTransInfo;
00389   data.publication_transport_priority_ =
00390       writer.writerQos.transport_priority.value;
00391   data.remote_reliable_ =
00392       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00393   data.remote_durable_ =
00394       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00395 
00396   //Do not hold publication_handle_lock_ when calling associate due to possible reactor
00397   //deadlock on passive side completion
00398   //associate does not access id_to_handle_map_, thus not clear why publication_handle_lock_
00399   //is held here anyway
00400   guard.release();
00401 
00402   if (!associate(data, active)) {
00403     if (DCPS_debug_level) {
00404       ACE_DEBUG((LM_ERROR,
00405           ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00406           ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00407     }
00408   }
00409 }
00410 
00411 void
00412 DataReaderImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00413 {
00414   if (!(flags & ASSOC_OK)) {
00415     if (DCPS_debug_level) {
00416       const GuidConverter conv(remote_id);
00417       ACE_DEBUG((LM_ERROR,
00418           ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00419           ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00420           OPENDDS_STRING(conv).c_str()));
00421     }
00422     return;
00423   }
00424 
00425   const bool active = flags & ASSOC_ACTIVE;
00426   {
00427 
00428     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00429 
00430     // LIVELINESS policy timers are managed here.
00431     if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00432       if (DCPS_debug_level >= 5) {
00433         GuidConverter converter(subscription_id_);
00434         ACE_DEBUG((LM_DEBUG,
00435             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00436             ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00437             OPENDDS_STRING(converter).c_str()));
00438       }
00439       // this call will start the timer if it is not already set
00440       liveliness_timer_->check_liveliness();
00441     }
00442   }
00443   // We no longer hold the publication_handle_lock_.
00444 
00445   if (!is_bit_) {
00446 
00447     DDS::InstanceHandle_t handle = participant_servant_->id_to_handle(remote_id);
00448 
00449     // We acquire the publication_handle_lock_ for the remainder of our
00450     // processing.
00451     {
00452       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00453 
00454       // This insertion is idempotent.
00455       id_to_handle_map_.insert(
00456           RepoIdToHandleMap::value_type(remote_id, handle));
00457 
00458       if (DCPS_debug_level > 4) {
00459         GuidConverter converter(remote_id);
00460         ACE_DEBUG((LM_DEBUG,
00461             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00462             ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00463             OPENDDS_STRING(converter).c_str(),
00464             handle));
00465       }
00466 
00467       // We need to adjust these after the insertions have all completed
00468       // since insertions are not guaranteed to increase the number of
00469       // currently matched publications.
00470       const int matchedPublications = static_cast<int>(id_to_handle_map_.size());
00471       subscription_match_status_.current_count_change =
00472           matchedPublications - subscription_match_status_.current_count;
00473       subscription_match_status_.current_count = matchedPublications;
00474 
00475       ++subscription_match_status_.total_count;
00476       ++subscription_match_status_.total_count_change;
00477 
00478       subscription_match_status_.last_publication_handle = handle;
00479 
00480       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00481 
00482       DDS::DataReaderListener_var listener =
00483           listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00484 
00485       if (!CORBA::is_nil(listener)) {
00486         listener->on_subscription_matched(dr_local_objref_,
00487             subscription_match_status_);
00488 
00489         // TBD - why does the spec say to change this but not change
00490         //       the ChangeFlagStatus after a listener call?
00491 
00492         // Client will look at it so next time it looks the change should be 0
00493         subscription_match_status_.total_count_change = 0;
00494         subscription_match_status_.current_count_change = 0;
00495       }
00496 
00497       notify_status_condition();
00498     }
00499 
00500     {
00501       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00502       ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00503 
00504       if(!writers_.count(remote_id)){
00505         return;
00506       }
00507       writers_[remote_id]->handle_ = handle;
00508     }
00509   }
00510 
00511   if (!active) {
00512     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00513 
00514     disco->association_complete(domain_id_, participant_servant_->get_id(),
00515         subscription_id_, remote_id);
00516   }
00517 
00518   if (monitor_) {
00519     monitor_->report();
00520   }
00521 }
00522 
00523 void
00524 DataReaderImpl::association_complete(const RepoId& /*remote_id*/)
00525 {
00526   // For the current DCPSInfoRepo implementation, the DataReader side will
00527   // always be passive, so association_complete() will not be called.
00528 }
00529 
00530 void
00531 DataReaderImpl::remove_associations(const WriterIdSeq& writers,
00532     bool notify_lost)
00533 {
00534   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
00535 
00536   if (writers.length() == 0) {
00537     return;
00538   }
00539 
00540   if (DCPS_debug_level >= 1) {
00541     GuidConverter reader_converter(subscription_id_);
00542     GuidConverter writer_converter(writers[0]);
00543     ACE_DEBUG((LM_DEBUG,
00544         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
00545         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00546         is_bit_,
00547         OPENDDS_STRING(reader_converter).c_str(),
00548         OPENDDS_STRING(writer_converter).c_str(),
00549         writers.length()));
00550   }
00551   if (!this->entity_deleted_.value()) {
00552     // stop pending associations for these writer ids
00553     this->stop_associating(writers.get_buffer(), writers.length());
00554 
00555     // writers which are considered non-active and can
00556     // be removed immediately
00557     WriterIdSeq non_active_writers;
00558     {
00559       CORBA::ULong wr_len = writers.length();
00560       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00561 
00562       for (CORBA::ULong i = 0; i < wr_len; i++) {
00563         PublicationId writer_id = writers[i];
00564 
00565         WriterMapType::iterator it = this->writers_.find(writer_id);
00566         if (it != this->writers_.end() &&
00567             it->second->active(TheServiceParticipant->pending_timeout())) {
00568           remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00569         } else {
00570           push_back(non_active_writers, writer_id);
00571         }
00572       }
00573     }
00574     remove_associations_i(non_active_writers, notify_lost);
00575   } else {
00576     remove_associations_i(writers, notify_lost);
00577   }
00578 }
00579 
00580 void
00581 DataReaderImpl::remove_or_reschedule(const PublicationId& pub_id)
00582 {
00583   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00584   WriterMapType::iterator where = writers_.find(pub_id);
00585   if (writers_.end() != where) {
00586     WriterInfo& info = *where->second;
00587     WriterIdSeq writers;
00588     push_back(writers, pub_id);
00589     bool notify = info.notify_lost_;
00590     if (info.removal_deadline_ < ACE_OS::gettimeofday()) {
00591       write_guard.release();
00592       remove_associations_i(writers, notify);
00593     } else {
00594       write_guard.release();
00595       remove_associations(writers, notify);
00596     }
00597   }
00598 }
00599 
00600 void
00601 DataReaderImpl::remove_associations_i(const WriterIdSeq& writers,
00602     bool notify_lost)
00603 {
00604   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
00605 
00606   if (writers.length() == 0) {
00607     return;
00608   }
00609 
00610   if (DCPS_debug_level >= 1) {
00611     GuidConverter reader_converter(subscription_id_);
00612     GuidConverter writer_converter(writers[0]);
00613     ACE_DEBUG((LM_DEBUG,
00614         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00615         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00616         is_bit_,
00617         OPENDDS_STRING(reader_converter).c_str(),
00618         OPENDDS_STRING(writer_converter).c_str(),
00619         writers.length()));
00620   }
00621   DDS::InstanceHandleSeq handles;
00622 
00623   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00624 
00625   // This is used to hold the list of writers which were actually
00626   // removed, which is a proper subset of the writers which were
00627   // requested to be removed.
00628   WriterIdSeq updated_writers;
00629 
00630   CORBA::ULong wr_len;
00631 
00632   //Remove the writers from writer list. If the supplied writer
00633   //is not in the cached writers list then it is already removed.
00634   //We just need remove the writers in the list that have not been
00635   //removed.
00636   {
00637     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00638 
00639     wr_len = writers.length();
00640 
00641     for (CORBA::ULong i = 0; i < wr_len; i++) {
00642       PublicationId writer_id = writers[i];
00643 
00644       WriterMapType::iterator it = this->writers_.find(writer_id);
00645 
00646       if (it != this->writers_.end()) {
00647         it->second->removed();
00648         end_historic_sweeper_->cancel_timer(it->second);
00649         remove_association_sweeper_->cancel_timer(it->second);
00650       }
00651 
00652       if (this->writers_.erase(writer_id) == 0) {
00653         if (DCPS_debug_level >= 1) {
00654           GuidConverter converter(writer_id);
00655           ACE_DEBUG((LM_DEBUG,
00656               ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00657               ACE_TEXT("the writer local %C was already removed.\n"),
00658               OPENDDS_STRING(converter).c_str()));
00659         }
00660 
00661       } else {
00662         push_back(updated_writers, writer_id);
00663       }
00664     }
00665   }
00666 
00667   wr_len = updated_writers.length();
00668 
00669   // Return now if the supplied writers have been removed already.
00670   if (wr_len == 0) {
00671     return;
00672   }
00673 
00674   if (!is_bit_) {
00675     // The writer should be in the id_to_handle map at this time.  Note
00676     // it if it not there.
00677     if (this->lookup_instance_handles(updated_writers, handles) == false) {
00678       if (DCPS_debug_level > 4) {
00679         ACE_DEBUG((LM_DEBUG,
00680             ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00681             ACE_TEXT("lookup_instance_handles failed.\n")));
00682       }
00683     }
00684 
00685     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00686       id_to_handle_map_.erase(updated_writers[i]);
00687     }
00688   }
00689 
00690   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00691     {
00692       this->disassociate(updated_writers[i]);
00693     }
00694   }
00695 
00696   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00697   if (!this->is_bit_) {
00698     // Derive the change in the number of publications writing to this reader.
00699     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00700     this->subscription_match_status_.current_count_change
00701     = matchedPublications - this->subscription_match_status_.current_count;
00702 
00703     // Only process status if the number of publications has changed.
00704     if (this->subscription_match_status_.current_count_change != 0) {
00705       this->subscription_match_status_.current_count = matchedPublications;
00706 
00707       /// Section 7.1.4.1: total_count will not decrement.
00708 
00709       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00710       this->subscription_match_status_.last_publication_handle
00711       = handles[ wr_len - 1];
00712 
00713       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00714 
00715       DDS::DataReaderListener_var listener
00716       = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00717 
00718       if (!CORBA::is_nil(listener.in())) {
00719         listener->on_subscription_matched(
00720             dr_local_objref_.in(),
00721             this->subscription_match_status_);
00722 
00723         // Client will look at it so next time it looks the change should be 0
00724         this->subscription_match_status_.total_count_change = 0;
00725         this->subscription_match_status_.current_count_change = 0;
00726       }
00727       notify_status_condition();
00728     }
00729   }
00730 
00731   // If this remove_association is invoked when the InfoRepo
00732   // detects a lost writer then make a callback to notify
00733   // subscription lost.
00734   if (notify_lost) {
00735     this->notify_subscription_lost(handles);
00736   }
00737 
00738   if (this->monitor_) {
00739     this->monitor_->report();
00740   }
00741 }
00742 
00743 void
00744 DataReaderImpl::remove_all_associations()
00745 {
00746   DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
00747   // stop pending associations
00748   this->stop_associating();
00749 
00750   OpenDDS::DCPS::WriterIdSeq writers;
00751   int size;
00752 
00753   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00754 
00755   {
00756     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00757 
00758     size = static_cast<int>(writers_.size());
00759     writers.length(size);
00760 
00761     WriterMapType::iterator curr_writer = writers_.begin();
00762     WriterMapType::iterator end_writer = writers_.end();
00763 
00764     int i = 0;
00765 
00766     while (curr_writer != end_writer) {
00767       writers[i++] = curr_writer->first;
00768       ++curr_writer;
00769     }
00770   }
00771 
00772   try {
00773     CORBA::Boolean dont_notify_lost = 0;
00774 
00775     if (0 < size) {
00776       remove_associations(writers, dont_notify_lost);
00777     }
00778 
00779   } catch (const CORBA::Exception&) {
00780       ACE_DEBUG((LM_WARNING,
00781                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00782                  ACE_TEXT("caught exception from remove_associations.\n")));
00783   }
00784 }
00785 
00786 void
00787 DataReaderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00788 {
00789   DDS::DataReaderListener_var listener =
00790       listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
00791 
00792   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00793       guard,
00794       this->publication_handle_lock_);
00795 
00796 
00797   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00798     // This test should make the method idempotent.
00799     return;
00800   }
00801 
00802   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00803       true);
00804 
00805   // copy status and increment change
00806   requested_incompatible_qos_status_.total_count = status.total_count;
00807   requested_incompatible_qos_status_.total_count_change +=
00808       status.count_since_last_send;
00809   requested_incompatible_qos_status_.last_policy_id =
00810       status.last_policy_id;
00811   requested_incompatible_qos_status_.policies = status.policies;
00812 
00813   if (!CORBA::is_nil(listener.in())) {
00814     listener->on_requested_incompatible_qos(dr_local_objref_.in(),
00815         requested_incompatible_qos_status_);
00816 
00817     // TBD - why does the spec say to change total_count_change but not
00818     // change the ChangeFlagStatus after a listener call?
00819 
00820     // client just looked at it so next time it looks the
00821     // change should be 0
00822     requested_incompatible_qos_status_.total_count_change = 0;
00823   }
00824 
00825   notify_status_condition();
00826 }
00827 
00828 void
00829 DataReaderImpl::inconsistent_topic()
00830 {
00831   topic_servant_->inconsistent_topic();
00832 }
00833 
00834 void
00835 DataReaderImpl::signal_liveliness(const RepoId& remote_participant)
00836 {
00837   RepoId prefix = remote_participant;
00838   prefix.entityId = EntityId_t();
00839 
00840   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00841 
00842   typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair;
00843   typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
00844   WriterSet writers;
00845 
00846   {
00847     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00848     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00849            limit = writers_.end();
00850          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00851          ++pos) {
00852       writers.push_back(std::make_pair(pos->first, pos->second));
00853     }
00854   }
00855 
00856   ACE_Time_Value when = ACE_OS::gettimeofday();
00857   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00858        pos != limit;
00859        ++pos) {
00860     pos->second->received_activity(when);
00861   }
00862 
00863   if (!writers.empty()) {
00864     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00865     for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00866          pos != limit;
00867          ++pos) {
00868       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00869            iter != instances_.end();
00870            ++iter) {
00871         SubscriptionInstance *ptr = iter->second;
00872         ptr->instance_state_.lively(pos->first);
00873       }
00874     }
00875   }
00876 }
00877 
00878 DDS::ReadCondition_ptr DataReaderImpl::create_readcondition(
00879     DDS::SampleStateMask sample_states,
00880     DDS::ViewStateMask view_states,
00881     DDS::InstanceStateMask instance_states)
00882 {
00883   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00884   DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
00885       view_states, instance_states);
00886   read_conditions_.insert(rc);
00887   return rc._retn();
00888 }
00889 
00890 #ifndef OPENDDS_NO_QUERY_CONDITION
00891 DDS::QueryCondition_ptr DataReaderImpl::create_querycondition(
00892     DDS::SampleStateMask sample_states,
00893     DDS::ViewStateMask view_states,
00894     DDS::InstanceStateMask instance_states,
00895     const char* query_expression,
00896     const DDS::StringSeq& query_parameters)
00897 {
00898   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00899   try {
00900     DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
00901         view_states, instance_states, query_expression, query_parameters);
00902     DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
00903     read_conditions_.insert(rc);
00904     return qc._retn();
00905   } catch (const std::exception& e) {
00906     if (DCPS_debug_level) {
00907       ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ")
00908           ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
00909           e.what()));
00910     }
00911     return 0;
00912   }
00913 }
00914 #endif
00915 
00916 bool DataReaderImpl::has_readcondition(DDS::ReadCondition_ptr a_condition)
00917 {
00918   //sample lock already held
00919   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00920   return read_conditions_.find(rc) != read_conditions_.end();
00921 }
00922 
00923 DDS::ReturnCode_t DataReaderImpl::delete_readcondition(
00924     DDS::ReadCondition_ptr a_condition)
00925 {
00926   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00927       DDS::RETCODE_OUT_OF_RESOURCES);
00928   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00929   return read_conditions_.erase(rc)
00930       ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
00931 }
00932 
00933 DDS::ReturnCode_t DataReaderImpl::delete_contained_entities()
00934 {
00935   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00936       DDS::RETCODE_OUT_OF_RESOURCES);
00937   read_conditions_.clear();
00938   return DDS::RETCODE_OK;
00939 }
00940 
00941 DDS::ReturnCode_t DataReaderImpl::set_qos(
00942     const DDS::DataReaderQos & qos)
00943 {
00944 
00945   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00946   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00947   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00948 
00949   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00950     if (qos_ == qos)
00951       return DDS::RETCODE_OK;
00952 
00953     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00954       return DDS::RETCODE_IMMUTABLE_POLICY;
00955 
00956     } else {
00957       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00958       DDS::SubscriberQos subscriberQos;
00959       this->subscriber_servant_->get_qos(subscriberQos);
00960       const bool status =
00961           disco->update_subscription_qos(
00962               this->participant_servant_->get_domain_id(),
00963               this->participant_servant_->get_id(),
00964               this->subscription_id_,
00965               qos,
00966               subscriberQos);
00967       if (!status) {
00968         ACE_ERROR_RETURN((LM_ERROR,
00969             ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
00970             ACE_TEXT("qos not updated. \n")),
00971             DDS::RETCODE_ERROR);
00972       }
00973     }
00974 
00975     // Reset the deadline timer if the period has changed.
00976     if (qos_.deadline.period.sec != qos.deadline.period.sec
00977         || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00978       if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00979           && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00980         this->watchdog_ =
00981             new RequestedDeadlineWatchdog(
00982                 this->sample_lock_,
00983                 qos.deadline,
00984                 this,
00985                 this->dr_local_objref_.in(),
00986                 this->requested_deadline_missed_status_,
00987                 this->last_deadline_missed_total_count_);
00988 
00989       } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00990           && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00991         this->watchdog_->cancel_all();
00992         this->watchdog_->destroy();
00993         this->watchdog_ = 0;
00994 
00995       } else {
00996         this->watchdog_->reset_interval(
00997             duration_to_time_value(qos.deadline.period));
00998       }
00999     }
01000 
01001     qos_ = qos;
01002 
01003     return DDS::RETCODE_OK;
01004 
01005   } else {
01006     return DDS::RETCODE_INCONSISTENT_POLICY;
01007   }
01008 }
01009 
01010 DDS::ReturnCode_t
01011 DataReaderImpl::get_qos(
01012     DDS::DataReaderQos & qos)
01013 {
01014   qos = qos_;
01015   return DDS::RETCODE_OK;
01016 }
01017 
01018 DDS::ReturnCode_t DataReaderImpl::set_listener(
01019     DDS::DataReaderListener_ptr a_listener,
01020     DDS::StatusMask mask)
01021 {
01022   listener_mask_ = mask;
01023   //note: OK to duplicate  a nil object ref
01024   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
01025   return DDS::RETCODE_OK;
01026 }
01027 
01028 DDS::DataReaderListener_ptr DataReaderImpl::get_listener()
01029 {
01030   return DDS::DataReaderListener::_duplicate(listener_.in());
01031 }
01032 
01033 DDS::TopicDescription_ptr DataReaderImpl::get_topicdescription()
01034 {
01035 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01036   DDS::ContentFilteredTopic_ptr cft = this->get_cf_topic();
01037   if (cft) {
01038     return cft; // get_cf_topic has already _duplicated()
01039   }
01040 #endif
01041   return DDS::TopicDescription::_duplicate(topic_desc_.in());
01042 }
01043 
01044 DDS::Subscriber_ptr DataReaderImpl::get_subscriber()
01045 {
01046   return DDS::Subscriber::_duplicate(subscriber_servant_);
01047 }
01048 
01049 DDS::ReturnCode_t
01050 DataReaderImpl::get_sample_rejected_status(
01051     DDS::SampleRejectedStatus & status)
01052 {
01053   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01054 
01055   set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false);
01056   status = sample_rejected_status_;
01057   sample_rejected_status_.total_count_change = 0;
01058   return DDS::RETCODE_OK;
01059 }
01060 
01061 DDS::ReturnCode_t
01062 DataReaderImpl::get_liveliness_changed_status(
01063     DDS::LivelinessChangedStatus & status)
01064 {
01065   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01066 
01067   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS,
01068       false);
01069   status = liveliness_changed_status_;
01070 
01071   liveliness_changed_status_.alive_count_change = 0;
01072   liveliness_changed_status_.not_alive_count_change = 0;
01073 
01074   return DDS::RETCODE_OK;
01075 }
01076 
01077 DDS::ReturnCode_t
01078 DataReaderImpl::get_requested_deadline_missed_status(
01079     DDS::RequestedDeadlineMissedStatus & status)
01080 {
01081   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01082 
01083   set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
01084       false);
01085 
01086   this->requested_deadline_missed_status_.total_count_change =
01087       this->requested_deadline_missed_status_.total_count
01088       - this->last_deadline_missed_total_count_;
01089 
01090   // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
01091   // is updated by the RequestedDeadlineWatchdog.
01092 
01093   // Update for next status check.
01094   this->last_deadline_missed_total_count_ =
01095       this->requested_deadline_missed_status_.total_count;
01096 
01097   status = requested_deadline_missed_status_;
01098 
01099   return DDS::RETCODE_OK;
01100 }
01101 
01102 DDS::ReturnCode_t
01103 DataReaderImpl::get_requested_incompatible_qos_status(
01104     DDS::RequestedIncompatibleQosStatus & status)
01105 {
01106 
01107   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01108       this->publication_handle_lock_);
01109 
01110   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
01111       false);
01112   status = requested_incompatible_qos_status_;
01113   requested_incompatible_qos_status_.total_count_change = 0;
01114 
01115   return DDS::RETCODE_OK;
01116 }
01117 
01118 DDS::ReturnCode_t
01119 DataReaderImpl::get_subscription_matched_status(
01120     DDS::SubscriptionMatchedStatus & status)
01121 {
01122 
01123   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01124       this->publication_handle_lock_);
01125 
01126   set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false);
01127   status = subscription_match_status_;
01128   subscription_match_status_.total_count_change = 0;
01129   subscription_match_status_.current_count_change = 0;
01130 
01131   return DDS::RETCODE_OK;
01132 }
01133 
01134 DDS::ReturnCode_t
01135 DataReaderImpl::get_sample_lost_status(
01136     DDS::SampleLostStatus & status)
01137 {
01138   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01139 
01140   set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false);
01141   status = sample_lost_status_;
01142   sample_lost_status_.total_count_change = 0;
01143   return DDS::RETCODE_OK;
01144 }
01145 
01146 DDS::ReturnCode_t
01147 DataReaderImpl::wait_for_historical_data(
01148     const DDS::Duration_t & /* max_wait */)
01149 {
01150   // Add your implementation here
01151   return 0;
01152 }
01153 
01154 DDS::ReturnCode_t
01155 DataReaderImpl::get_matched_publications(
01156     DDS::InstanceHandleSeq & publication_handles)
01157 {
01158   if (enabled_ == false) {
01159     ACE_ERROR_RETURN((LM_ERROR,
01160         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
01161         ACE_TEXT(" Entity is not enabled. \n")),
01162         DDS::RETCODE_NOT_ENABLED);
01163   }
01164 
01165   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01166       guard,
01167       this->publication_handle_lock_,
01168       DDS::RETCODE_ERROR);
01169 
01170   // Copy out the handles for the current set of publications.
01171   int index = 0;
01172   publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01173 
01174   for (RepoIdToHandleMap::iterator
01175       current = this->id_to_handle_map_.begin();
01176       current != this->id_to_handle_map_.end();
01177       ++current, ++index) {
01178     publication_handles[ index] = current->second;
01179   }
01180 
01181   return DDS::RETCODE_OK;
01182 }
01183 
01184 #if !defined (DDS_HAS_MINIMUM_BIT)
01185 DDS::ReturnCode_t
01186 DataReaderImpl::get_matched_publication_data(
01187     DDS::PublicationBuiltinTopicData & publication_data,
01188     DDS::InstanceHandle_t publication_handle)
01189 {
01190   if (enabled_ == false) {
01191     ACE_ERROR_RETURN((LM_ERROR,
01192         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
01193         ACE_TEXT("get_matched_publication_data: ")
01194         ACE_TEXT("Entity is not enabled. \n")),
01195         DDS::RETCODE_NOT_ENABLED);
01196   }
01197 
01198 
01199   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01200       guard,
01201       this->publication_handle_lock_,
01202       DDS::RETCODE_ERROR);
01203 
01204 
01205   BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader,
01206   DDS::PublicationBuiltinTopicDataDataReader_var,
01207   DDS::PublicationBuiltinTopicDataSeq > hh;
01208 
01209   DDS::PublicationBuiltinTopicDataSeq data;
01210 
01211   DDS::ReturnCode_t ret
01212   = hh.instance_handle_to_bit_data(participant_servant_,
01213       BUILT_IN_PUBLICATION_TOPIC,
01214       publication_handle,
01215       data);
01216 
01217   if (ret == DDS::RETCODE_OK) {
01218     publication_data = data[0];
01219   }
01220 
01221   return ret;
01222 }
01223 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01224 
01225 DDS::ReturnCode_t
01226 DataReaderImpl::enable()
01227 {
01228   //According spec:
01229   // - Calling enable on an already enabled Entity returns OK and has no
01230   // effect.
01231   // - Calling enable on an Entity whose factory is not enabled will fail
01232   // and return PRECONDITION_NOT_MET.
01233 
01234   if (this->is_enabled()) {
01235     return DDS::RETCODE_OK;
01236   }
01237 
01238   if (this->subscriber_servant_->is_enabled() == false) {
01239     return DDS::RETCODE_PRECONDITION_NOT_MET;
01240   }
01241 
01242   if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
01243     // The spec says qos_.history.depth is "has no effect"
01244     // when history.kind = KEEP_ALL so use max_samples_per_instance
01245     depth_ = qos_.resource_limits.max_samples_per_instance;
01246 
01247   } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
01248     depth_ = qos_.history.depth;
01249   }
01250 
01251   if (depth_ == DDS::LENGTH_UNLIMITED) {
01252     // DDS::LENGTH_UNLIMITED is negative so make it a positive
01253     // value that is for all intents and purposes unlimited
01254     // and we can use it for comparisons.
01255     // use 2147483647L because that is the greatest value a signed
01256     // CORBA::Long can have.
01257     // WARNING: The client risks running out of memory in this case.
01258     depth_ = 2147483647L;
01259   }
01260 
01261   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01262     n_chunks_ = qos_.resource_limits.max_samples;
01263   }
01264 
01265   //else using value from Service_Participant
01266 
01267   // enable the type specific part of this DataReader
01268   this->enable_specific();
01269 
01270   //Note: the QoS used to set n_chunks_ is Changable=No so
01271   // it is OK that we cannot change the size of our allocators.
01272   rd_allocator_ = new ReceivedDataAllocator(n_chunks_);
01273 
01274   if (DCPS_debug_level >= 2)
01275     ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
01276         " Cached_Allocator_With_Overflow %x with %d chunks\n",
01277         rd_allocator_, n_chunks_));
01278 
01279   if ((qos_.liveliness.lease_duration.sec !=
01280       DDS::DURATION_INFINITE_SEC) &&
01281       (qos_.liveliness.lease_duration.nanosec !=
01282           DDS::DURATION_INFINITE_NSEC)) {
01283     liveliness_lease_duration_ =
01284         duration_to_time_value(qos_.liveliness.lease_duration);
01285   }
01286 
01287   // Setup the requested deadline watchdog if the configured deadline
01288   // period is not the default (infinite).
01289   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01290 
01291   if (this->watchdog_ == 0
01292       && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01293           || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
01294     this->watchdog_ =
01295         new RequestedDeadlineWatchdog(
01296             this->sample_lock_,
01297             this->qos_.deadline,
01298             this,
01299             this->dr_local_objref_.in(),
01300             this->requested_deadline_missed_status_,
01301             this->last_deadline_missed_total_count_);
01302   }
01303 
01304   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01305   disco->pre_reader(this);
01306 
01307   this->set_enabled();
01308 
01309   if (topic_servant_ && !transport_disabled_) {
01310 
01311     try {
01312       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
01313           this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01314     } catch (const Transport::Exception&) {
01315       ACE_ERROR((LM_ERROR,
01316           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01317           ACE_TEXT("Transport Exception.\n")));
01318       return DDS::RETCODE_ERROR;
01319 
01320     }
01321 
01322     const TransportLocatorSeq& trans_conf_info = this->connection_info();
01323 
01324     CORBA::String_var filterClassName = "";
01325     CORBA::String_var filterExpression = "";
01326     DDS::StringSeq exprParams;
01327 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01328     DDS::ContentFilteredTopic_var cft = this->get_cf_topic();
01329     if (cft) {
01330       OpenDDS::DCPS::ContentFilteredTopicImpl* impl =
01331         dynamic_cast<OpenDDS::DCPS::ContentFilteredTopicImpl*>(cft.in());
01332       if (impl) {
01333         filterClassName = impl->get_filter_class_name();
01334       }
01335       filterExpression = cft->get_filter_expression();
01336       cft->get_expression_parameters(exprParams);
01337     }
01338 #endif
01339 
01340     DDS::SubscriberQos sub_qos;
01341     this->subscriber_servant_->get_qos(sub_qos);
01342 
01343     this->subscription_id_ =
01344         disco->add_subscription(this->domain_id_,
01345             this->participant_servant_->get_id(),
01346             this->topic_servant_->get_id(),
01347             this,
01348             this->qos_,
01349             trans_conf_info,
01350             sub_qos,
01351             filterClassName,
01352             filterExpression,
01353             exprParams);
01354 
01355     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01356       ACE_ERROR((LM_ERROR,
01357           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01358           ACE_TEXT("add_subscription returned invalid id.\n")));
01359       return DDS::RETCODE_ERROR;
01360     }
01361   }
01362 
01363   if (topic_servant_) {
01364     const CORBA::String_var name = topic_servant_->get_name();
01365     DDS::ReturnCode_t return_value =
01366         this->subscriber_servant_->reader_enabled(name.in(), this);
01367 
01368     if (this->monitor_) {
01369       this->monitor_->report();
01370     }
01371 
01372     return return_value;
01373   } else {
01374     return DDS::RETCODE_OK;
01375   }
01376 }
01377 
01378 void
01379 DataReaderImpl::writer_activity(const DataSampleHeader& header)
01380 {
01381   // caller should have the sample_lock_ !!!
01382 
01383   RcHandle<WriterInfo> writer;
01384 
01385   // The received_activity() has to be called outside the writers_lock_
01386   // because it probably acquire writers_lock_ read lock recursively
01387   // (in handle_timeout). This could cause deadlock when there are writers
01388   // waiting.
01389   {
01390     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01391     WriterMapType::iterator iter = writers_.find(header.publication_id_);
01392 
01393     if (iter != writers_.end()) {
01394       writer = iter->second;
01395 
01396     } else if (DCPS_debug_level > 4) {
01397       // This may not be an error since it could happen that the sample
01398       // is delivered to the datareader after the write is dis-associated
01399       // with this datareader.
01400       GuidConverter reader_converter(subscription_id_);
01401       GuidConverter writer_converter(header.publication_id_);
01402       ACE_DEBUG((LM_DEBUG,
01403           ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
01404           ACE_TEXT("reader %C is not associated with writer %C.\n"),
01405           OPENDDS_STRING(reader_converter).c_str(),
01406           OPENDDS_STRING(writer_converter).c_str()));
01407     }
01408   }
01409 
01410   if (!writer.is_nil()) {
01411     ACE_Time_Value when = ACE_OS::gettimeofday();
01412     writer->received_activity(when);
01413 
01414     if ((header.message_id_ == SAMPLE_DATA) ||
01415         (header.message_id_ == INSTANCE_REGISTRATION) ||
01416         (header.message_id_ == UNREGISTER_INSTANCE) ||
01417         (header.message_id_ == DISPOSE_INSTANCE) ||
01418         (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
01419 
01420       const SequenceNumber defaultSN;
01421       SequenceRange resetRange(defaultSN, header.sequence_);
01422 
01423       if (writer->seen_data_ && !header.sequence_repair_) {
01424         // Data samples should be acknowledged prior to any
01425         // reader-side filtering to ensure discontinuities
01426         // are not unintentionally introduced.
01427         writer->ack_sequence(header.sequence_);
01428 
01429       } else {
01430         // In order to properly track out of order delivery,
01431         // a baseline must be established based on the first
01432         // data sample received.
01433         writer->seen_data_ = true;
01434         writer->ack_sequence_.reset();
01435         writer->ack_sequence_.insert(resetRange);
01436       }
01437 
01438 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01439       if (header.coherent_change_) {
01440         if (writer->coherent_samples_ == 0) {
01441           writer->coherent_sample_sequence_.reset();
01442           writer->coherent_sample_sequence_.insert(resetRange);
01443         }
01444         else {
01445           writer->coherent_sample_sequence_.insert(header.sequence_);
01446         }
01447       }
01448 #endif
01449     }
01450   }
01451 }
01452 
01453 void
01454 DataReaderImpl::data_received(const ReceivedDataSample& sample)
01455 {
01456   DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
01457 
01458   // ensure some other thread is not changing the sample container
01459   // or statuses related to samples.
01460   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01461 
01462   if (get_deleted()) return;
01463 
01464   if (DCPS_debug_level > 9) {
01465     GuidConverter converter(subscription_id_);
01466     ACE_DEBUG((LM_DEBUG,
01467         ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01468         ACE_TEXT("%C received sample: %C.\n"),
01469         OPENDDS_STRING(converter).c_str(),
01470         to_string(sample.header_).c_str()));
01471   }
01472 
01473   switch (sample.header_.message_id_) {
01474   case SAMPLE_DATA:
01475   case INSTANCE_REGISTRATION: {
01476     if (!check_historic(sample)) break;
01477 
01478     DataSampleHeader const & header = sample.header_;
01479 
01480     this->writer_activity(header);
01481 
01482     // Verify data has not exceeded its lifespan.
01483     if (this->filter_sample(header)) break;
01484 
01485     // This adds the reader to the set/list of readers with data.
01486     this->subscriber_servant_->data_received(this);
01487 
01488     // Only gather statistics about real samples, not registration data, etc.
01489     if (header.message_id_ == SAMPLE_DATA) {
01490       this->process_latency(sample);
01491     }
01492 
01493     // This also adds to the sample container and makes any callbacks
01494     // and condition modifications.
01495 
01496     SubscriptionInstance* instance = 0;
01497     bool is_new_instance = false;
01498     bool filtered = false;
01499     if (sample.header_.key_fields_only_) {
01500       dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING);
01501     } else {
01502       dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING);
01503     }
01504 
01505     // Per sample logging
01506     if (DCPS_debug_level >= 8) {
01507       GuidConverter reader_converter(subscription_id_);
01508       GuidConverter writer_converter(header.publication_id_);
01509 
01510       ACE_DEBUG ((LM_DEBUG,
01511           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
01512           ACE_TEXT("instance %d is_new_instance %d filtered %d \n"),
01513           OPENDDS_STRING(reader_converter).c_str(),
01514           OPENDDS_STRING(writer_converter).c_str(),
01515           instance ? instance->instance_handle_ : 0,
01516               is_new_instance, filtered));
01517     }
01518 
01519     if (filtered) break; // sample filtered from instance
01520     bool accepted = true;
01521 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01522     bool verify_coherent = false;
01523 #endif
01524     RcHandle<WriterInfo> writer;
01525 
01526     if (header.publication_id_.entityId.entityKind
01527         != OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER) {
01528       ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01529 
01530       WriterMapType::iterator where
01531       = this->writers_.find(header.publication_id_);
01532 
01533       if (where != this->writers_.end()) {
01534         if (header.coherent_change_) {
01535 
01536 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01537           // Received coherent change
01538           where->second->group_coherent_ = header.group_coherent_;
01539           where->second->publisher_id_ = header.publisher_id_;
01540           ++where->second->coherent_samples_;
01541           verify_coherent = true;
01542 #endif
01543           writer = where->second;
01544         }
01545       } else {
01546         GuidConverter subscriptionBuffer(this->subscription_id_);
01547         GuidConverter publicationBuffer(header.publication_id_);
01548         ACE_DEBUG((LM_WARNING,
01549             ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01550             ACE_TEXT("subscription %C failed to find ")
01551             ACE_TEXT("publication data for %C.\n"),
01552             OPENDDS_STRING(subscriptionBuffer).c_str(),
01553             OPENDDS_STRING(publicationBuffer).c_str()));
01554       }
01555     }
01556 
01557 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01558     if (verify_coherent) {
01559       accepted = this->verify_coherent_changes_completion(writer.in());
01560     }
01561 #endif
01562 
01563     if (this->watchdog_) {
01564       instance->last_sample_tv_ = instance->cur_sample_tv_;
01565       instance->cur_sample_tv_ = ACE_OS::gettimeofday();
01566 
01567       // Watchdog can't be called with sample_lock_ due to reactor deadlock
01568       ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01569       if (is_new_instance) {
01570         this->watchdog_->schedule_timer(instance);
01571 
01572       } else {
01573         this->watchdog_->execute(instance, false);
01574       }
01575     }
01576 
01577     if (accepted) {
01578       this->notify_read_conditions();
01579     }
01580   }
01581   break;
01582 
01583 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01584   case END_COHERENT_CHANGES: {
01585     CoherentChangeControl control;
01586 
01587     this->writer_activity(sample.header_);
01588 
01589     Serializer serializer(
01590         sample.sample_, sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER);
01591     serializer >> control;
01592 
01593     if (DCPS_debug_level > 0) {
01594       std::stringstream buffer;
01595       buffer << control << std::endl;
01596 
01597       ACE_DEBUG((LM_DEBUG,
01598           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01599           ACE_TEXT("END_COHERENT_CHANGES %C\n"),
01600           buffer.str().c_str()));
01601     }
01602 
01603     RcHandle<WriterInfo> writer;
01604     {
01605       ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01606 
01607       WriterMapType::iterator it =
01608           this->writers_.find(sample.header_.publication_id_);
01609 
01610       if (it == this->writers_.end()) {
01611         GuidConverter sub_id(this->subscription_id_);
01612         GuidConverter pub_id(sample.header_.publication_id_);
01613         ACE_DEBUG((LM_WARNING,
01614             ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01615             ACE_TEXT(" subscription %C failed to find ")
01616             ACE_TEXT(" publication data for %C!\n"),
01617             OPENDDS_STRING(sub_id).c_str(),
01618             OPENDDS_STRING(pub_id).c_str()));
01619         return;
01620       }
01621       else {
01622         writer = it->second;
01623       }
01624       it->second->set_group_info (control);
01625     }
01626 
01627     if (this->verify_coherent_changes_completion(writer.in())) {
01628       this->notify_read_conditions();
01629     }
01630   }
01631   break;
01632 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
01633 
01634   case DATAWRITER_LIVELINESS: {
01635     if (DCPS_debug_level >= 4) {
01636       GuidConverter reader_converter(subscription_id_);
01637       GuidConverter writer_converter(sample.header_.publication_id_);
01638       ACE_DEBUG((LM_DEBUG,
01639                  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01640                  ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
01641                  OPENDDS_STRING(reader_converter).c_str(),
01642                  OPENDDS_STRING(writer_converter).c_str()));
01643     }
01644     this->writer_activity(sample.header_);
01645 
01646     // tell all instances they got a liveliness message
01647     { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01648     for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01649         iter != instances_.end();
01650         ++iter) {
01651       SubscriptionInstance *ptr = iter->second;
01652 
01653       ptr->instance_state_.lively(sample.header_.publication_id_);
01654     }
01655     }
01656 
01657   }
01658   break;
01659 
01660   case DISPOSE_INSTANCE: {
01661     if (!check_historic(sample)) break;
01662     this->writer_activity(sample.header_);
01663     SubscriptionInstance* instance = 0;
01664 
01665     if (this->watchdog_) {
01666       // Find the instance first for timer cancellation since
01667       // the instance may be deleted during dispose and can
01668       // not be accessed.
01669       ReceivedDataSample dup(sample);
01670       this->lookup_instance(dup, instance);
01671 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01672       if (! this->is_exclusive_ownership_
01673           || (this->is_exclusive_ownership_
01674               && (instance != 0 )
01675               && (this->owner_manager_->is_owner (instance->instance_handle_,
01676                   sample.header_.publication_id_)))) {
01677 #endif
01678         this->watchdog_->cancel_timer(instance);
01679 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01680       }
01681 #endif
01682     }
01683     instance = 0;
01684     this->dispose_unregister(sample, instance);
01685   }
01686   this->notify_read_conditions();
01687   break;
01688 
01689   case UNREGISTER_INSTANCE: {
01690     if (!check_historic(sample)) break;
01691     this->writer_activity(sample.header_);
01692     SubscriptionInstance* instance = 0;
01693 
01694     if (this->watchdog_) {
01695       // Find the instance first for timer cancellation since
01696       // the instance may be deleted during dispose and can
01697       // not be accessed.
01698       ReceivedDataSample dup(sample);
01699       this->lookup_instance(dup, instance);
01700       if( instance != 0) {
01701 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01702         if (! this->is_exclusive_ownership_
01703             || (this->is_exclusive_ownership_
01704                 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01705 #endif
01706           this->watchdog_->cancel_timer(instance);
01707 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01708         }
01709 #endif
01710       }
01711     }
01712     instance = 0;
01713     this->dispose_unregister(sample, instance);
01714   }
01715   this->notify_read_conditions();
01716   break;
01717 
01718   case DISPOSE_UNREGISTER_INSTANCE: {
01719     if (!check_historic(sample)) break;
01720     this->writer_activity(sample.header_);
01721     SubscriptionInstance* instance = 0;
01722 
01723     if (this->watchdog_) {
01724       // Find the instance first for timer cancellation since
01725       // the instance may be deleted during dispose and can
01726       // not be accessed.
01727       ReceivedDataSample dup(sample);
01728       this->lookup_instance(dup, instance);
01729 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01730       if (! this->is_exclusive_ownership_
01731           || (this->is_exclusive_ownership_
01732               && (instance != 0 )
01733               && (this->owner_manager_->is_owner (instance->instance_handle_,
01734                   sample.header_.publication_id_)))
01735                   || (this->is_exclusive_ownership_
01736                       && (instance != 0 )
01737                       && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01738 #endif
01739         this->watchdog_->cancel_timer(instance);
01740 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01741       }
01742 #endif
01743     }
01744     instance = 0;
01745     this->dispose_unregister(sample, instance);
01746   }
01747   this->notify_read_conditions();
01748   break;
01749 
01750   case END_HISTORIC_SAMPLES: {
01751     if (sample.header_.message_length_ >= sizeof(RepoId)) {
01752       Serializer ser(sample.sample_);
01753       RepoId readerId = GUID_UNKNOWN;
01754       ser >> readerId;
01755       if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) {
01756         break; // not our message
01757       }
01758     }
01759     if (DCPS_debug_level > 4) {
01760       ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
01761     }
01762     // Going to acquire writers lock, release samples lock
01763     guard.release();
01764     this->resume_sample_processing(sample.header_.publication_id_);
01765     if (DCPS_debug_level > 4) {
01766       GuidConverter pub_id(sample.header_.publication_id_);
01767       ACE_DEBUG((
01768           LM_INFO,
01769           "(%P|%t) Resumed sample processing for durable writer %C\n",
01770           OPENDDS_STRING(pub_id).c_str()));
01771     }
01772     break;
01773   }
01774 
01775   default:
01776     ACE_ERROR((LM_ERROR,
01777         "(%P|%t) ERROR: DataReaderImpl::data_received"
01778         "unexpected message_id = %d\n",
01779         sample.header_.message_id_));
01780     break;
01781   }
01782 }
01783 
01784 EntityImpl*
01785 DataReaderImpl::parent() const
01786 {
01787   return this->subscriber_servant_;
01788 }
01789 
01790 bool
01791 DataReaderImpl::check_transport_qos(const TransportInst& ti)
01792 {
01793   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01794     return ti.is_reliable();
01795   }
01796   return true;
01797 }
01798 
01799 void DataReaderImpl::notify_read_conditions()
01800 {
01801   //sample lock is already held
01802   ReadConditionSet local_read_conditions = read_conditions_;
01803   ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01804 
01805   for (ReadConditionSet::iterator it = local_read_conditions.begin(),
01806       end = local_read_conditions.end(); it != end; ++it) {
01807     dynamic_cast<ConditionImpl*>(it->in())->signal_all();
01808   }
01809 }
01810 
01811 SubscriberImpl* DataReaderImpl::get_subscriber_servant()
01812 {
01813   return subscriber_servant_;
01814 }
01815 
01816 RepoId DataReaderImpl::get_subscription_id() const
01817 {
01818   return subscription_id_;
01819 }
01820 
01821 char *
01822 DataReaderImpl::get_topic_name() const
01823 {
01824   return topic_servant_->get_name();
01825 }
01826 
01827 bool DataReaderImpl::have_sample_states(
01828     DDS::SampleStateMask sample_states) const
01829 {
01830   //!!!caller should have acquired sample_lock_
01831   /// @TODO: determine correct failed lock return value.
01832   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
01833 
01834   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01835       iter != instances_.end();
01836       ++iter) {
01837     SubscriptionInstance *ptr = iter->second;
01838 
01839     for (ReceivedDataElement *item = ptr->rcvd_samples_.head_;
01840         item != 0; item = item->next_data_sample_) {
01841       if (item->sample_state_ & sample_states) {
01842         return true;
01843       }
01844     }
01845   }
01846 
01847   return false;
01848 }
01849 
01850 bool
01851 DataReaderImpl::have_view_states(DDS::ViewStateMask view_states) const
01852 {
01853   //!!!caller should have acquired sample_lock_
01854   /// @TODO: determine correct failed lock return value.
01855   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01856 
01857   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01858       iter != instances_.end();
01859       ++iter) {
01860     SubscriptionInstance *ptr = iter->second;
01861 
01862     if (ptr->instance_state_.view_state() & view_states) {
01863       return true;
01864     }
01865   }
01866 
01867   return false;
01868 }
01869 
01870 bool DataReaderImpl::have_instance_states(
01871     DDS::InstanceStateMask instance_states) const
01872 {
01873   //!!!caller should have acquired sample_lock_
01874   /// @TODO: determine correct failed lock return value.
01875   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01876 
01877   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01878       iter != instances_.end();
01879       ++iter) {
01880     SubscriptionInstance *ptr = iter->second;
01881 
01882     if (ptr->instance_state_.instance_state() & instance_states) {
01883       return true;
01884     }
01885   }
01886 
01887   return false;
01888 }
01889 
01890 /// Fold-in the three separate loops of have_sample_states(),
01891 /// have_view_states(), and have_instance_states().  Takes the sample_lock_.
01892 bool DataReaderImpl::contains_sample(DDS::SampleStateMask sample_states,
01893     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
01894 {
01895   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
01896   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01897 
01898   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
01899       end = instances_.end(); iter != end; ++iter) {
01900     SubscriptionInstance& inst = *iter->second;
01901 
01902     if ((inst.instance_state_.view_state() & view_states) &&
01903         (inst.instance_state_.instance_state() & instance_states)) {
01904       for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
01905           item = item->next_data_sample_) {
01906         if (item->sample_state_ & sample_states
01907 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01908             && !item->coherent_change_
01909 #endif
01910         ) {
01911           return true;
01912         }
01913       }
01914     }
01915   }
01916 
01917   return false;
01918 }
01919 
01920 DDS::DataReaderListener_ptr
01921 DataReaderImpl::listener_for(DDS::StatusKind kind)
01922 {
01923   // per 2.1.4.3.1 Listener Access to Plain Communication Status
01924   // use this entities factory if listener is mask not enabled
01925   // for this kind.
01926   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
01927     return subscriber_servant_->listener_for(kind);
01928 
01929   } else {
01930     return DDS::DataReaderListener::_duplicate(listener_.in());
01931   }
01932 }
01933 
01934 void DataReaderImpl::sample_info(DDS::SampleInfo & sample_info,
01935     const ReceivedDataElement *ptr)
01936 {
01937 
01938   sample_info.sample_rank = 0;
01939 
01940   // generation_rank =
01941   //    (MRSIC.disposed_generation_count +
01942   //     MRSIC.no_writers_generation_count)
01943   //  - (S.disposed_generation_count +
01944   //     S.no_writers_generation_count)
01945   //
01946   sample_info.generation_rank =
01947       (sample_info.disposed_generation_count +
01948           sample_info.no_writers_generation_count) -
01949           sample_info.generation_rank;
01950 
01951   // absolute_generation_rank =
01952   //     (MRS.disposed_generation_count +
01953   //      MRS.no_writers_generation_count)
01954   //   - (S.disposed_generation_count +
01955   //      S.no_writers_generation_count)
01956   //
01957   sample_info.absolute_generation_rank =
01958       (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
01959           static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
01960           sample_info.absolute_generation_rank;
01961 
01962   sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
01963 }
01964 
01965 CORBA::Long DataReaderImpl::total_samples() const
01966 {
01967   //!!!caller should have acquired sample_lock_
01968   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
01969 
01970   CORBA::Long count(0);
01971 
01972   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01973       iter != instances_.end();
01974       ++iter) {
01975     SubscriptionInstance *ptr = iter->second;
01976 
01977     count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_);
01978   }
01979 
01980   return count;
01981 }
01982 
01983 int
01984 DataReaderImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value& tv,
01985                                                 const void * /*arg*/)
01986 {
01987   check_liveliness_i(false, tv);
01988   return 0;
01989 }
01990 
01991 void
01992 DataReaderImpl::LivelinessTimer::check_liveliness_i(bool cancel,
01993                                                     const ACE_Time_Value& now)
01994 {
01995   // Working copy of the active timer Id.
01996   long local_timer_id = liveliness_timer_id_;
01997   bool timer_was_reset = false;
01998 
01999   if (local_timer_id != -1 && cancel) {
02000     if (DCPS_debug_level >= 5) {
02001       GuidConverter converter(data_reader_->get_subscription_id());
02002       ACE_DEBUG((LM_DEBUG,
02003                  ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
02004                  ACE_TEXT(" canceling timer for reader %C.\n"),
02005                  OPENDDS_STRING(converter).c_str()));
02006     }
02007 
02008     // called from add_associations and there is already a timer
02009     // so cancel the existing timer.
02010     if (this->reactor()->cancel_timer(local_timer_id) == -1) {
02011       // this could fail because the reactor's call and
02012       // the add_associations' call to this could overlap
02013       // so it is not a failure.
02014       ACE_DEBUG((LM_DEBUG,
02015                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
02016                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
02017     }
02018 
02019     timer_was_reset = true;
02020   }
02021 
02022   // Used after the lock scope ends.
02023   ACE_Time_Value smallest(ACE_Time_Value::max_time);
02024   int alive_writers = 0;
02025 
02026   // This is a bit convoluted.  The reasoning goes as follows:
02027   // 1) We grab the current timer Id value when we enter the method.
02028   // 2) We *might* cancel the timer if it is active.
02029   // 3) The timer *might* be rescheduled while we do not hold the sample lock.
02030   // 4) If we (or another thread) canceled the timer that we can tell, then
02031   // 5) we should clear the Id value,
02032   // 6) unless it has been rescheduled.
02033   // We are using a changed timer Id value as a proxy for having been
02034   // rescheduled.
02035   if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
02036     liveliness_timer_id_ = -1;
02037   }
02038 
02039   ACE_Time_Value next_absolute;
02040 
02041   // Iterate over each writer to this reader
02042   {
02043     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02044         read_guard,
02045         data_reader_->writers_lock_);
02046 
02047     for (WriterMapType::iterator iter = data_reader_->writers_.begin();
02048         iter != data_reader_->writers_.end();
02049         ++iter) {
02050       // deal with possibly not being alive or
02051       // tell when it will not be alive next (if no activity)
02052       next_absolute = iter->second->check_activity(now);
02053 
02054       if (next_absolute != ACE_Time_Value::max_time) {
02055         alive_writers++;
02056 
02057         if (next_absolute < smallest) {
02058           smallest = next_absolute;
02059         }
02060       }
02061     }
02062   }
02063 
02064   if (!alive_writers) {
02065     // no live writers so no need to schedule a timer
02066     // but be sure we don't try to cancel the timer later.
02067     liveliness_timer_id_ = -1;
02068   }
02069 
02070   if (DCPS_debug_level >= 5) {
02071     GuidConverter converter(data_reader_->get_subscription_id());
02072     ACE_DEBUG((LM_DEBUG,
02073         ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
02074         ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
02075         OPENDDS_STRING(converter).c_str(),
02076         alive_writers,
02077         !cancel));
02078   }
02079 
02080   // Call into the reactor after releasing the sample lock.
02081   if (alive_writers) {
02082     ACE_Time_Value relative;
02083 
02084     // compare the time now with the earliest(smallest) deadline we found
02085     if (now < smallest)
02086       relative = smallest - now;
02087 
02088     else
02089       relative = ACE_Time_Value(0,1); // ASAP
02090 
02091     liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative);
02092 
02093     if (liveliness_timer_id_ == -1) {
02094       ACE_ERROR((LM_ERROR,
02095           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
02096           ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer")));
02097     }
02098   }
02099 }
02100 
02101 void
02102 DataReaderImpl::release_instance(DDS::InstanceHandle_t handle)
02103 {
02104 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02105   if (this->is_exclusive_ownership_) {
02106     this->owner_manager_->remove_writers (handle);
02107   }
02108 #endif
02109 
02110   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
02111   SubscriptionInstance* instance = this->get_handle_instance(handle);
02112 
02113   if (instance == 0) {
02114     ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
02115         "could not find the instance by handle 0x%x\n", handle));
02116     return;
02117   }
02118 
02119   this->purge_data(instance);
02120 
02121   { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02122   this->instances_.erase(handle);
02123   }
02124   this->release_instance_i(handle);
02125   if (this->monitor_) {
02126     this->monitor_->report();
02127   }
02128 }
02129 
02130 
02131 OpenDDS::DCPS::WriterStats::WriterStats(
02132     int amount,
02133     DataCollector<double>::OnFull type) : stats_(amount, type)
02134 {
02135 }
02136 
02137 void OpenDDS::DCPS::WriterStats::add_stat(const ACE_Time_Value& delay)
02138 {
02139   double datum = static_cast<double>(delay.sec());
02140   datum += delay.usec() / 1000000.0;
02141   this->stats_.add(datum);
02142 }
02143 
02144 OpenDDS::DCPS::LatencyStatistics OpenDDS::DCPS::WriterStats::get_stats() const
02145 {
02146   LatencyStatistics value;
02147 
02148   value.publication = GUID_UNKNOWN;
02149   value.n           = this->stats_.n();
02150   value.maximum     = this->stats_.maximum();
02151   value.minimum     = this->stats_.minimum();
02152   value.mean        = this->stats_.mean();
02153   value.variance    = this->stats_.var();
02154 
02155   return value;
02156 }
02157 
02158 void OpenDDS::DCPS::WriterStats::reset_stats()
02159 {
02160   this->stats_.reset();
02161 }
02162 
02163 #ifndef OPENDDS_SAFETY_PROFILE
02164 std::ostream& OpenDDS::DCPS::WriterStats::raw_data(std::ostream& str) const
02165 {
02166   str << std::dec << this->stats_.size()
02167                               << " samples out of " << this->stats_.n() << std::endl;
02168   return str << this->stats_;
02169 }
02170 #endif //OPENDDS_SAFETY_PROFILE
02171 
02172 void
02173 DataReaderImpl::writer_removed(WriterInfo& info)
02174 {
02175   if (DCPS_debug_level >= 5) {
02176     GuidConverter reader_converter(subscription_id_);
02177     GuidConverter writer_converter(info.writer_id_);
02178     ACE_DEBUG((LM_DEBUG,
02179         ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
02180         ACE_TEXT("reader %C from writer %C.\n"),
02181         OPENDDS_STRING(reader_converter).c_str(),
02182         OPENDDS_STRING(writer_converter).c_str()));
02183   }
02184 
02185 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02186   if (this->is_exclusive_ownership_) {
02187     this->owner_manager_->remove_writer (info.writer_id_);
02188     info.clear_owner_evaluated ();
02189   }
02190 #endif
02191 
02192   bool liveliness_changed = false;
02193 
02194   if (info.state_ == WriterInfo::ALIVE) {
02195     -- liveliness_changed_status_.alive_count;
02196     -- liveliness_changed_status_.alive_count_change;
02197     liveliness_changed = true;
02198   }
02199 
02200   if (info.state_ == WriterInfo::DEAD) {
02201     -- liveliness_changed_status_.not_alive_count;
02202     -- liveliness_changed_status_.not_alive_count_change;
02203     liveliness_changed = true;
02204   }
02205 
02206   liveliness_changed_status_.last_publication_handle = info.handle_;
02207   instances_liveliness_update(info, ACE_OS::gettimeofday());
02208 
02209   if (liveliness_changed) {
02210     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02211     this->notify_liveliness_change();
02212   }
02213 }
02214 
02215 void
02216 DataReaderImpl::writer_became_alive(WriterInfo& info,
02217     const ACE_Time_Value& /* when */)
02218 {
02219   if (DCPS_debug_level >= 5) {
02220     GuidConverter reader_converter(subscription_id_);
02221     GuidConverter writer_converter(info.writer_id_);
02222     ACE_DEBUG((LM_DEBUG,
02223         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
02224         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02225         OPENDDS_STRING(reader_converter).c_str(),
02226         OPENDDS_STRING(writer_converter).c_str(),
02227         info.get_state_str().c_str()));
02228   }
02229 
02230   // caller should already have the samples_lock_ !!!
02231 
02232   // NOTE: each instance will change to ALIVE_STATE when they receive a sample
02233 
02234   bool liveliness_changed = false;
02235 
02236   if (info.state_ != WriterInfo::ALIVE) {
02237     liveliness_changed_status_.alive_count++;
02238     liveliness_changed_status_.alive_count_change++;
02239     liveliness_changed = true;
02240   }
02241 
02242   if (info.state_ == WriterInfo::DEAD) {
02243     liveliness_changed_status_.not_alive_count--;
02244     liveliness_changed_status_.not_alive_count_change--;
02245     liveliness_changed = true;
02246   }
02247 
02248   liveliness_changed_status_.last_publication_handle = info.handle_;
02249 
02250   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02251 
02252   if (liveliness_changed_status_.alive_count < 0) {
02253     ACE_ERROR((LM_ERROR,
02254         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02255         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02256         liveliness_changed_status_.alive_count));
02257     return;
02258   }
02259 
02260   if (liveliness_changed_status_.not_alive_count < 0) {
02261     ACE_ERROR((LM_ERROR,
02262         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02263         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"),
02264         liveliness_changed_status_.not_alive_count));
02265     return;
02266   }
02267 
02268   // Change the state to ALIVE since handle_timeout may call writer_became_dead
02269   // which need the current state info.
02270   info.state_ = WriterInfo::ALIVE;
02271 
02272   if (this->monitor_) {
02273     this->monitor_->report();
02274   }
02275 
02276   // Call listener only when there are liveliness status changes.
02277   if (liveliness_changed) {
02278     // Avoid possible deadlock by releasing sample_lock_.
02279     // See comments in <Topic>DataDataReaderImpl::notify_status_condition_no_sample_lock()
02280     // for information about the locks involved.
02281     ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02282     this->notify_liveliness_change();
02283   }
02284 
02285   // this call will start the liveliness timer if it is not already set
02286   liveliness_timer_->check_liveliness();
02287 }
02288 
02289 void
02290 DataReaderImpl::writer_became_dead(WriterInfo & info,
02291     const ACE_Time_Value& when)
02292 {
02293   if (DCPS_debug_level >= 5) {
02294     GuidConverter reader_converter(subscription_id_);
02295     GuidConverter writer_converter(info.writer_id_);
02296     ACE_DEBUG((LM_DEBUG,
02297         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
02298         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02299 
02300         OPENDDS_STRING(reader_converter).c_str(),
02301         OPENDDS_STRING(writer_converter).c_str(),
02302         info.get_state_str().c_str()));
02303   }
02304 
02305 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02306   if (this->is_exclusive_ownership_) {
02307     this->owner_manager_->remove_writer (info.writer_id_);
02308     info.clear_owner_evaluated ();
02309   }
02310 #endif
02311 
02312   // caller should already have the samples_lock_ !!!
02313   bool liveliness_changed = false;
02314 
02315   if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) {
02316     liveliness_changed_status_.not_alive_count++;
02317     liveliness_changed_status_.not_alive_count_change++;
02318     liveliness_changed = true;
02319   }
02320 
02321   if (info.state_ == WriterInfo::ALIVE) {
02322     liveliness_changed_status_.alive_count--;
02323     liveliness_changed_status_.alive_count_change--;
02324     liveliness_changed_status_.not_alive_count++;
02325     liveliness_changed_status_.not_alive_count_change++;
02326     liveliness_changed = true;
02327   }
02328 
02329   liveliness_changed_status_.last_publication_handle = info.handle_;
02330 
02331   //update the state to DEAD.
02332   info.state_ = WriterInfo::DEAD;
02333   info.seen_data_ = false;
02334 
02335   if (this->monitor_) {
02336     this->monitor_->report();
02337   }
02338 
02339   if (liveliness_changed_status_.alive_count < 0) {
02340     ACE_ERROR((LM_ERROR,
02341         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02342         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02343         liveliness_changed_status_.alive_count));
02344     return;
02345   }
02346 
02347   if (liveliness_changed_status_.not_alive_count < 0) {
02348     ACE_ERROR((LM_ERROR,
02349         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02350         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"),
02351         liveliness_changed_status_.not_alive_count));
02352     return;
02353   }
02354 
02355   instances_liveliness_update(info, when);
02356 
02357   // Call listener only when there are liveliness status changes.
02358   if (liveliness_changed) {
02359     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02360     this->notify_liveliness_change();
02361   }
02362 }
02363 
02364 void
02365 DataReaderImpl::instances_liveliness_update(WriterInfo& info,
02366     const ACE_Time_Value& when)
02367 {
02368   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02369   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02370       next = iter; iter != instances_.end(); iter = next) {
02371     ++next;
02372     iter->second->instance_state_.writer_became_dead(
02373         info.writer_id_, liveliness_changed_status_.alive_count, when);
02374   }
02375 }
02376 
02377 void
02378 DataReaderImpl::set_sample_lost_status(
02379     const DDS::SampleLostStatus& status)
02380 {
02381   //!!!caller should have acquired sample_lock_
02382   sample_lost_status_ = status;
02383 }
02384 
02385 void
02386 DataReaderImpl::set_sample_rejected_status(
02387     const DDS::SampleRejectedStatus& status)
02388 {
02389   //!!!caller should have acquired sample_lock_
02390   sample_rejected_status_ = status;
02391 }
02392 
02393 void DataReaderImpl::dispose_unregister(const ReceivedDataSample&,
02394                                         SubscriptionInstance*&)
02395 {
02396   if (DCPS_debug_level > 0) {
02397     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
02398   }
02399 }
02400 
02401 void DataReaderImpl::process_latency(const ReceivedDataSample& sample)
02402 {
02403   StatsMapType::iterator location
02404   = this->statistics_.find(sample.header_.publication_id_);
02405 
02406   if (location != this->statistics_.end()) {
02407     // This starts as the current time.
02408     ACE_Time_Value  latency = ACE_OS::gettimeofday();
02409 
02410     // The time interval starts at the send end.
02411     DDS::Duration_t then = {
02412         sample.header_.source_timestamp_sec_,
02413         sample.header_.source_timestamp_nanosec_
02414     };
02415 
02416     // latency delay in ACE_Time_Value format.
02417     latency -= duration_to_time_value(then);
02418 
02419     if (this->statistics_enabled()) {
02420       location->second.add_stat(latency);
02421     }
02422 
02423     if (DCPS_debug_level > 9) {
02424       ACE_DEBUG((LM_DEBUG,
02425           ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02426           ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"),
02427           latency.sec(),
02428           latency.msec()));
02429     }
02430 
02431     // Check latency against the budget.
02432     if (time_value_to_duration(latency)
02433         > this->qos_.latency_budget.duration) {
02434       this->notify_latency(sample.header_.publication_id_);
02435     }
02436 
02437   } else if (DCPS_debug_level > 0) {
02438     /// NB: This message is generated contemporaneously with a similar
02439     ///     message from writer_activity().  That message is not marked
02440     ///     as an error, so we follow that lead and leave this as an
02441     ///     informational message, guarded by debug level.  This seems
02442     ///     to be due to late samples (samples delivered after an
02443     ///     association has been torn down).  We may want to promote this
02444     ///     to a warning if other conditions causing this symptom are
02445     ///     discovered.
02446     GuidConverter reader_converter(subscription_id_);
02447     GuidConverter writer_converter(sample.header_.publication_id_);
02448     ACE_DEBUG((LM_DEBUG,
02449         ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02450         ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
02451         OPENDDS_STRING(reader_converter).c_str(),
02452         OPENDDS_STRING(writer_converter).c_str()));
02453   }
02454 }
02455 
02456 void DataReaderImpl::notify_latency(PublicationId writer)
02457 {
02458   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02459   // is given to this DataReader then narrow() fails.
02460   DataReaderListener_var listener
02461   = DataReaderListener::_narrow(this->listener_.in());
02462 
02463   if (!CORBA::is_nil(listener.in())) {
02464     WriterIdSeq writerIds;
02465     writerIds.length(1);
02466     writerIds[ 0] = writer;
02467 
02468     DDS::InstanceHandleSeq handles;
02469     this->lookup_instance_handles(writerIds, handles);
02470 
02471     if (handles.length() >= 1) {
02472       this->budget_exceeded_status_.last_instance_handle = handles[ 0];
02473 
02474     } else {
02475       this->budget_exceeded_status_.last_instance_handle = -1;
02476     }
02477 
02478     ++this->budget_exceeded_status_.total_count;
02479     ++this->budget_exceeded_status_.total_count_change;
02480 
02481     listener->on_budget_exceeded(
02482         this->dr_local_objref_.in(),
02483         this->budget_exceeded_status_);
02484 
02485     this->budget_exceeded_status_.total_count_change = 0;
02486   }
02487 }
02488 
02489 #ifndef OPENDDS_SAFETY_PROFILE
02490 void
02491 DataReaderImpl::get_latency_stats(
02492     OpenDDS::DCPS::LatencyStatisticsSeq & stats)
02493 {
02494   stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
02495   int index = 0;
02496 
02497   for (StatsMapType::const_iterator current = this->statistics_.begin();
02498       current != this->statistics_.end();
02499       ++current, ++index) {
02500     stats[ index] = current->second.get_stats();
02501     stats[ index].publication = current->first;
02502   }
02503 }
02504 #endif
02505 
02506 void
02507 DataReaderImpl::reset_latency_stats()
02508 {
02509   for (StatsMapType::iterator current = this->statistics_.begin();
02510       current != this->statistics_.end();
02511       ++current) {
02512     current->second.reset_stats();
02513   }
02514 }
02515 
02516 CORBA::Boolean
02517 DataReaderImpl::statistics_enabled()
02518 {
02519   return this->statistics_enabled_;
02520 }
02521 
02522 void
02523 DataReaderImpl::statistics_enabled(
02524     CORBA::Boolean statistics_enabled)
02525 {
02526   this->statistics_enabled_ = statistics_enabled;
02527 }
02528 
02529 void
02530 DataReaderImpl::prepare_to_delete()
02531 {
02532   this->set_deleted(true);
02533   this->stop_associating();
02534   this->send_final_acks();
02535 }
02536 
02537 SubscriptionInstance*
02538 DataReaderImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02539 {
02540   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, 0);
02541 
02542   SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
02543   if (iter == instances_.end()) {
02544     ACE_DEBUG((LM_WARNING,
02545         ACE_TEXT("(%P|%t) WARNING: ")
02546         ACE_TEXT("DataReaderImpl::get_handle_instance: ")
02547         ACE_TEXT("lookup for 0x%x failed\n"),
02548         handle));
02549     return 0;
02550   } // if (0 != instances_.find(handle, instance))
02551 
02552   return iter->second;
02553 }
02554 
02555 DDS::InstanceHandle_t
02556 DataReaderImpl::get_next_handle(const DDS::BuiltinTopicKey_t& key)
02557 {
02558   if (is_bit()) {
02559     Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_);
02560     CORBA::String_var topic = get_topic_name();
02561     RepoId id = disc->bit_key_to_repo_id(participant_servant_, topic, key);
02562     return participant_servant_->id_to_handle(id);
02563 
02564   } else {
02565     return participant_servant_->id_to_handle(GUID_UNKNOWN);
02566   }
02567 }
02568 
02569 void
02570 DataReaderImpl::notify_subscription_disconnected(const WriterIdSeq& pubids)
02571 {
02572   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
02573 
02574   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02575   // is given to this DataReader then narrow() fails.
02576   DataReaderListener_var the_listener
02577   = DataReaderListener::_narrow(this->listener_.in());
02578 
02579   if (!CORBA::is_nil(the_listener.in())) {
02580     SubscriptionLostStatus status;
02581 
02582     // Since this callback may come after remove_association which removes
02583     // the writer from id_to_handle map, we can ignore this error.
02584     this->lookup_instance_handles(pubids, status.publication_handles);
02585     the_listener->on_subscription_disconnected(this->dr_local_objref_.in(),
02586         status);
02587   }
02588 }
02589 
02590 void
02591 DataReaderImpl::notify_subscription_reconnected(const WriterIdSeq& pubids)
02592 {
02593   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
02594 
02595   if (!this->is_bit_) {
02596     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02597     // is given to this DataReader then narrow() fails.
02598     DataReaderListener_var the_listener
02599     = DataReaderListener::_narrow(this->listener_.in());
02600 
02601     if (!CORBA::is_nil(the_listener.in())) {
02602       SubscriptionLostStatus status;
02603 
02604       // If it's reconnected then the reader should be in id_to_handle map otherwise
02605       // log with an error.
02606       if (this->lookup_instance_handles(pubids, status.publication_handles) == false) {
02607         ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::notify_subscription_reconnected: "
02608             "lookup_instance_handles failed.\n"));
02609       }
02610 
02611       the_listener->on_subscription_reconnected(this->dr_local_objref_.in(),
02612           status);
02613     }
02614   }
02615 }
02616 
02617 void
02618 DataReaderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq& handles)
02619 {
02620   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02621 
02622   if (!this->is_bit_) {
02623     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02624     // is given to this DataReader then narrow() fails.
02625     DataReaderListener_var the_listener
02626     = DataReaderListener::_narrow(this->listener_.in());
02627 
02628     if (!CORBA::is_nil(the_listener.in())) {
02629       SubscriptionLostStatus status;
02630 
02631       CORBA::ULong len = handles.length();
02632       status.publication_handles.length(len);
02633 
02634       for (CORBA::ULong i = 0; i < len; ++ i) {
02635         status.publication_handles[i] = handles[i];
02636       }
02637 
02638       the_listener->on_subscription_lost(this->dr_local_objref_.in(),
02639           status);
02640     }
02641   }
02642 }
02643 
02644 void
02645 DataReaderImpl::notify_subscription_lost(const WriterIdSeq& pubids)
02646 {
02647   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02648 
02649   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02650   // is given to this DataReader then narrow() fails.
02651   DataReaderListener_var the_listener
02652   = DataReaderListener::_narrow(this->listener_.in());
02653 
02654   if (!CORBA::is_nil(the_listener.in())) {
02655     SubscriptionLostStatus status;
02656 
02657     // Since this callback may come after remove_association which removes
02658     // the writer from id_to_handle map, we can ignore this error.
02659     this->lookup_instance_handles(pubids, status.publication_handles);
02660     the_listener->on_subscription_lost(this->dr_local_objref_.in(),
02661         status);
02662   }
02663 }
02664 
02665 void
02666 DataReaderImpl::notify_connection_deleted(const RepoId& peerId)
02667 {
02668   DBG_ENTRY_LVL("DataReaderImpl","notify_connection_deleted",6);
02669   on_notification_of_connection_deletion(peerId);
02670   // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02671   // is given to this DataWriter then narrow() fails.
02672   DataReaderListener_var the_listener = DataReaderListener::_narrow(this->listener_.in());
02673 
02674   if (!CORBA::is_nil(the_listener.in()))
02675     the_listener->on_connection_deleted(this->dr_local_objref_.in());
02676 }
02677 
02678 bool
02679 DataReaderImpl::lookup_instance_handles(const WriterIdSeq& ids,
02680     DDS::InstanceHandleSeq & hdls)
02681 {
02682   if (DCPS_debug_level > 9) {
02683     CORBA::ULong const size = ids.length();
02684     const char* separator = "";
02685     OPENDDS_STRING guids;
02686 
02687     for (unsigned long i = 0; i < size; ++i) {
02688       guids += separator;
02689       guids += OPENDDS_STRING(GuidConverter(ids[i]));
02690       separator = ", ";
02691     }
02692 
02693     ACE_DEBUG((LM_DEBUG,
02694         ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
02695         ACE_TEXT("searching for handles for writer Ids: %C.\n"),
02696         guids.c_str()));
02697   }
02698 
02699   CORBA::ULong const num_wrts = ids.length();
02700   hdls.length(num_wrts);
02701 
02702   for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02703     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02704   }
02705 
02706   return true;
02707 }
02708 
02709 bool
02710 DataReaderImpl::filter_sample(const DataSampleHeader& header)
02711 {
02712   ACE_Time_Value now(ACE_OS::gettimeofday());
02713 
02714   // Expire historic data if QoS indicates VOLATILE.
02715   if (!always_get_history_ && header.historic_sample_
02716       && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
02717     if (DCPS_debug_level >= 8) {
02718       ACE_DEBUG((LM_DEBUG,
02719           ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
02720           ACE_TEXT("Discarded historic data.\n")));
02721     }
02722 
02723     return true;  // Data filtered.
02724   }
02725 
02726   // The LIFESPAN_DURATION_FLAG is set when sample data is sent
02727   // with a non-default LIFESPAN duration value.
02728   if (header.lifespan_duration_) {
02729     // Finite lifespan.  Check if data has expired.
02730 
02731     DDS::Time_t const tmp = {
02732         header.source_timestamp_sec_ + header.lifespan_duration_sec_,
02733         header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
02734     };
02735 
02736     // We assume that the publisher host's clock and subcriber host's
02737     // clock are synchronized (allowed by the spec).
02738     ACE_Time_Value const expiration_time(
02739         OpenDDS::DCPS::time_to_time_value(tmp));
02740 
02741     if (now >= expiration_time) {
02742       if (DCPS_debug_level >= 8) {
02743         ACE_Time_Value const diff(now - expiration_time);
02744         ACE_DEBUG((LM_DEBUG,
02745             ACE_TEXT("OpenDDS (%P|%t) Received data ")
02746             ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
02747             diff.sec(),
02748             diff.usec()));
02749       }
02750 
02751       return true;  // Data filtered.
02752     }
02753   }
02754 
02755   return false;
02756 }
02757 
02758 bool
02759 DataReaderImpl::filter_instance(SubscriptionInstance* instance,
02760     const PublicationId& pubid)
02761 {
02762 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02763   if (this->is_exclusive_ownership_) {
02764 
02765     WriterMapType::iterator iter = writers_.find(pubid);
02766 
02767     if (iter == writers_.end()) {
02768       if (DCPS_debug_level > 4) {
02769         // This may not be an error since it could happen that the sample
02770         // is delivered to the datareader after the write is dis-associated
02771         // with this datareader.
02772         GuidConverter reader_converter(subscription_id_);
02773         GuidConverter writer_converter(pubid);
02774         ACE_DEBUG((LM_DEBUG,
02775             ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02776             ACE_TEXT("reader %C is not associated with writer %C.\n"),
02777             OPENDDS_STRING(reader_converter).c_str(),
02778             OPENDDS_STRING(writer_converter).c_str()));
02779       }
02780       return true;
02781     }
02782 
02783 
02784     // Evaulate the owner of the instance if not selected and filter
02785     // current message if it's not from owner writer.
02786     if ( instance->instance_state_.get_owner () == GUID_UNKNOWN
02787         || ! iter->second->is_owner_evaluated (instance->instance_handle_)) {
02788       bool is_owner = this->owner_manager_->select_owner (
02789           instance->instance_handle_,
02790           iter->second->writer_id_,
02791           iter->second->writer_qos_.ownership_strength.value,
02792           &instance->instance_state_);
02793       iter->second->set_owner_evaluated (instance->instance_handle_, true);
02794 
02795       if (! is_owner) {
02796         if (DCPS_debug_level >= 1) {
02797           GuidConverter reader_converter(subscription_id_);
02798           GuidConverter writer_converter(pubid);
02799           GuidConverter owner_converter (instance->instance_state_.get_owner ());
02800           ACE_DEBUG((LM_DEBUG,
02801               ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02802               ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
02803               OPENDDS_STRING(reader_converter).c_str(),
02804               OPENDDS_STRING(writer_converter).c_str(),
02805               OPENDDS_STRING(owner_converter).c_str()));
02806         }
02807         return true;
02808       }
02809     }
02810     else if (! (instance->instance_state_.get_owner () == pubid)) {
02811       if (DCPS_debug_level >= 1) {
02812         GuidConverter reader_converter(subscription_id_);
02813         GuidConverter writer_converter(pubid);
02814         GuidConverter owner_converter (instance->instance_state_.get_owner ());
02815         ACE_DEBUG((LM_DEBUG,
02816             ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02817             ACE_TEXT("reader %C writer %C is not owner %C\n"),
02818             OPENDDS_STRING(reader_converter).c_str(),
02819             OPENDDS_STRING(writer_converter).c_str(),
02820             OPENDDS_STRING(owner_converter).c_str()));
02821       }
02822       return true;
02823     }
02824   }
02825 #else
02826   ACE_UNUSED_ARG(pubid);
02827 #endif
02828 
02829   ACE_Time_Value now(ACE_OS::gettimeofday());
02830 
02831   // TIME_BASED_FILTER processing; expire data samples
02832   // if minimum separation is not met for instance.
02833   const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02834 
02835   if (this->qos_.time_based_filter.minimum_separation > zero) {
02836     DDS::Duration_t separation =
02837         time_value_to_duration(now - instance->last_accepted_);
02838 
02839     if (separation < this->qos_.time_based_filter.minimum_separation) {
02840       return true;  // Data filtered.
02841     }
02842   }
02843 
02844   instance->last_accepted_ = now;
02845 
02846   return false;
02847 }
02848 
02849 bool DataReaderImpl::is_bit() const
02850 {
02851   return this->is_bit_;
02852 }
02853 
02854 int
02855 DataReaderImpl::num_zero_copies()
02856 {
02857   int loans = 0;
02858   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02859       guard,
02860       this->sample_lock_,
02861       1 /* assume we have loans */);
02862   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,1);
02863 
02864   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
02865       iter != instances_.end();
02866       ++iter) {
02867     SubscriptionInstance *ptr = iter->second;
02868 
02869     for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
02870         item != 0; item = item->next_data_sample_) {
02871       loans += item->zero_copy_cnt_.value();
02872     }
02873   }
02874 
02875   return loans;
02876 }
02877 
02878 void DataReaderImpl::notify_liveliness_change()
02879 {
02880   // N.B. writers_lock_ should already be acquired when
02881   //      this method is called.
02882 
02883   DDS::DataReaderListener_var listener
02884   = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
02885 
02886   if (!CORBA::is_nil(listener.in())) {
02887     listener->on_liveliness_changed(dr_local_objref_.in(),
02888         liveliness_changed_status_);
02889 
02890     liveliness_changed_status_.alive_count_change = 0;
02891     liveliness_changed_status_.not_alive_count_change = 0;
02892   }
02893   notify_status_condition();
02894 
02895   if (DCPS_debug_level > 9) {
02896     OPENDDS_STRING output_str;
02897     output_str += "subscription ";
02898     output_str += OPENDDS_STRING(GuidConverter(subscription_id_));
02899     output_str += ", listener at: 0x";
02900     output_str += to_dds_string(this->listener_.in ());
02901 
02902     for (WriterMapType::iterator current = this->writers_.begin();
02903         current != this->writers_.end();
02904         ++current) {
02905       RepoId id = current->first;
02906       output_str += "\n\tNOTIFY: writer[ ";
02907       output_str += OPENDDS_STRING(GuidConverter(id));
02908       output_str += "] == ";
02909       output_str += current->second->get_state_str();
02910     }
02911 
02912     output_str + "\n";
02913     ACE_DEBUG((LM_DEBUG,
02914         ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
02915         ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
02916         ACE_TEXT("\tNOTIFY: %C\n"),
02917         listener.in (),
02918         listener_mask_,
02919         output_str.c_str()));
02920   }
02921 }
02922 
02923 void DataReaderImpl::post_read_or_take()
02924 {
02925   set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02926   get_subscriber_servant()->set_status_changed_flag(
02927       DDS::DATA_ON_READERS_STATUS, false);
02928 }
02929 
02930 void DataReaderImpl::reschedule_deadline()
02931 {
02932   if (this->watchdog_) {
02933     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02934     for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02935         iter != this->instances_.end();
02936         ++iter) {
02937       if (iter->second->deadline_timer_id_ != -1) {
02938         if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
02939           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"),
02940               ACE_TEXT("reset_timer_interval")));
02941         }
02942       }
02943     }
02944   }
02945 }
02946 
02947 ACE_Reactor_Timer_Interface*
02948 DataReaderImpl::get_reactor()
02949 {
02950   return this->reactor_;
02951 }
02952 
02953 OpenDDS::DCPS::RepoId
02954 DataReaderImpl::get_topic_id()
02955 {
02956   return this->topic_servant_->get_id();
02957 }
02958 
02959 OpenDDS::DCPS::RepoId
02960 DataReaderImpl::get_dp_id()
02961 {
02962   return this->participant_servant_->get_id();
02963 }
02964 
02965 void
02966 DataReaderImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02967 {
02968   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02969   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02970 
02971   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02972       end = instances_.end(); iter != end; ++iter) {
02973     instance_handles.push_back(iter->first);
02974   }
02975 }
02976 
02977 void
02978 DataReaderImpl::get_writer_states(WriterStatePairVec& writer_states)
02979 {
02980   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02981       read_guard,
02982       this->writers_lock_);
02983   for (WriterMapType::iterator iter = writers_.begin();
02984       iter != writers_.end();
02985       ++iter) {
02986     writer_states.push_back(WriterStatePair(iter->first,
02987         iter->second->get_state()));
02988   }
02989 }
02990 
02991 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02992 void
02993 DataReaderImpl::update_ownership_strength (const PublicationId& pub_id,
02994     const CORBA::Long& ownership_strength)
02995 {
02996   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02997       read_guard,
02998       this->writers_lock_);
02999   for (WriterMapType::iterator iter = writers_.begin();
03000       iter != writers_.end();
03001       ++iter) {
03002     if (iter->second->writer_id_ == pub_id) {
03003       if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) {
03004         if (DCPS_debug_level >= 1) {
03005           GuidConverter reader_converter(this->subscription_id_);
03006           GuidConverter writer_converter(pub_id);
03007           ACE_DEBUG((LM_DEBUG,
03008               ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
03009               ACE_TEXT("local %C update remote %C strength from %d to %d \n"),
03010               OPENDDS_STRING(reader_converter).c_str(),
03011               OPENDDS_STRING(writer_converter).c_str(),
03012               iter->second->writer_qos_.ownership_strength, ownership_strength));
03013         }
03014         iter->second->writer_qos_.ownership_strength.value = ownership_strength;
03015         iter->second->clear_owner_evaluated ();
03016       }
03017       break;
03018     }
03019   }
03020 }
03021 #endif
03022 
03023 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03024 bool DataReaderImpl::verify_coherent_changes_completion (WriterInfo* writer)
03025 {
03026   if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS
03027       || ! this->subqos_.presentation.coherent_access) {
03028     this->accept_coherent (writer->writer_id_, writer->publisher_id_);
03029     this->coherent_changes_completed (this);
03030     return true;
03031   }
03032 
03033   // verify current coherent changes from single writer
03034   Coherent_State state = writer->coherent_change_received();
03035   if (writer->group_coherent_) { // GROUP coherent
03036     if (state != NOT_COMPLETED_YET) {
03037       // verify if all readers received complete coherent changes in a group.
03038       this->subscriber_servant_->coherent_change_received (
03039           writer->publisher_id_, this, state);
03040     }
03041   }
03042   else {  // TOPIC coherent
03043     if (state == COMPLETED) {
03044       this->accept_coherent (writer->writer_id_, writer->publisher_id_);
03045     }
03046     else if (state == REJECTED) {
03047       this->reject_coherent (writer->writer_id_, writer->publisher_id_);
03048     }
03049     else {// NOT_COMPLETED
03050       return false;
03051     }
03052 
03053     // decision made: either COMPLETED or REJECTED
03054     writer->reset_coherent_info ();
03055   }
03056 
03057   return state == COMPLETED;
03058 }
03059 
03060 
03061 void DataReaderImpl::accept_coherent (PublicationId& writer_id,
03062     RepoId& publisher_id)
03063 {
03064   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
03065     GuidConverter reader (this->subscription_id_);
03066     GuidConverter writer (writer_id);
03067     GuidConverter publisher (publisher_id);
03068     ACE_DEBUG((LM_DEBUG,
03069         ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
03070         ACE_TEXT(" reader %C writer %C publisher %C \n"),
03071         OPENDDS_STRING(reader).c_str(),
03072         OPENDDS_STRING(writer).c_str(),
03073         OPENDDS_STRING(publisher).c_str()));
03074   }
03075 
03076   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03077   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03078 
03079   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
03080       iter != this->instances_.end(); ++iter) {
03081     iter->second->rcvd_strategy_->accept_coherent(
03082         writer_id, publisher_id);
03083   }
03084 }
03085 
03086 
03087 void DataReaderImpl::reject_coherent (PublicationId& writer_id,
03088     RepoId& publisher_id)
03089 {
03090   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
03091     GuidConverter reader (this->subscription_id_);
03092     GuidConverter writer (writer_id);
03093     GuidConverter publisher (publisher_id);
03094     ACE_DEBUG((LM_DEBUG,
03095         ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
03096         ACE_TEXT(" reader %C writer %C publisher %C \n"),
03097         OPENDDS_STRING(reader).c_str(),
03098         OPENDDS_STRING(writer).c_str(),
03099         OPENDDS_STRING(publisher).c_str()));
03100   }
03101 
03102   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03103   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03104 
03105   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
03106       iter != this->instances_.end(); ++iter) {
03107     iter->second->rcvd_strategy_->reject_coherent(
03108         writer_id, publisher_id);
03109   }
03110   this->reset_coherent_info (writer_id, publisher_id);
03111 }
03112 
03113 
03114 void DataReaderImpl::reset_coherent_info (const PublicationId& writer_id,
03115     const RepoId& publisher_id)
03116 {
03117   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03118 
03119   WriterMapType::iterator itEnd = this->writers_.end();
03120   for (WriterMapType::iterator it = this->writers_.begin();
03121       it != itEnd; ++it) {
03122     if (it->second->writer_id_ == writer_id
03123         && it->second->publisher_id_ == publisher_id) {
03124       it->second->reset_coherent_info();
03125     }
03126   }
03127 }
03128 
03129 
03130 void
03131 DataReaderImpl::coherent_change_received (RepoId publisher_id, Coherent_State& result)
03132 {
03133   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03134 
03135   result = COMPLETED;
03136   for (WriterMapType::iterator iter = writers_.begin();
03137       iter != writers_.end();
03138       ++iter) {
03139 
03140     if (iter->second->publisher_id_ == publisher_id) {
03141       Coherent_State state = iter->second->coherent_change_received();
03142       if (state == NOT_COMPLETED_YET) {
03143         result = state;
03144         break;
03145       }
03146       else if (state == REJECTED) {
03147         result = REJECTED;
03148       }
03149     }
03150   }
03151 }
03152 
03153 
03154 void
03155 DataReaderImpl::coherent_changes_completed (DataReaderImpl* reader)
03156 {
03157   this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
03158   this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
03159 
03160   ::DDS::SubscriberListener_var sub_listener =
03161       this->subscriber_servant_->listener_for(::DDS::DATA_ON_READERS_STATUS);
03162   if (!CORBA::is_nil(sub_listener.in()))
03163   {
03164     if (reader == this) {
03165       // Release the sample_lock before listener callback.
03166       ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03167       sub_listener->on_data_on_readers(this->subscriber_servant_);
03168     }
03169 
03170     this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03171     this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03172   }
03173   else
03174   {
03175     this->subscriber_servant_->notify_status_condition();
03176 
03177     ::DDS::DataReaderListener_var listener =
03178         this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
03179 
03180     if (!CORBA::is_nil(listener.in()))
03181     {
03182       if (reader == this) {
03183         // Release the sample_lock before listener callback.
03184         ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03185         listener->on_data_available(dr_local_objref_.in ());
03186       }
03187       else {
03188         listener->on_data_available(dr_local_objref_.in ());
03189       }
03190 
03191       set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03192       this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03193     }
03194     else
03195     {
03196       this->notify_status_condition();
03197     }
03198   }
03199 }
03200 
03201 
03202 void DataReaderImpl::begin_access()
03203 {
03204   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03205   this->coherent_ = true;
03206 }
03207 
03208 
03209 void DataReaderImpl::end_access()
03210 {
03211   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03212   this->coherent_ = false;
03213   this->group_coherent_ordered_data_.reset();
03214   this->post_read_or_take();
03215 }
03216 
03217 
03218 void DataReaderImpl::get_ordered_data (GroupRakeData& data,
03219     DDS::SampleStateMask sample_states,
03220     DDS::ViewStateMask view_states,
03221     DDS::InstanceStateMask instance_states)
03222 {
03223   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03224   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03225 
03226   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
03227       iter != instances_.end(); ++iter) {
03228     SubscriptionInstance *ptr = iter->second;
03229     if ((ptr->instance_state_.view_state() & view_states) &&
03230         (ptr->instance_state_.instance_state() & instance_states)) {
03231       size_t i(0);
03232       for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
03233           item != 0; item = item->next_data_sample_) {
03234         if ((item->sample_state_ & sample_states) && !item->coherent_change_) {
03235           data.insert_sample(item, ptr, ++i);
03236           this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i);
03237         }
03238       }
03239     }
03240   }
03241 }
03242 
03243 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
03244 
03245 void
03246 DataReaderImpl::set_subscriber_qos(
03247     const DDS::SubscriberQos & qos)
03248 {
03249   this->subqos_ = qos;
03250 }
03251 
03252 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
03253 void
03254 DataReaderImpl::enable_filtering(ContentFilteredTopicImpl* cft)
03255 {
03256   cft->update_reader_count(true);
03257   cft->add_reader(*this);
03258   content_filtered_topic_ = DDS::ContentFilteredTopic::_duplicate(cft);
03259 }
03260 
03261 DDS::ContentFilteredTopic_ptr
03262 DataReaderImpl::get_cf_topic() const
03263 {
03264   return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_);
03265 }
03266 #endif
03267 
03268 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
03269 
03270 void
03271 DataReaderImpl::update_subscription_params(const DDS::StringSeq& params) const
03272 {
03273   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
03274   disco->update_subscription_params(participant_servant_->get_domain_id(),
03275       participant_servant_->get_id(),
03276       subscription_id_,
03277       params);
03278 }
03279 #endif
03280 
03281 void
03282 DataReaderImpl::reset_ownership (::DDS::InstanceHandle_t instance)
03283 {
03284   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03285   for (WriterMapType::iterator iter = writers_.begin();
03286       iter != writers_.end();
03287       ++iter) {
03288     iter->second->set_owner_evaluated(instance, false);
03289   }
03290 }
03291 
03292 void
03293 DataReaderImpl::resume_sample_processing(const PublicationId& pub_id)
03294 {
03295   OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
03296   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03297   WriterMapType::iterator where = writers_.find(pub_id);
03298   if (writers_.end() != where) {
03299     WriterInfo& info = *where->second;
03300     // Stop filtering these
03301     if (info.waiting_for_end_historic_samples_) {
03302       end_historic_sweeper_->cancel_timer(where->second);
03303       if (!info.historic_samples_.empty()) {
03304         info.last_historic_seq_ = info.historic_samples_.rbegin()->first;
03305       }
03306       to_deliver.swap(info.historic_samples_);
03307       write_guard.release();
03308       deliver_historic(to_deliver);
03309     }
03310   }
03311 }
03312 
03313 bool DataReaderImpl::check_historic(const ReceivedDataSample& sample)
03314 {
03315   ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
03316   WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
03317   if (iter != writers_.end()) {
03318     const SequenceNumber& seq = sample.header_.sequence_;
03319     if (iter->second->waiting_for_end_historic_samples_) {
03320       iter->second->historic_samples_.insert(std::make_pair(seq, sample));
03321       return false;
03322     }
03323     if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
03324         && !sample.header_.historic_sample_
03325         && seq <= iter->second->last_historic_seq_) {
03326       // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
03327       return false;
03328     }
03329   }
03330   return true;
03331 }
03332 
03333 void DataReaderImpl::deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples)
03334 {
03335   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
03336   const iter_t end = samples.end();
03337   for (iter_t iter = samples.begin(); iter != end; ++iter) {
03338     iter->second.header_.historic_sample_ = true;
03339     data_received(iter->second);
03340   }
03341 }
03342 
03343 void
03344 DataReaderImpl::add_link(const DataLink_rch& link, const RepoId& peer)
03345 {
03346   if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
03347 
03348     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
03349 
03350     WriterMapType::iterator it = writers_.find(peer);
03351     if (it != writers_.end()) {
03352       // Schedule timer if necessary
03353       //   - only need to check reader qos - we know the writer must be >= reader
03354       end_historic_sweeper_->schedule_timer(it->second);
03355     }
03356   }
03357   TransportClient::add_link(link, peer);
03358   TransportImpl_rch impl = link->impl();
03359   OPENDDS_STRING type = impl->transport_type();
03360 
03361   if (type == "rtps_udp" || type == "multicast") {
03362     resume_sample_processing(peer);
03363   }
03364 }
03365 
03366 void
03367 DataReaderImpl::register_for_writer(const RepoId& participant,
03368                                     const RepoId& readerid,
03369                                     const RepoId& writerid,
03370                                     const TransportLocatorSeq& locators,
03371                                     DiscoveryListener* listener)
03372 {
03373   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
03374 }
03375 
03376 void
03377 DataReaderImpl::unregister_for_writer(const RepoId& participant,
03378                                       const RepoId& readerid,
03379                                       const RepoId& writerid)
03380 {
03381   TransportClient::unregister_for_writer(participant, readerid, writerid);
03382 }
03383 
03384 EndHistoricSamplesMissedSweeper::EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
03385                                                                  ACE_thread_t owner,
03386                                                                  DataReaderImpl* reader)
03387   : ReactorInterceptor (reactor, owner)
03388   , reader_(reader)
03389 { }
03390 
03391 EndHistoricSamplesMissedSweeper::~EndHistoricSamplesMissedSweeper()
03392 { }
03393 
03394 void EndHistoricSamplesMissedSweeper::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03395 {
03396   info->waiting_for_end_historic_samples_ = true;
03397   ScheduleCommand c(this, info);
03398   execute_or_enqueue(c);
03399 }
03400 
03401 void EndHistoricSamplesMissedSweeper::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03402 {
03403   info->waiting_for_end_historic_samples_ = false;
03404   CancelCommand c(this, info);
03405   execute_or_enqueue(c);
03406 }
03407 
03408 int EndHistoricSamplesMissedSweeper::handle_timeout(
03409     const ACE_Time_Value& ,
03410     const void* arg)
03411 {
03412   PublicationId pub_id = reinterpret_cast<const WriterInfo*>(arg)->writer_id_;
03413 
03414   if (DCPS_debug_level >= 1) {
03415     GuidConverter sub_repo(reader_->get_repo_id());
03416     GuidConverter pub_repo(pub_id);
03417     ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
03418                OPENDDS_STRING(sub_repo).c_str(),
03419                OPENDDS_STRING(pub_repo).c_str()));
03420   }
03421 
03422   reader_->resume_sample_processing(pub_id);
03423   return 0;
03424 }
03425 
03426 void EndHistoricSamplesMissedSweeper::ScheduleCommand::execute()
03427 {
03428   static const ACE_Time_Value ten_seconds(10);
03429 
03430   //Pass pointer to writer info for timer to use, must decrease ref count when canceling timer
03431   const void* arg = reinterpret_cast<const void*>(info_.in());
03432   info_->_add_ref();
03433 
03434   info_->historic_samples_timer_ = sweeper_->reactor()->schedule_timer(sweeper_,
03435                                                                        arg,
03436                                                                        ten_seconds);
03437   if (DCPS_debug_level) {
03438     ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", info_->historic_samples_timer_));
03439   }
03440 }
03441 
03442 void EndHistoricSamplesMissedSweeper::CancelCommand::execute()
03443 {
03444   if (info_->historic_samples_timer_ != WriterInfo::NO_TIMER) {
03445     sweeper_->reactor()->cancel_timer(info_->historic_samples_timer_);
03446     if (DCPS_debug_level) {
03447       ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", info_->historic_samples_timer_));
03448     }
03449     info_->historic_samples_timer_ = WriterInfo::NO_TIMER;
03450     info_->_remove_ref();
03451   }
03452 }
03453 
03454 } // namespace DCPS
03455 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7