DataReaderImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataReaderImpl.h"
00010 #include "tao/ORB_Core.h"
00011 #include "SubscriptionInstance.h"
00012 #include "ReceivedDataElementList.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "FeatureDisabledQosCheck.h"
00017 #include "GuidConverter.h"
00018 #include "TopicImpl.h"
00019 #include "Serializer.h"
00020 #include "SubscriberImpl.h"
00021 #include "Transient_Kludge.h"
00022 #include "Util.h"
00023 #include "RequestedDeadlineWatchdog.h"
00024 #include "QueryConditionImpl.h"
00025 #include "ReadConditionImpl.h"
00026 #include "MonitorFactory.h"
00027 #include "dds/DCPS/transport/framework/EntryExit.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029 #include "dds/DdsDcpsCoreC.h"
00030 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00031 #include "dds/DCPS/SafetyProfileStreams.h"
00032 #if !defined (DDS_HAS_MINIMUM_BIT)
00033 #include "BuiltInTopicUtils.h"
00034 #include "dds/DdsDcpsCoreTypeSupportC.h"
00035 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00036 
00037 #include "ace/Reactor.h"
00038 #include "ace/Auto_Ptr.h"
00039 #include "ace/OS_NS_sys_time.h"
00040 
00041 #include <cstdio>
00042 #include <stdexcept>
00043 
00044 #if !defined (__ACE_INLINE__)
00045 # include "DataReaderImpl.inl"
00046 #endif /* !__ACE_INLINE__ */
00047 
00048 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00049 
00050 namespace OpenDDS {
00051 namespace DCPS {
00052 
00053 DataReaderImpl::DataReaderImpl()
00054 : qos_(TheServiceParticipant->initial_DataReaderQos()),
00055   reverse_sample_lock_(sample_lock_),
00056   topic_servant_(0),
00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00058   is_exclusive_ownership_ (false),
00059 #endif
00060     coherent_(false),
00061     subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00062     topic_desc_(0),
00063     listener_mask_(DEFAULT_STATUS_MASK),
00064     domain_id_(0),
00065     end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00066     remove_association_sweeper_(make_rch<RemoveAssociationSweeper<DataReaderImpl> >(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00067     n_chunks_(TheServiceParticipant->n_chunks()),
00068     reverse_pub_handle_lock_(publication_handle_lock_),
00069     reactor_(0),
00070     liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00071     last_deadline_missed_total_count_(0),
00072     is_bit_(false),
00073     always_get_history_(false),
00074     statistics_enabled_(false),
00075     raw_latency_buffer_size_(0),
00076     raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00077     monitor_(0),
00078     periodic_monitor_(0),
00079     transport_disabled_(false)
00080 {
00081   reactor_ = TheServiceParticipant->timer();
00082 
00083   liveliness_changed_status_.alive_count = 0;
00084   liveliness_changed_status_.not_alive_count = 0;
00085   liveliness_changed_status_.alive_count_change = 0;
00086   liveliness_changed_status_.not_alive_count_change = 0;
00087   liveliness_changed_status_.last_publication_handle =
00088       DDS::HANDLE_NIL;
00089 
00090   requested_deadline_missed_status_.total_count = 0;
00091   requested_deadline_missed_status_.total_count_change = 0;
00092   requested_deadline_missed_status_.last_instance_handle =
00093       DDS::HANDLE_NIL;
00094 
00095   requested_incompatible_qos_status_.total_count = 0;
00096   requested_incompatible_qos_status_.total_count_change = 0;
00097   requested_incompatible_qos_status_.last_policy_id = 0;
00098   requested_incompatible_qos_status_.policies.length(0);
00099 
00100   subscription_match_status_.total_count = 0;
00101   subscription_match_status_.total_count_change = 0;
00102   subscription_match_status_.current_count = 0;
00103   subscription_match_status_.current_count_change = 0;
00104   subscription_match_status_.last_publication_handle =
00105       DDS::HANDLE_NIL;
00106 
00107   sample_lost_status_.total_count = 0;
00108   sample_lost_status_.total_count_change = 0;
00109 
00110   sample_rejected_status_.total_count = 0;
00111   sample_rejected_status_.total_count_change = 0;
00112   sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
00113   sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
00114 
00115   this->budget_exceeded_status_.total_count = 0;
00116   this->budget_exceeded_status_.total_count_change = 0;
00117   this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
00118 
00119   monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this);
00120   periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this);
00121 }
00122 
00123 // This method is called when there are no longer any reference to the
00124 // the servant.
00125 DataReaderImpl::~DataReaderImpl()
00126 {
00127   DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6);
00128 
00129 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00130   OwnershipManagerPtr owner_manager = this->ownership_manager();
00131   if (owner_manager) {
00132     owner_manager->unregister_reader(topic_servant_->type_name(), this);
00133   }
00134 #endif
00135 
00136 }
00137 
00138 // this method is called when delete_datareader is called.
00139 void
00140 DataReaderImpl::cleanup()
00141 {
00142   // As first step set our listener to nill which will prevent us from calling
00143   // back onto the listener at the moment the related DDS entity has been
00144   // deleted
00145   set_listener(0, NO_STATUS_MASK);
00146 
00147 
00148 }
00149 
00150 void DataReaderImpl::init(
00151     TopicDescriptionImpl* a_topic_desc,
00152     const DDS::DataReaderQos &qos,
00153     DDS::DataReaderListener_ptr a_listener,
00154     const DDS::StatusMask & mask,
00155     DomainParticipantImpl* participant,
00156     SubscriberImpl* subscriber)
00157 {
00158   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00159   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00160     topic_servant_ = a_topic;
00161   }
00162 
00163   CORBA::String_var topic_name = a_topic_desc->get_name();
00164 
00165 #if !defined (DDS_HAS_MINIMUM_BIT)
00166   is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00167       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00168       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00169       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00170 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00171 
00172   qos_ = qos;
00173 
00174 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00175   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00176 #endif
00177 
00178   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00179   listener_mask_ = mask;
00180 
00181   // Only store the participant pointer, since it is our "grand"
00182   // parent, we will exist as long as it does
00183   participant_servant_ = *participant;
00184 
00185   domain_id_ = participant->get_domain_id();
00186 
00187   subscriber_servant_ = *subscriber;
00188 
00189   if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
00190     ACE_DEBUG((LM_WARNING,
00191         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
00192         ACE_TEXT("failed to get SubscriberQos\n")));
00193   }
00194 }
00195 
00196 DDS::InstanceHandle_t
00197 DataReaderImpl::get_instance_handle()
00198 {
00199   using namespace OpenDDS::DCPS;
00200   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00201   if (participant)
00202     return participant->id_to_handle(subscription_id_);
00203   return 0;
00204 }
00205 
// Records the remote writer described by `writer` in writers_ and
// statistics_, then hands the association to the transport layer through
// associate().
//
//   yourId - adopted as subscription_id_ only while that is still
//            GUID_UNKNOWN
//   writer - id, QoS and transport information of the remote writer
//   active - forwarded unchanged to associate()
//
// Nothing is recorded when entity_deleted_ is already set.
00206 void
00207 DataReaderImpl::add_association(const RepoId& yourId,
00208     const WriterAssociation& writer,
00209     bool active)
00210 {
00211   if (DCPS_debug_level) {
00212     GuidConverter reader_converter(yourId);
00213     GuidConverter writer_converter(writer.writerId);
00214     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
00215         ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00216         OPENDDS_STRING(reader_converter).c_str(),
00217         OPENDDS_STRING(writer_converter).c_str()));
00218   }
00219 
00220   if (entity_deleted_.value()) {
00221     if (DCPS_debug_level) {
00222       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
00223           ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00224     }
00225     return;
00226   }
00227 
00228   // We are being called back from the repository before we are done
00229   // processing after our call to the repository that caused this call
00230   // (from the repository) to be made.
00231   if (GUID_UNKNOWN == subscription_id_) {
00232     subscription_id_ = yourId;
00233   }
00234 
00235   //Why do we need the publication_handle_lock_ here?  No access to id_to_handle_map_...
00236   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00237 
00238 
00239   // For each writer in the list of writers to associate with, we
00240   // create a WriterInfo and a WriterStats object and store them in
00241   // our internal maps.
00242   //
00243   {
00244 
00245     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
00246 
00247     const PublicationId& writer_id = writer.writerId;
00248     RcHandle<WriterInfo> info = make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos);
    // bpair.second tells whether a new entry was added; it is only read by
    // the debug logging further down.
00249     std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
00250         // This insertion is idempotent.
00251         WriterMapType::value_type(
00252           writer_id,
00253           info));
00254 
00255       // Schedule timer if necessary
00256       //   - only need to check reader qos - we know the writer must be >= reader
      // NOTE(review): the flag is set on the freshly created `info`; if the
      // insert above found an existing entry for writer_id, that existing
      // entry is not the object being flagged - confirm this is intended.
00257       if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00258         info->waiting_for_end_historic_samples_ = true;
00259       }
00260 
00261       this->statistics_.insert(
00262         StatsMapType::value_type(
00263             writer_id,
00264             WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
00265 
00266     // If this is a durable reader
    // (placeholder: the branch below is currently empty)
00267     if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00268       // TODO schedule timer for removing flag from writers
00269     }
00270 
00271     if (DCPS_debug_level > 4) {
00272       GuidConverter converter(writer_id);
00273       ACE_DEBUG((LM_DEBUG,
00274           "(%P|%t) DataReaderImpl::add_association: "
00275           "inserted writer %C.return %d \n",
00276           OPENDDS_STRING(converter).c_str(), bpair.second));
00277 
00278       WriterMapType::iterator iter = writers_.find(writer_id);
00279       if (iter != writers_.end()) {
00280         // Debug aid only: the writer was found in writers_, so log
00281         // which reader is now associated with which writer.  No state
00282         // is changed in this branch.
00283         GuidConverter reader_converter(subscription_id_);
00284         GuidConverter writer_converter(writer_id);
00285         ACE_DEBUG((LM_DEBUG,
00286             ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00287             ACE_TEXT("reader %C is associated with writer %C.\n"),
00288             OPENDDS_STRING(reader_converter).c_str(),
00289             OPENDDS_STRING(writer_converter).c_str()));
00290       }
00291     }
00292   }
00293 
00294   // Propagate the add_associations processing down into the Transport
00295   // layer here.  This will establish the transport support and reserve
00296   // usage of an existing connection or initiate creation of a new
00297   // connection if no suitable connection is available.
00298   AssociationData data;
00299   data.remote_id_ = writer.writerId;
00300   data.remote_data_ = writer.writerTransInfo;
00301   data.publication_transport_priority_ =
00302       writer.writerQos.transport_priority.value;
00303   data.remote_reliable_ =
00304       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00305   data.remote_durable_ =
00306       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00307 
00308   //Do not hold publication_handle_lock_ when calling associate due to possible reactor
00309   //deadlock on passive side completion
00310   //associate does not access id_to_handle_map_, thus not clear why publication_handle_lock_
00311   //is held here anyway
00312   guard.release();
00313 
  // A failed associate() is only logged (when debugging is enabled); the
  // entries added to writers_ and statistics_ above are left in place.
00314   if (!associate(data, active)) {
00315     if (DCPS_debug_level) {
00316       ACE_ERROR((LM_ERROR,
00317           ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00318           ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00319     }
00320   }
00321 }
00322 
// Completion callback for an association attempt with remote_id.
//
//   flags     - bit set tested against ASSOC_OK and ASSOC_ACTIVE
//   remote_id - id of the remote writer the attempt was made with
//
// Without ASSOC_OK the failure is logged (when debugging is enabled) and
// nothing else happens.  Otherwise the liveliness timer is checked, and for
// non-BIT readers the writer's instance handle is recorded, the
// SUBSCRIPTION_MATCHED status is updated and any listener is called.  When
// ASSOC_ACTIVE is not set, discovery is told that the association is
// complete.
00323 void
00324 DataReaderImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00325 {
00326   if (!(flags & ASSOC_OK)) {
00327     if (DCPS_debug_level) {
00328       const GuidConverter conv(remote_id);
00329       ACE_ERROR((LM_ERROR,
00330           ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00331           ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00332           OPENDDS_STRING(conv).c_str()));
00333     }
00334     return;
00335   }
00336 
00337   const bool active = flags & ASSOC_ACTIVE;
00338   {
00339 
00340     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00341 
00342     // LIVELINESS policy timers are managed here.
00343     if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00344       if (DCPS_debug_level >= 5) {
00345         GuidConverter converter(subscription_id_);
00346         ACE_DEBUG((LM_DEBUG,
00347             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00348             ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00349             OPENDDS_STRING(converter).c_str()));
00350       }
00351       // this call will start the timer if it is not already set
00352       liveliness_timer_->check_liveliness();
00353     }
00354   }
00355   // We no longer hold the publication_handle_lock_.
00356 
00357   if (!is_bit_) {
00358 
00359     RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00360 
    // Without a participant no handle can be looked up, so stop here.
00361     if (!participant)
00362       return;
00363 
00364     DDS::InstanceHandle_t handle = participant->id_to_handle(remote_id);
00365 
00366     // We acquire the publication_handle_lock_ for the remainder of our
00367     // processing.
00368     {
00369       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00370 
00371       // This insertion is idempotent.
00372       id_to_handle_map_.insert(
00373           RepoIdToHandleMap::value_type(remote_id, handle));
00374 
00375       if (DCPS_debug_level > 4) {
00376         GuidConverter converter(remote_id);
00377         ACE_DEBUG((LM_DEBUG,
00378             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00379             ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00380             OPENDDS_STRING(converter).c_str(),
00381             handle));
00382       }
00383 
00384       // We need to adjust these after the insertions have all completed
00385       // since insertions are not guaranteed to increase the number of
00386       // currently matched publications.
00387       const int matchedPublications = static_cast<int>(id_to_handle_map_.size());
00388       subscription_match_status_.current_count_change =
00389           matchedPublications - subscription_match_status_.current_count;
00390       subscription_match_status_.current_count = matchedPublications;
00391 
00392       ++subscription_match_status_.total_count;
00393       ++subscription_match_status_.total_count_change;
00394 
00395       subscription_match_status_.last_publication_handle = handle;
00396 
00397       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00398 
00399       DDS::DataReaderListener_var listener =
00400           listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00401 
      // The listener callback runs while publication_handle_lock_ is held.
00402       if (!CORBA::is_nil(listener)) {
00403         listener->on_subscription_matched(this, subscription_match_status_);
00404 
00405         // TBD - why does the spec say to change this but not change
00406         //       the ChangeFlagStatus after a listener call?
00407 
00408         // Client will look at it so next time it looks the change should be 0
00409         subscription_match_status_.total_count_change = 0;
00410         subscription_match_status_.current_count_change = 0;
00411       }
00412 
00413       notify_status_condition();
00414     }
00415 
    // Store the handle on the writer's WriterInfo entry, if it still exists.
00416     {
00417       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
      // NOTE(review): the guard is named read_guard but is declared with
      // ACE_GUARD rather than ACE_READ_GUARD - confirm the intended mode.
00418       ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00419 
      // Returning here also skips the association_complete() call and the
      // monitor report at the end of this function.
00420       if(!writers_.count(remote_id)){
00421         return;
00422       }
00423       writers_[remote_id]->handle_ = handle;
00424     }
00425   }
00426 
  // Only when ASSOC_ACTIVE was not set is discovery informed that the
  // association is complete.
00427   if (!active) {
00428     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00429 
00430     disco->association_complete(domain_id_, dp_id_,
00431         subscription_id_, remote_id);
00432   }
00433 
00434   if (monitor_) {
00435     monitor_->report();
00436   }
00437 }
00438 
00439 void
00440 DataReaderImpl::association_complete(const RepoId& /*remote_id*/)
00441 {
00442   // For the current DCPSInfoRepo implementation, the DataReader side will
00443   // always be passive, so association_complete() will not be called.
00444 }
00445 
00446 void
00447 DataReaderImpl::remove_associations(const WriterIdSeq& writers,
00448     bool notify_lost)
00449 {
00450   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
00451 
00452   if (writers.length() == 0) {
00453     return;
00454   }
00455 
00456   if (DCPS_debug_level >= 1) {
00457     GuidConverter reader_converter(subscription_id_);
00458     GuidConverter writer_converter(writers[0]);
00459     ACE_DEBUG((LM_DEBUG,
00460         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
00461         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00462         is_bit_,
00463         OPENDDS_STRING(reader_converter).c_str(),
00464         OPENDDS_STRING(writer_converter).c_str(),
00465         writers.length()));
00466   }
00467   if (!this->entity_deleted_.value()) {
00468     // stop pending associations for these writer ids
00469     this->stop_associating(writers.get_buffer(), writers.length());
00470 
00471     // writers which are considered non-active and can
00472     // be removed immediately
00473     WriterIdSeq non_active_writers;
00474     {
00475       CORBA::ULong wr_len = writers.length();
00476       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00477 
00478       for (CORBA::ULong i = 0; i < wr_len; i++) {
00479         PublicationId writer_id = writers[i];
00480 
00481         WriterMapType::iterator it = this->writers_.find(writer_id);
00482         if (it != this->writers_.end() &&
00483             it->second->active()) {
00484           remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00485         } else {
00486           push_back(non_active_writers, writer_id);
00487         }
00488       }
00489     }
00490     remove_associations_i(non_active_writers, notify_lost);
00491   } else {
00492     remove_associations_i(writers, notify_lost);
00493   }
00494 }
00495 
00496 void
00497 DataReaderImpl::remove_publication(const PublicationId& pub_id)
00498 {
00499   WriterIdSeq writers;
00500   bool notify = false;
00501   {
00502     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00503     WriterMapType::iterator where = writers_.find(pub_id);
00504     if (writers_.end() != where) {
00505       notify = where->second->notify_lost_;
00506       push_back(writers, pub_id);
00507     }
00508   }
00509   remove_associations_i(writers, notify);
00510 }
00511 
// Performs the actual removal of the listed writers: erases them from
// writers_ (cancelling their sweeper timers), drops their entries from
// id_to_handle_map_, disassociates them at the transport level and updates
// the SUBSCRIPTION_MATCHED status.
//
//   writers     - ids of the writers to remove; ids no longer present in
//                 writers_ are skipped
//   notify_lost - when true, notify_subscription_lost() is called with the
//                 collected handles
00512 void
00513 DataReaderImpl::remove_associations_i(const WriterIdSeq& writers,
00514     bool notify_lost)
00515 {
00516   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
00517 
00518   if (writers.length() == 0) {
00519     return;
00520   }
00521 
00522   if (DCPS_debug_level >= 1) {
00523     GuidConverter reader_converter(subscription_id_);
00524     GuidConverter writer_converter(writers[0]);
00525     ACE_DEBUG((LM_DEBUG,
00526         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00527         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00528         is_bit_,
00529         OPENDDS_STRING(reader_converter).c_str(),
00530         OPENDDS_STRING(writer_converter).c_str(),
00531         writers.length()));
00532   }
00533   DDS::InstanceHandleSeq handles;
00534 
  // Held for the rest of the function, including the listener callback
  // and the disassociate() calls below.
00535   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00536 
00537   // This is used to hold the list of writers which were actually
00538   // removed, which is a subset (possibly all) of the writers which
00539   // were requested to be removed.
00540   WriterIdSeq updated_writers;
00541 
00542   CORBA::ULong wr_len;
00543 
00544   //Remove the writers from writer list. If the supplied writer
00545   //is not in the cached writers list then it is already removed.
00546   //We just need remove the writers in the list that have not been
00547   //removed.
00548   {
00549     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00550 
00551     wr_len = writers.length();
00552 
00553     for (CORBA::ULong i = 0; i < wr_len; i++) {
00554       PublicationId writer_id = writers[i];
00555 
00556       WriterMapType::iterator it = this->writers_.find(writer_id);
00557 
      // Mark the writer as removed and cancel both of its pending timers
      // before the map entry is erased.
00558       if (it != this->writers_.end()) {
00559         it->second->removed();
00560         end_historic_sweeper_->cancel_timer(it->second);
00561         remove_association_sweeper_->cancel_timer(it->second);
00562       }
00563 
00564       if (this->writers_.erase(writer_id) == 0) {
00565         if (DCPS_debug_level >= 1) {
00566           GuidConverter converter(writer_id);
00567           ACE_DEBUG((LM_DEBUG,
00568               ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00569               ACE_TEXT("the writer local %C was already removed.\n"),
00570               OPENDDS_STRING(converter).c_str()));
00571         }
00572 
00573       } else {
00574         push_back(updated_writers, writer_id);
00575       }
00576     }
00577   }
00578 
  // From here on wr_len counts only the writers that were really removed.
00579   wr_len = updated_writers.length();
00580 
00581   // Return now if the supplied writers have been removed already.
00582   if (wr_len == 0) {
00583     return;
00584   }
00585 
00586   if (!is_bit_) {
00587     // The writer should be in the id_to_handle map at this time.
00588     this->lookup_instance_handles(updated_writers, handles);
00589 
00590     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00591       id_to_handle_map_.erase(updated_writers[i]);
00592     }
00593   }
00594 
00595   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00596     {
00597       this->disassociate(updated_writers[i]);
00598     }
00599   }
00600 
00601   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00602   if (!this->is_bit_) {
00603     // Derive the change in the number of publications writing to this reader.
00604     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00605     this->subscription_match_status_.current_count_change
00606     = matchedPublications - this->subscription_match_status_.current_count;
00607 
00608     // Only process status if the number of publications has changed.
00609     if (this->subscription_match_status_.current_count_change != 0) {
00610       this->subscription_match_status_.current_count = matchedPublications;
00611 
00612       /// Section 7.1.4.1: total_count will not decrement.
00613 
00614       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
      // NOTE(review): this index assumes lookup_instance_handles() stored
      // one handle per entry of updated_writers - confirm against it.
00615       this->subscription_match_status_.last_publication_handle
00616       = handles[ wr_len - 1];
00617 
00618       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00619 
00620       DDS::DataReaderListener_var listener
00621       = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00622 
00623       if (!CORBA::is_nil(listener.in())) {
00624         listener->on_subscription_matched(this, this->subscription_match_status_);
00625 
00626         // Client will look at it so next time it looks the change should be 0
00627         this->subscription_match_status_.total_count_change = 0;
00628         this->subscription_match_status_.current_count_change = 0;
00629       }
00630       notify_status_condition();
00631     }
00632   }
00633 
00634   // If this remove_association is invoked when the InfoRepo
00635   // detects a lost writer then make a callback to notify
00636   // subscription lost.
  // For a BIT reader `handles` was never filled in above and is passed
  // along empty.
00637   if (notify_lost) {
00638     this->notify_subscription_lost(handles);
00639   }
00640 
00641   if (this->monitor_) {
00642     this->monitor_->report();
00643   }
00644 }
00645 
00646 void
00647 DataReaderImpl::remove_all_associations()
00648 {
00649   DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
00650   // stop pending associations
00651   this->stop_associating();
00652 
00653   OpenDDS::DCPS::WriterIdSeq writers;
00654   int size;
00655 
00656   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00657 
00658   {
00659     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00660 
00661     size = static_cast<int>(writers_.size());
00662     writers.length(size);
00663 
00664     WriterMapType::iterator curr_writer = writers_.begin();
00665     WriterMapType::iterator end_writer = writers_.end();
00666 
00667     int i = 0;
00668 
00669     while (curr_writer != end_writer) {
00670       writers[i++] = curr_writer->first;
00671       ++curr_writer;
00672     }
00673   }
00674 
00675   try {
00676     CORBA::Boolean dont_notify_lost = 0;
00677 
00678     if (0 < size) {
00679       remove_associations(writers, dont_notify_lost);
00680     }
00681 
00682   } catch (const CORBA::Exception&) {
00683       ACE_DEBUG((LM_WARNING,
00684                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00685                  ACE_TEXT("caught exception from remove_associations.\n")));
00686   }
00687 }
00688 
00689 void
00690 DataReaderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00691 {
00692   DDS::DataReaderListener_var listener =
00693       listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
00694 
00695   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00696       guard,
00697       this->publication_handle_lock_);
00698 
00699 
00700   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00701     // This test should make the method idempotent.
00702     return;
00703   }
00704 
00705   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00706       true);
00707 
00708   // copy status and increment change
00709   requested_incompatible_qos_status_.total_count = status.total_count;
00710   requested_incompatible_qos_status_.total_count_change +=
00711       status.count_since_last_send;
00712   requested_incompatible_qos_status_.last_policy_id =
00713       status.last_policy_id;
00714   requested_incompatible_qos_status_.policies = status.policies;
00715 
00716   if (!CORBA::is_nil(listener.in())) {
00717     listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_);
00718 
00719     // TBD - why does the spec say to change total_count_change but not
00720     // change the ChangeFlagStatus after a listener call?
00721 
00722     // client just looked at it so next time it looks the
00723     // change should be 0
00724     requested_incompatible_qos_status_.total_count_change = 0;
00725   }
00726 
00727   notify_status_condition();
00728 }
00729 
00730 void
00731 DataReaderImpl::inconsistent_topic()
00732 {
00733   topic_servant_->inconsistent_topic();
00734 }
00735 
00736 void
00737 DataReaderImpl::signal_liveliness(const RepoId& remote_participant)
00738 {
00739   RepoId prefix = remote_participant;
00740   prefix.entityId = EntityId_t();
00741 
00742   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00743 
00744   typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair;
00745   typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
00746   WriterSet writers;
00747 
00748   {
00749     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00750     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00751            limit = writers_.end();
00752          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00753          ++pos) {
00754       writers.push_back(std::make_pair(pos->first, pos->second));
00755     }
00756   }
00757 
00758   ACE_Time_Value when = ACE_OS::gettimeofday();
00759   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00760        pos != limit;
00761        ++pos) {
00762     pos->second->received_activity(when);
00763   }
00764 
00765   if (!writers.empty()) {
00766     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00767     for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00768          pos != limit;
00769          ++pos) {
00770       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00771            iter != instances_.end();
00772            ++iter) {
00773         SubscriptionInstance_rch ptr = iter->second;
00774         ptr->instance_state_.lively(pos->first);
00775       }
00776     }
00777   }
00778 }
00779 
00780 DDS::ReadCondition_ptr DataReaderImpl::create_readcondition(
00781     DDS::SampleStateMask sample_states,
00782     DDS::ViewStateMask view_states,
00783     DDS::InstanceStateMask instance_states)
00784 {
00785   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00786   DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
00787       view_states, instance_states);
00788   read_conditions_.insert(rc);
00789   return rc._retn();
00790 }
00791 
00792 #ifndef OPENDDS_NO_QUERY_CONDITION
00793 DDS::QueryCondition_ptr DataReaderImpl::create_querycondition(
00794     DDS::SampleStateMask sample_states,
00795     DDS::ViewStateMask view_states,
00796     DDS::InstanceStateMask instance_states,
00797     const char* query_expression,
00798     const DDS::StringSeq& query_parameters)
00799 {
00800   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00801   try {
00802     DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
00803         view_states, instance_states, query_expression);
00804     if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) {
00805       return 0;
00806     }
00807     DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
00808     read_conditions_.insert(rc);
00809     return qc._retn();
00810   } catch (const std::exception& e) {
00811     if (DCPS_debug_level) {
00812       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ")
00813           ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
00814           e.what()));
00815     }
00816   }
00817   return 0;
00818 }
00819 #endif
00820 
00821 bool DataReaderImpl::has_readcondition(DDS::ReadCondition_ptr a_condition)
00822 {
00823   //sample lock already held
00824   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00825   return read_conditions_.find(rc) != read_conditions_.end();
00826 }
00827 
00828 DDS::ReturnCode_t DataReaderImpl::delete_readcondition(
00829     DDS::ReadCondition_ptr a_condition)
00830 {
00831   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00832       DDS::RETCODE_OUT_OF_RESOURCES);
00833   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00834   return read_conditions_.erase(rc)
00835       ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
00836 }
00837 
00838 DDS::ReturnCode_t DataReaderImpl::delete_contained_entities()
00839 {
00840   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00841       DDS::RETCODE_OUT_OF_RESOURCES);
00842   read_conditions_.clear();
00843   return DDS::RETCODE_OK;
00844 }
00845 
00846 DDS::ReturnCode_t DataReaderImpl::set_qos(
00847     const DDS::DataReaderQos & qos)
00848 {
00849 
00850   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00851   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00852   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00853 
00854   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00855     if (qos_ == qos)
00856       return DDS::RETCODE_OK;
00857 
00858     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00859       return DDS::RETCODE_IMMUTABLE_POLICY;
00860 
00861     } else {
00862       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00863       DDS::SubscriberQos subscriberQos;
00864 
00865       RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
00866       bool status = false;
00867       if (subscriber) {
00868         subscriber->get_qos(subscriberQos);
00869         status =
00870           disco->update_subscription_qos(
00871               domain_id_,
00872               dp_id_,
00873               this->subscription_id_,
00874               qos,
00875               subscriberQos);
00876       }
00877       if (!status) {
00878         ACE_ERROR_RETURN((LM_ERROR,
00879             ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
00880             ACE_TEXT("qos not updated. \n")),
00881             DDS::RETCODE_ERROR);
00882       }
00883     }
00884 
00885 
00886     qos_change(qos);
00887     qos_ = qos;
00888 
00889     return DDS::RETCODE_OK;
00890 
00891   } else {
00892     return DDS::RETCODE_INCONSISTENT_POLICY;
00893   }
00894 }
00895 
00896 void DataReaderImpl::qos_change(const DDS::DataReaderQos & qos)
00897 {
00898   // Reset the deadline timer if the period has changed.
00899   if (qos_.deadline.period.sec != qos.deadline.period.sec ||
00900       qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00901     if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
00902         qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00903 
00904           this->watchdog_ = make_rch<RequestedDeadlineWatchdog>(
00905                   ref(this->sample_lock_),
00906                   qos.deadline,
00907                   ref(*this),
00908                   ref(this->requested_deadline_missed_status_),
00909                   ref(this->last_deadline_missed_total_count_));
00910 
00911     } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
00912                qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00913       watchdog_->cancel_all();
00914       watchdog_.reset();
00915     } else {
00916       watchdog_->reset_interval(
00917         duration_to_time_value(qos.deadline.period));
00918     }
00919   }
00920 }
00921 
00922 DDS::ReturnCode_t
00923 DataReaderImpl::get_qos(
00924     DDS::DataReaderQos & qos)
00925 {
00926   qos = qos_;
00927   return DDS::RETCODE_OK;
00928 }
00929 
00930 DDS::ReturnCode_t DataReaderImpl::set_listener(
00931     DDS::DataReaderListener_ptr a_listener,
00932     DDS::StatusMask mask)
00933 {
00934   listener_mask_ = mask;
00935   //note: OK to duplicate  a nil object ref
00936   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00937   return DDS::RETCODE_OK;
00938 }
00939 
00940 DDS::DataReaderListener_ptr DataReaderImpl::get_listener()
00941 {
00942   return DDS::DataReaderListener::_duplicate(listener_.in());
00943 }
00944 
00945 DDS::TopicDescription_ptr DataReaderImpl::get_topicdescription()
00946 {
00947 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00948   if (content_filtered_topic_) {
00949     return DDS::TopicDescription::_duplicate(content_filtered_topic_.get());
00950   }
00951 #endif
00952   return DDS::TopicDescription::_duplicate(topic_desc_.in());
00953 }
00954 
00955 DDS::Subscriber_ptr DataReaderImpl::get_subscriber()
00956 {
00957   return get_subscriber_servant()._retn();
00958 }
00959 
00960 DDS::ReturnCode_t
00961 DataReaderImpl::get_sample_rejected_status(
00962     DDS::SampleRejectedStatus & status)
00963 {
00964   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
00965 
00966   set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false);
00967   status = sample_rejected_status_;
00968   sample_rejected_status_.total_count_change = 0;
00969   return DDS::RETCODE_OK;
00970 }
00971 
00972 DDS::ReturnCode_t
00973 DataReaderImpl::get_liveliness_changed_status(
00974     DDS::LivelinessChangedStatus & status)
00975 {
00976   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
00977 
00978   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS,
00979       false);
00980   status = liveliness_changed_status_;
00981 
00982   liveliness_changed_status_.alive_count_change = 0;
00983   liveliness_changed_status_.not_alive_count_change = 0;
00984 
00985   return DDS::RETCODE_OK;
00986 }
00987 
00988 DDS::ReturnCode_t
00989 DataReaderImpl::get_requested_deadline_missed_status(
00990     DDS::RequestedDeadlineMissedStatus & status)
00991 {
00992   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
00993 
00994   set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
00995       false);
00996 
00997   this->requested_deadline_missed_status_.total_count_change =
00998       this->requested_deadline_missed_status_.total_count
00999       - this->last_deadline_missed_total_count_;
01000 
01001   // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
01002   // is updated by the RequestedDeadlineWatchdog.
01003 
01004   // Update for next status check.
01005   this->last_deadline_missed_total_count_ =
01006       this->requested_deadline_missed_status_.total_count;
01007 
01008   status = requested_deadline_missed_status_;
01009 
01010   return DDS::RETCODE_OK;
01011 }
01012 
01013 DDS::ReturnCode_t
01014 DataReaderImpl::get_requested_incompatible_qos_status(
01015     DDS::RequestedIncompatibleQosStatus & status)
01016 {
01017 
01018   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01019       this->publication_handle_lock_);
01020 
01021   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
01022       false);
01023   status = requested_incompatible_qos_status_;
01024   requested_incompatible_qos_status_.total_count_change = 0;
01025 
01026   return DDS::RETCODE_OK;
01027 }
01028 
01029 DDS::ReturnCode_t
01030 DataReaderImpl::get_subscription_matched_status(
01031     DDS::SubscriptionMatchedStatus & status)
01032 {
01033 
01034   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01035       this->publication_handle_lock_);
01036 
01037   set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false);
01038   status = subscription_match_status_;
01039   subscription_match_status_.total_count_change = 0;
01040   subscription_match_status_.current_count_change = 0;
01041 
01042   return DDS::RETCODE_OK;
01043 }
01044 
01045 DDS::ReturnCode_t
01046 DataReaderImpl::get_sample_lost_status(
01047     DDS::SampleLostStatus & status)
01048 {
01049   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01050 
01051   set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false);
01052   status = sample_lost_status_;
01053   sample_lost_status_.total_count_change = 0;
01054   return DDS::RETCODE_OK;
01055 }
01056 
01057 DDS::ReturnCode_t
01058 DataReaderImpl::wait_for_historical_data(
01059     const DDS::Duration_t & /* max_wait */)
01060 {
01061   // Add your implementation here
01062   return 0;
01063 }
01064 
01065 DDS::ReturnCode_t
01066 DataReaderImpl::get_matched_publications(
01067     DDS::InstanceHandleSeq & publication_handles)
01068 {
01069   if (enabled_ == false) {
01070     ACE_ERROR_RETURN((LM_ERROR,
01071         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
01072         ACE_TEXT(" Entity is not enabled. \n")),
01073         DDS::RETCODE_NOT_ENABLED);
01074   }
01075 
01076   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01077       guard,
01078       this->publication_handle_lock_,
01079       DDS::RETCODE_ERROR);
01080 
01081   // Copy out the handles for the current set of publications.
01082   int index = 0;
01083   publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01084 
01085   for (RepoIdToHandleMap::iterator
01086       current = this->id_to_handle_map_.begin();
01087       current != this->id_to_handle_map_.end();
01088       ++current, ++index) {
01089     publication_handles[ index] = current->second;
01090   }
01091 
01092   return DDS::RETCODE_OK;
01093 }
01094 
01095 #if !defined (DDS_HAS_MINIMUM_BIT)
01096 DDS::ReturnCode_t
01097 DataReaderImpl::get_matched_publication_data(
01098     DDS::PublicationBuiltinTopicData & publication_data,
01099     DDS::InstanceHandle_t publication_handle)
01100 {
01101   if (enabled_ == false) {
01102     ACE_ERROR_RETURN((LM_ERROR,
01103         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
01104         ACE_TEXT("get_matched_publication_data: ")
01105         ACE_TEXT("Entity is not enabled. \n")),
01106         DDS::RETCODE_NOT_ENABLED);
01107   }
01108 
01109 
01110   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01111       guard,
01112       this->publication_handle_lock_,
01113       DDS::RETCODE_ERROR);
01114 
01115   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01116 
01117   if (!participant)
01118     return DDS::RETCODE_ERROR;
01119 
01120   DDS::PublicationBuiltinTopicDataSeq data;
01121   const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
01122                                   participant.in(),
01123                                   BUILT_IN_PUBLICATION_TOPIC,
01124                                   publication_handle,
01125                                   data);
01126 
01127   if (ret == DDS::RETCODE_OK) {
01128     publication_data = data[0];
01129   }
01130 
01131   return ret;
01132 }
01133 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01134 
01135 DDS::ReturnCode_t
01136 DataReaderImpl::enable()
01137 {
01138   //According spec:
01139   // - Calling enable on an already enabled Entity returns OK and has no
01140   // effect.
01141   // - Calling enable on an Entity whose factory is not enabled will fail
01142   // and return PRECONDITION_NOT_MET.
01143 
01144   if (this->is_enabled()) {
01145     return DDS::RETCODE_OK;
01146   }
01147 
01148   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01149   if (!subscriber) {
01150     return DDS::RETCODE_ERROR;
01151   }
01152 
01153   if (!subscriber->is_enabled()) {
01154     return DDS::RETCODE_PRECONDITION_NOT_MET;
01155   }
01156 
01157   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
01158   if (participant) {
01159     dp_id_ = participant->get_id();
01160   }
01161 
01162   if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
01163     // The spec says qos_.history.depth is "has no effect"
01164     // when history.kind = KEEP_ALL so use max_samples_per_instance
01165     depth_ = qos_.resource_limits.max_samples_per_instance;
01166 
01167   } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
01168     depth_ = qos_.history.depth;
01169   }
01170 
01171   if (depth_ == DDS::LENGTH_UNLIMITED) {
01172     // DDS::LENGTH_UNLIMITED is negative so make it a positive
01173     // value that is for all intents and purposes unlimited
01174     // and we can use it for comparisons.
01175     // use 2147483647L because that is the greatest value a signed
01176     // CORBA::Long can have.
01177     // WARNING: The client risks running out of memory in this case.
01178     depth_ = 2147483647L;
01179   }
01180 
01181   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01182     n_chunks_ = qos_.resource_limits.max_samples;
01183   }
01184 
01185   //else using value from Service_Participant
01186 
01187   // enable the type specific part of this DataReader
01188   this->enable_specific();
01189 
01190   //Note: the QoS used to set n_chunks_ is Changable=No so
01191   // it is OK that we cannot change the size of our allocators.
01192   rd_allocator_.reset(new ReceivedDataAllocator(n_chunks_));
01193 
01194   if (DCPS_debug_level >= 2)
01195     ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
01196         " Cached_Allocator_With_Overflow %x with %d chunks\n",
01197         rd_allocator_.get(), n_chunks_));
01198 
01199   if ((qos_.liveliness.lease_duration.sec !=
01200       DDS::DURATION_INFINITE_SEC) &&
01201       (qos_.liveliness.lease_duration.nanosec !=
01202           DDS::DURATION_INFINITE_NSEC)) {
01203     liveliness_lease_duration_ =
01204         duration_to_time_value(qos_.liveliness.lease_duration);
01205   }
01206 
01207   // Setup the requested deadline watchdog if the configured deadline
01208   // period is not the default (infinite).
01209   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01210 
01211   if (!this->watchdog_
01212       && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01213           || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
01214     this->watchdog_ = make_rch<RequestedDeadlineWatchdog>(
01215             ref(this->sample_lock_),
01216             this->qos_.deadline,
01217             ref(*this),
01218             ref(this->requested_deadline_missed_status_),
01219             ref(this->last_deadline_missed_total_count_));
01220   }
01221 
01222   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01223   disco->pre_reader(this);
01224 
01225   this->set_enabled();
01226 
01227   if (topic_servant_ && !transport_disabled_) {
01228 
01229     try {
01230       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
01231           this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01232     } catch (const Transport::Exception&) {
01233       ACE_ERROR((LM_ERROR,
01234           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01235           ACE_TEXT("Transport Exception.\n")));
01236       return DDS::RETCODE_ERROR;
01237 
01238     }
01239 
01240     const TransportLocatorSeq& trans_conf_info = this->connection_info();
01241 
01242     CORBA::String_var filterClassName = "";
01243     CORBA::String_var filterExpression = "";
01244     DDS::StringSeq exprParams;
01245 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01246 
01247     if (content_filtered_topic_) {
01248       filterClassName = content_filtered_topic_->get_filter_class_name();
01249       filterExpression = content_filtered_topic_->get_filter_expression();
01250       content_filtered_topic_->get_expression_parameters(exprParams);
01251     }
01252 
01253 #endif
01254 
01255     DDS::SubscriberQos sub_qos;
01256     subscriber->get_qos(sub_qos);
01257 
01258     this->subscription_id_ =
01259         disco->add_subscription(this->domain_id_,
01260             this->dp_id_,
01261             this->topic_servant_->get_id(),
01262             this,
01263             this->qos_,
01264             trans_conf_info,
01265             sub_qos,
01266             filterClassName,
01267             filterExpression,
01268             exprParams);
01269 
01270     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01271       ACE_ERROR((LM_WARNING,
01272           ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::enable, ")
01273           ACE_TEXT("add_subscription returned invalid id.\n")));
01274       return DDS::RETCODE_ERROR;
01275     }
01276   }
01277 
01278   if (topic_servant_) {
01279     const CORBA::String_var name = topic_servant_->get_name();
01280     DDS::ReturnCode_t return_value =
01281         subscriber->reader_enabled(name.in(), this);
01282 
01283     if (this->monitor_) {
01284       this->monitor_->report();
01285     }
01286 
01287     return return_value;
01288   } else {
01289     return DDS::RETCODE_OK;
01290   }
01291 }
01292 
01293 void
01294 DataReaderImpl::writer_activity(const DataSampleHeader& header)
01295 {
01296   // caller should have the sample_lock_ !!!
01297 
01298   RcHandle<WriterInfo> writer;
01299 
01300   // The received_activity() has to be called outside the writers_lock_
01301   // because it probably acquire writers_lock_ read lock recursively
01302   // (in handle_timeout). This could cause deadlock when there are writers
01303   // waiting.
01304   {
01305     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01306     WriterMapType::iterator iter = writers_.find(header.publication_id_);
01307 
01308     if (iter != writers_.end()) {
01309       writer = iter->second;
01310 
01311     } else if (DCPS_debug_level > 4) {
01312       // This may not be an error since it could happen that the sample
01313       // is delivered to the datareader after the write is dis-associated
01314       // with this datareader.
01315       GuidConverter reader_converter(subscription_id_);
01316       GuidConverter writer_converter(header.publication_id_);
01317       ACE_DEBUG((LM_DEBUG,
01318           ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
01319           ACE_TEXT("reader %C is not associated with writer %C.\n"),
01320           OPENDDS_STRING(reader_converter).c_str(),
01321           OPENDDS_STRING(writer_converter).c_str()));
01322     }
01323   }
01324 
01325   if (!writer.is_nil()) {
01326     ACE_Time_Value when = ACE_OS::gettimeofday();
01327     writer->received_activity(when);
01328 
01329     if ((header.message_id_ == SAMPLE_DATA) ||
01330         (header.message_id_ == INSTANCE_REGISTRATION) ||
01331         (header.message_id_ == UNREGISTER_INSTANCE) ||
01332         (header.message_id_ == DISPOSE_INSTANCE) ||
01333         (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
01334 
01335       const SequenceNumber defaultSN;
01336       SequenceRange resetRange(defaultSN, header.sequence_);
01337 
01338 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01339       if (header.coherent_change_) {
01340         if (writer->coherent_samples_ == 0) {
01341           writer->coherent_sample_sequence_.reset();
01342           writer->coherent_sample_sequence_.insert(resetRange);
01343         }
01344         else {
01345           writer->coherent_sample_sequence_.insert(header.sequence_);
01346         }
01347       }
01348 #endif
01349     }
01350   }
01351 }
01352 
01353 void
01354 DataReaderImpl::data_received(const ReceivedDataSample& sample)
01355 {
01356   DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
01357 
01358   // ensure some other thread is not changing the sample container
01359   // or statuses related to samples.
01360   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01361 
01362   if (get_deleted()) return;
01363 
01364   if (DCPS_debug_level > 9) {
01365     GuidConverter converter(subscription_id_);
01366     ACE_DEBUG((LM_DEBUG,
01367         ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01368         ACE_TEXT("%C received sample: %C.\n"),
01369         OPENDDS_STRING(converter).c_str(),
01370         to_string(sample.header_).c_str()));
01371   }
01372 
01373   switch (sample.header_.message_id_) {
01374   case SAMPLE_DATA:
01375   case INSTANCE_REGISTRATION: {
01376     if (!check_historic(sample)) break;
01377 
01378     DataSampleHeader const & header = sample.header_;
01379 
01380     this->writer_activity(header);
01381 
01382     // Verify data has not exceeded its lifespan.
01383     if (this->filter_sample(header)) break;
01384 
01385     // This adds the reader to the set/list of readers with data.
01386     RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01387     if (subscriber)
01388       subscriber->data_received(this);
01389 
01390     // Only gather statistics about real samples, not registration data, etc.
01391     if (header.message_id_ == SAMPLE_DATA) {
01392       this->process_latency(sample);
01393     }
01394 
01395     // This also adds to the sample container and makes any callbacks
01396     // and condition modifications.
01397 
01398     SubscriptionInstance_rch instance;
01399     bool is_new_instance = false;
01400     bool filtered = false;
01401     if (sample.header_.key_fields_only_) {
01402       dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING);
01403     } else {
01404       dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING);
01405     }
01406 
01407     // Per sample logging
01408     if (DCPS_debug_level >= 8) {
01409       GuidConverter reader_converter(subscription_id_);
01410       GuidConverter writer_converter(header.publication_id_);
01411 
01412       ACE_DEBUG ((LM_DEBUG,
01413           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
01414           ACE_TEXT("instance %d is_new_instance %d filtered %d \n"),
01415           OPENDDS_STRING(reader_converter).c_str(),
01416           OPENDDS_STRING(writer_converter).c_str(),
01417           instance ? instance->instance_handle_ : 0,
01418               is_new_instance, filtered));
01419     }
01420 
01421     if (filtered) break; // sample filtered from instance
01422 
01423     if (instance) accept_sample_processing(instance, header, is_new_instance);
01424   }
01425   break;
01426 
01427 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01428   case END_COHERENT_CHANGES: {
01429     CoherentChangeControl control;
01430 
01431     this->writer_activity(sample.header_);
01432 
01433     Serializer serializer(
01434         sample.sample_.get(), sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER);
01435     if (!(serializer >> control)) {
01436       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
01437           ACE_TEXT("deserialization coherent change control failed.\n")));
01438       return;
01439     }
01440 
01441     if (DCPS_debug_level > 0) {
01442       std::stringstream buffer;
01443       buffer << control << std::endl;
01444 
01445       ACE_DEBUG((LM_DEBUG,
01446           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01447           ACE_TEXT("END_COHERENT_CHANGES %C\n"),
01448           buffer.str().c_str()));
01449     }
01450 
01451     RcHandle<WriterInfo> writer;
01452     {
01453       ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01454 
01455       WriterMapType::iterator it =
01456           this->writers_.find(sample.header_.publication_id_);
01457 
01458       if (it == this->writers_.end()) {
01459         GuidConverter sub_id(this->subscription_id_);
01460         GuidConverter pub_id(sample.header_.publication_id_);
01461         ACE_DEBUG((LM_WARNING,
01462             ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01463             ACE_TEXT(" subscription %C failed to find ")
01464             ACE_TEXT(" publication data for %C!\n"),
01465             OPENDDS_STRING(sub_id).c_str(),
01466             OPENDDS_STRING(pub_id).c_str()));
01467         return;
01468       }
01469       else {
01470         writer = it->second;
01471       }
01472       it->second->set_group_info (control);
01473     }
01474 
01475     if (this->verify_coherent_changes_completion(writer.in())) {
01476       this->notify_read_conditions();
01477     }
01478   }
01479   break;
01480 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
01481 
01482   case DATAWRITER_LIVELINESS: {
01483     if (DCPS_debug_level >= 4) {
01484       GuidConverter reader_converter(subscription_id_);
01485       GuidConverter writer_converter(sample.header_.publication_id_);
01486       ACE_DEBUG((LM_DEBUG,
01487                  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01488                  ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
01489                  OPENDDS_STRING(reader_converter).c_str(),
01490                  OPENDDS_STRING(writer_converter).c_str()));
01491     }
01492     this->writer_activity(sample.header_);
01493 
01494     // tell all instances they got a liveliness message
01495     {
01496       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01497       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01498           iter != instances_.end();
01499           ++iter) {
01500         iter->second->instance_state_.lively(sample.header_.publication_id_);
01501       }
01502     }
01503 
01504   }
01505   break;
01506 
01507   case DISPOSE_INSTANCE: {
01508     if (!check_historic(sample)) break;
01509     this->writer_activity(sample.header_);
01510     SubscriptionInstance_rch instance;
01511 
01512     if (this->watchdog_.in()) {
01513       // Find the instance first for timer cancellation since
01514       // the instance may be deleted during dispose and can
01515       // not be accessed.
01516       ReceivedDataSample dup(sample);
01517       this->lookup_instance(dup, instance);
01518 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01519       OwnershipManagerPtr owner_manager = this->ownership_manager();
01520 
01521       if (! this->is_exclusive_ownership_
01522           || (owner_manager
01523               && (instance)
01524               && (owner_manager->is_owner (instance->instance_handle_,
01525                   sample.header_.publication_id_)))) {
01526 #endif
01527         this->watchdog_->cancel_timer(instance);
01528 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01529       }
01530 #endif
01531     }
01532     instance.reset();
01533     this->dispose_unregister(sample, instance);
01534   }
01535   this->notify_read_conditions();
01536   break;
01537 
01538   case UNREGISTER_INSTANCE: {
01539     if (!check_historic(sample)) break;
01540     this->writer_activity(sample.header_);
01541     SubscriptionInstance_rch instance;
01542 
01543     if (this->watchdog_.in()) {
01544       // Find the instance first for timer cancellation since
01545       // the instance may be deleted during dispose and can
01546       // not be accessed.
01547       ReceivedDataSample dup(sample);
01548       this->lookup_instance(dup, instance);
01549       if (instance) {
01550 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01551         if (! this->is_exclusive_ownership_
01552             || (this->is_exclusive_ownership_
01553                 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01554 #endif
01555           this->watchdog_->cancel_timer(instance);
01556 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01557         }
01558 #endif
01559       }
01560     }
01561     instance.reset();
01562     this->dispose_unregister(sample, instance);
01563   }
01564   this->notify_read_conditions();
01565   break;
01566 
01567   case DISPOSE_UNREGISTER_INSTANCE: {
01568     if (!check_historic(sample)) break;
01569     this->writer_activity(sample.header_);
01570     SubscriptionInstance_rch instance;
01571 
01572     if (this->watchdog_.in()) {
01573       // Find the instance first for timer cancellation since
01574       // the instance may be deleted during dispose and can
01575       // not be accessed.
01576       ReceivedDataSample dup(sample);
01577       this->lookup_instance(dup, instance);
01578 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01579       OwnershipManagerPtr owner_manager = this->ownership_manager();
01580       if (! this->is_exclusive_ownership_
01581           || (owner_manager
01582               && (instance)
01583               && (owner_manager->is_owner (instance->instance_handle_,
01584                   sample.header_.publication_id_)))
01585                   || (this->is_exclusive_ownership_
01586                       && (instance)
01587                       && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01588 #endif
01589         if (instance) {
01590           this->watchdog_->cancel_timer(instance);
01591         }
01592 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01593       }
01594 #endif
01595     }
01596     instance.reset();
01597     this->dispose_unregister(sample, instance);
01598   }
01599   this->notify_read_conditions();
01600   break;
01601 
01602   case END_HISTORIC_SAMPLES: {
01603     if (sample.header_.message_length_ >= sizeof(RepoId)) {
01604       Serializer ser(sample.sample_.get());
01605       RepoId readerId = GUID_UNKNOWN;
01606       if (!(ser >> readerId)) {
01607         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
01608             ACE_TEXT("deserialization reader failed.\n")));
01609         return;
01610       }
01611       if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) {
01612         break; // not our message
01613       }
01614     }
01615     if (DCPS_debug_level > 4) {
01616       ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
01617     }
01618     // Going to acquire writers lock, release samples lock
01619     guard.release();
01620     this->resume_sample_processing(sample.header_.publication_id_);
01621     if (DCPS_debug_level > 4) {
01622       GuidConverter pub_id(sample.header_.publication_id_);
01623       ACE_DEBUG((
01624           LM_INFO,
01625           "(%P|%t) Resumed sample processing for durable writer %C\n",
01626           OPENDDS_STRING(pub_id).c_str()));
01627     }
01628     break;
01629   }
01630 
01631   default:
01632     ACE_ERROR((LM_ERROR,
01633         "(%P|%t) ERROR: DataReaderImpl::data_received"
01634         "unexpected message_id = %d\n",
01635         sample.header_.message_id_));
01636     break;
01637   }
01638 }
01639 
01640 RcHandle<EntityImpl>
01641 DataReaderImpl::parent() const
01642 {
01643   return this->subscriber_servant_.lock();
01644 }
01645 
01646 bool
01647 DataReaderImpl::check_transport_qos(const TransportInst& ti)
01648 {
01649   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01650     return ti.is_reliable();
01651   }
01652   return true;
01653 }
01654 
01655 void DataReaderImpl::notify_read_conditions()
01656 {
01657   //sample lock is already held
01658   ReadConditionSet local_read_conditions = read_conditions_;
01659   ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01660 
01661   for (ReadConditionSet::iterator it = local_read_conditions.begin(),
01662       end = local_read_conditions.end(); it != end; ++it) {
01663     ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in());
01664     if (ci) {
01665       ci->signal_all();
01666     } else {
01667       ACE_ERROR((LM_ERROR,
01668         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
01669         ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n")));
01670     }
01671   }
01672 }
01673 
01674 RcHandle<SubscriberImpl>
01675 DataReaderImpl::get_subscriber_servant()
01676 {
01677   return subscriber_servant_.lock();
01678 }
01679 
01680 RepoId DataReaderImpl::get_subscription_id() const
01681 {
01682   return subscription_id_;
01683 }
01684 
01685 bool DataReaderImpl::have_sample_states(
01686     DDS::SampleStateMask sample_states) const
01687 {
01688   //!!!caller should have acquired sample_lock_
01689   /// @TODO: determine correct failed lock return value.
01690   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
01691 
01692   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01693       iter != instances_.end();
01694       ++iter) {
01695     SubscriptionInstance_rch ptr = iter->second;
01696 
01697     for (ReceivedDataElement *item = ptr->rcvd_samples_.head_;
01698         item != 0; item = item->next_data_sample_) {
01699       if (item->sample_state_ & sample_states) {
01700         return true;
01701       }
01702     }
01703   }
01704 
01705   return false;
01706 }
01707 
01708 bool
01709 DataReaderImpl::have_view_states(DDS::ViewStateMask view_states) const
01710 {
01711   //!!!caller should have acquired sample_lock_
01712   /// @TODO: determine correct failed lock return value.
01713   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01714 
01715   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01716       iter != instances_.end();
01717       ++iter) {
01718     SubscriptionInstance_rch ptr = iter->second;
01719 
01720     if (ptr->instance_state_.view_state() & view_states) {
01721       return true;
01722     }
01723   }
01724 
01725   return false;
01726 }
01727 
01728 bool DataReaderImpl::have_instance_states(
01729     DDS::InstanceStateMask instance_states) const
01730 {
01731   //!!!caller should have acquired sample_lock_
01732   /// @TODO: determine correct failed lock return value.
01733   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01734 
01735   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01736       iter != instances_.end();
01737       ++iter) {
01738     SubscriptionInstance_rch ptr = iter->second;
01739 
01740     if (ptr->instance_state_.instance_state() & instance_states) {
01741       return true;
01742     }
01743   }
01744 
01745   return false;
01746 }
01747 
01748 /// Fold-in the three separate loops of have_sample_states(),
01749 /// have_view_states(), and have_instance_states().  Takes the sample_lock_.
01750 bool DataReaderImpl::contains_sample(DDS::SampleStateMask sample_states,
01751     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
01752 {
01753   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
01754   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01755 
01756   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
01757       end = instances_.end(); iter != end; ++iter) {
01758     SubscriptionInstance& inst = *iter->second;
01759 
01760     if ((inst.instance_state_.view_state() & view_states) &&
01761         (inst.instance_state_.instance_state() & instance_states)) {
01762       for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
01763           item = item->next_data_sample_) {
01764         if (item->sample_state_ & sample_states
01765 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01766             && !item->coherent_change_
01767 #endif
01768         ) {
01769           return true;
01770         }
01771       }
01772     }
01773   }
01774 
01775   return false;
01776 }
01777 
01778 DDS::DataReaderListener_ptr
01779 DataReaderImpl::listener_for(DDS::StatusKind kind)
01780 {
01781   // per 2.1.4.3.1 Listener Access to Plain Communication Status
01782   // use this entities factory if listener is mask not enabled
01783   // for this kind.
01784   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01785   if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) {
01786     return subscriber->listener_for(kind);
01787 
01788   } else {
01789     return DDS::DataReaderListener::_duplicate(listener_.in());
01790   }
01791 }
01792 
01793 void DataReaderImpl::sample_info(DDS::SampleInfo & sample_info,
01794     const ReceivedDataElement *ptr)
01795 {
01796 
01797   sample_info.sample_rank = 0;
01798 
01799   // generation_rank =
01800   //    (MRSIC.disposed_generation_count +
01801   //     MRSIC.no_writers_generation_count)
01802   //  - (S.disposed_generation_count +
01803   //     S.no_writers_generation_count)
01804   //
01805   sample_info.generation_rank =
01806       (sample_info.disposed_generation_count +
01807           sample_info.no_writers_generation_count) -
01808           sample_info.generation_rank;
01809 
01810   // absolute_generation_rank =
01811   //     (MRS.disposed_generation_count +
01812   //      MRS.no_writers_generation_count)
01813   //   - (S.disposed_generation_count +
01814   //      S.no_writers_generation_count)
01815   //
01816   sample_info.absolute_generation_rank =
01817       (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
01818           static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
01819           sample_info.absolute_generation_rank;
01820 
01821   sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
01822 }
01823 
01824 CORBA::Long DataReaderImpl::total_samples() const
01825 {
01826   //!!!caller should have acquired sample_lock_
01827   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
01828 
01829   CORBA::Long count(0);
01830 
01831   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01832       iter != instances_.end();
01833       ++iter) {
01834     SubscriptionInstance_rch ptr = iter->second;
01835 
01836     count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_);
01837   }
01838 
01839   return count;
01840 }
01841 
01842 int
01843 DataReaderImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value& tv,
01844                                                 const void * /*arg*/)
01845 {
01846   check_liveliness_i(false, tv);
01847   return 0;
01848 }
01849 
01850 void
01851 DataReaderImpl::LivelinessTimer::check_liveliness_i(bool cancel,
01852                                                     const ACE_Time_Value& now)
01853 {
01854   // Working copy of the active timer Id.
01855 
01856   RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
01857   if (! data_reader) {
01858     this->reactor()->purge_pending_notifications(this);
01859     return;
01860   }
01861 
01862   long local_timer_id = liveliness_timer_id_;
01863   bool timer_was_reset = false;
01864 
01865   if (local_timer_id != -1 && cancel) {
01866     if (DCPS_debug_level >= 5) {
01867       GuidConverter converter(data_reader->get_subscription_id());
01868       ACE_DEBUG((LM_DEBUG,
01869                  ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
01870                  ACE_TEXT(" canceling timer for reader %C.\n"),
01871                  OPENDDS_STRING(converter).c_str()));
01872     }
01873 
01874     // called from add_associations and there is already a timer
01875     // so cancel the existing timer.
01876     if (this->reactor()->cancel_timer(local_timer_id) == -1) {
01877       // this could fail because the reactor's call and
01878       // the add_associations' call to this could overlap
01879       // so it is not a failure.
01880       ACE_DEBUG((LM_DEBUG,
01881                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
01882                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
01883     }
01884 
01885     timer_was_reset = true;
01886   }
01887 
01888   // Used after the lock scope ends.
01889   ACE_Time_Value smallest(ACE_Time_Value::max_time);
01890   int alive_writers = 0;
01891 
01892   // This is a bit convoluted.  The reasoning goes as follows:
01893   // 1) We grab the current timer Id value when we enter the method.
01894   // 2) We *might* cancel the timer if it is active.
01895   // 3) The timer *might* be rescheduled while we do not hold the sample lock.
01896   // 4) If we (or another thread) canceled the timer that we can tell, then
01897   // 5) we should clear the Id value,
01898   // 6) unless it has been rescheduled.
01899   // We are using a changed timer Id value as a proxy for having been
01900   // rescheduled.
01901   if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
01902     liveliness_timer_id_ = -1;
01903   }
01904 
01905   ACE_Time_Value next_absolute;
01906 
01907   // Iterate over each writer to this reader
01908   {
01909     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
01910         read_guard,
01911         data_reader->writers_lock_);
01912 
01913     for (WriterMapType::iterator iter = data_reader->writers_.begin();
01914         iter != data_reader->writers_.end();
01915         ++iter) {
01916       // deal with possibly not being alive or
01917       // tell when it will not be alive next (if no activity)
01918       next_absolute = iter->second->check_activity(now);
01919 
01920       if (next_absolute != ACE_Time_Value::max_time) {
01921         alive_writers++;
01922 
01923         if (next_absolute < smallest) {
01924           smallest = next_absolute;
01925         }
01926       }
01927     }
01928   }
01929 
01930   if (!alive_writers) {
01931     // no live writers so no need to schedule a timer
01932     // but be sure we don't try to cancel the timer later.
01933     liveliness_timer_id_ = -1;
01934   }
01935 
01936   if (DCPS_debug_level >= 5) {
01937     GuidConverter converter(data_reader->get_subscription_id());
01938     ACE_DEBUG((LM_DEBUG,
01939         ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
01940         ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
01941         OPENDDS_STRING(converter).c_str(),
01942         alive_writers,
01943         !cancel));
01944   }
01945 
01946   // Call into the reactor after releasing the sample lock.
01947   if (alive_writers) {
01948     ACE_Time_Value relative;
01949 
01950     // compare the time now with the earliest(smallest) deadline we found
01951     if (now < smallest) {
01952       relative = smallest - now;
01953     } else {
01954       relative = ACE_Time_Value(0,1); // ASAP
01955     }
01956 
01957     liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative);
01958 
01959     if (liveliness_timer_id_ == -1) {
01960       ACE_ERROR((LM_ERROR,
01961           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
01962           ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer")));
01963     }
01964   }
01965 }
01966 
01967 void
01968 DataReaderImpl::release_instance(DDS::InstanceHandle_t handle)
01969 {
01970 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01971   OwnershipManagerPtr owner_manager = this->ownership_manager();
01972   if (owner_manager) {
01973     owner_manager->remove_writers (handle);
01974   }
01975 #endif
01976 
01977   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01978   SubscriptionInstance_rch instance = this->get_handle_instance(handle);
01979 
01980   if (!instance) {
01981     ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
01982         "could not find the instance by handle 0x%x\n", handle));
01983     return;
01984   }
01985 
01986   this->purge_data(instance);
01987 
01988   {
01989     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01990     instances_.erase(handle);
01991   }
01992 
01993   this->release_instance_i(handle);
01994   if (this->monitor_) {
01995     this->monitor_->report();
01996   }
01997 }
01998 
01999 
02000 OpenDDS::DCPS::WriterStats::WriterStats(
02001     int amount,
02002     DataCollector<double>::OnFull type) : stats_(amount, type)
02003 {
02004 }
02005 
02006 void OpenDDS::DCPS::WriterStats::add_stat(const ACE_Time_Value& delay)
02007 {
02008   double datum = static_cast<double>(delay.sec());
02009   datum += delay.usec() / 1000000.0;
02010   this->stats_.add(datum);
02011 }
02012 
02013 OpenDDS::DCPS::LatencyStatistics OpenDDS::DCPS::WriterStats::get_stats() const
02014 {
02015   LatencyStatistics value;
02016 
02017   value.publication = GUID_UNKNOWN;
02018   value.n           = this->stats_.n();
02019   value.maximum     = this->stats_.maximum();
02020   value.minimum     = this->stats_.minimum();
02021   value.mean        = this->stats_.mean();
02022   value.variance    = this->stats_.var();
02023 
02024   return value;
02025 }
02026 
02027 void OpenDDS::DCPS::WriterStats::reset_stats()
02028 {
02029   this->stats_.reset();
02030 }
02031 
02032 #ifndef OPENDDS_SAFETY_PROFILE
02033 std::ostream& OpenDDS::DCPS::WriterStats::raw_data(std::ostream& str) const
02034 {
02035   str << std::dec << this->stats_.size()
02036                               << " samples out of " << this->stats_.n() << std::endl;
02037   return str << this->stats_;
02038 }
02039 #endif //OPENDDS_SAFETY_PROFILE
02040 
02041 void
02042 DataReaderImpl::writer_removed(WriterInfo& info)
02043 {
02044   if (DCPS_debug_level >= 5) {
02045     GuidConverter reader_converter(subscription_id_);
02046     GuidConverter writer_converter(info.writer_id_);
02047     ACE_DEBUG((LM_DEBUG,
02048         ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
02049         ACE_TEXT("reader %C from writer %C.\n"),
02050         OPENDDS_STRING(reader_converter).c_str(),
02051         OPENDDS_STRING(writer_converter).c_str()));
02052   }
02053 
02054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02055   OwnershipManagerPtr owner_manager = this->ownership_manager();
02056   if (owner_manager) {
02057     owner_manager->remove_writer (info.writer_id_);
02058     info.clear_owner_evaluated ();
02059   }
02060 #endif
02061 
02062   bool liveliness_changed = false;
02063 
02064   if (info.state_ == WriterInfo::ALIVE) {
02065     -- liveliness_changed_status_.alive_count;
02066     -- liveliness_changed_status_.alive_count_change;
02067     liveliness_changed = true;
02068   }
02069 
02070   if (info.state_ == WriterInfo::DEAD) {
02071     -- liveliness_changed_status_.not_alive_count;
02072     -- liveliness_changed_status_.not_alive_count_change;
02073     liveliness_changed = true;
02074   }
02075 
02076   liveliness_changed_status_.last_publication_handle = info.handle_;
02077   instances_liveliness_update(info, ACE_OS::gettimeofday());
02078 
02079   if (liveliness_changed) {
02080     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02081     this->notify_liveliness_change();
02082   }
02083 }
02084 
02085 void
02086 DataReaderImpl::writer_became_alive(WriterInfo& info,
02087     const ACE_Time_Value& /* when */)
02088 {
02089   if (DCPS_debug_level >= 5) {
02090     GuidConverter reader_converter(subscription_id_);
02091     GuidConverter writer_converter(info.writer_id_);
02092     ACE_DEBUG((LM_DEBUG,
02093         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
02094         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02095         OPENDDS_STRING(reader_converter).c_str(),
02096         OPENDDS_STRING(writer_converter).c_str(),
02097         info.get_state_str().c_str()));
02098   }
02099 
02100   // caller should already have the samples_lock_ !!!
02101 
02102   // NOTE: each instance will change to ALIVE_STATE when they receive a sample
02103 
02104   bool liveliness_changed = false;
02105 
02106   if (info.state_ != WriterInfo::ALIVE) {
02107     liveliness_changed_status_.alive_count++;
02108     liveliness_changed_status_.alive_count_change++;
02109     liveliness_changed = true;
02110   }
02111 
02112   if (info.state_ == WriterInfo::DEAD) {
02113     liveliness_changed_status_.not_alive_count--;
02114     liveliness_changed_status_.not_alive_count_change--;
02115     liveliness_changed = true;
02116   }
02117 
02118   liveliness_changed_status_.last_publication_handle = info.handle_;
02119 
02120   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02121 
02122   if (liveliness_changed_status_.alive_count < 0) {
02123     ACE_ERROR((LM_ERROR,
02124         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02125         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02126         liveliness_changed_status_.alive_count));
02127     return;
02128   }
02129 
02130   if (liveliness_changed_status_.not_alive_count < 0) {
02131     ACE_ERROR((LM_ERROR,
02132         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02133         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"),
02134         liveliness_changed_status_.not_alive_count));
02135     return;
02136   }
02137 
02138   // Change the state to ALIVE since handle_timeout may call writer_became_dead
02139   // which need the current state info.
02140   info.state_ = WriterInfo::ALIVE;
02141 
02142   if (this->monitor_) {
02143     this->monitor_->report();
02144   }
02145 
02146   // Call listener only when there are liveliness status changes.
02147   if (liveliness_changed) {
02148     // Avoid possible deadlock by releasing sample_lock_.
02149     // See comments in <Topic>DataDataReaderImpl::notify_status_condition_no_sample_lock()
02150     // for information about the locks involved.
02151     ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02152     this->notify_liveliness_change();
02153   }
02154 
02155   // this call will start the liveliness timer if it is not already set
02156   liveliness_timer_->check_liveliness();
02157 }
02158 
02159 void
02160 DataReaderImpl::writer_became_dead(WriterInfo & info,
02161     const ACE_Time_Value& when)
02162 {
02163   if (DCPS_debug_level >= 5) {
02164     GuidConverter reader_converter(subscription_id_);
02165     GuidConverter writer_converter(info.writer_id_);
02166     ACE_DEBUG((LM_DEBUG,
02167         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
02168         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02169 
02170         OPENDDS_STRING(reader_converter).c_str(),
02171         OPENDDS_STRING(writer_converter).c_str(),
02172         info.get_state_str().c_str()));
02173   }
02174 
02175 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02176   OwnershipManagerPtr owner_manager = this->ownership_manager();
02177   if (owner_manager) {
02178     owner_manager->remove_writer (info.writer_id_);
02179     info.clear_owner_evaluated ();
02180   }
02181 #endif
02182 
02183   // caller should already have the samples_lock_ !!!
02184   bool liveliness_changed = false;
02185 
02186   if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) {
02187     liveliness_changed_status_.not_alive_count++;
02188     liveliness_changed_status_.not_alive_count_change++;
02189     liveliness_changed = true;
02190   }
02191 
02192   if (info.state_ == WriterInfo::ALIVE) {
02193     liveliness_changed_status_.alive_count--;
02194     liveliness_changed_status_.alive_count_change--;
02195     liveliness_changed_status_.not_alive_count++;
02196     liveliness_changed_status_.not_alive_count_change++;
02197     liveliness_changed = true;
02198   }
02199 
02200   liveliness_changed_status_.last_publication_handle = info.handle_;
02201 
02202   //update the state to DEAD.
02203   info.state_ = WriterInfo::DEAD;
02204 
02205   if (this->monitor_) {
02206     this->monitor_->report();
02207   }
02208 
02209   if (liveliness_changed_status_.alive_count < 0) {
02210     ACE_ERROR((LM_ERROR,
02211         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02212         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02213         liveliness_changed_status_.alive_count));
02214     return;
02215   }
02216 
02217   if (liveliness_changed_status_.not_alive_count < 0) {
02218     ACE_ERROR((LM_ERROR,
02219         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02220         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"),
02221         liveliness_changed_status_.not_alive_count));
02222     return;
02223   }
02224 
02225   instances_liveliness_update(info, when);
02226 
02227   // Call listener only when there are liveliness status changes.
02228   if (liveliness_changed) {
02229     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02230     this->notify_liveliness_change();
02231   }
02232 }
02233 
02234 void
02235 DataReaderImpl::instances_liveliness_update(WriterInfo& info,
02236     const ACE_Time_Value& when)
02237 {
02238   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02239   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02240       next = iter; iter != instances_.end(); iter = next) {
02241     ++next;
02242     iter->second->instance_state_.writer_became_dead(
02243         info.writer_id_, liveliness_changed_status_.alive_count, when);
02244   }
02245 }
02246 
02247 void
02248 DataReaderImpl::set_sample_lost_status(
02249     const DDS::SampleLostStatus& status)
02250 {
02251   //!!!caller should have acquired sample_lock_
02252   sample_lost_status_ = status;
02253 }
02254 
02255 void
02256 DataReaderImpl::set_sample_rejected_status(
02257     const DDS::SampleRejectedStatus& status)
02258 {
02259   //!!!caller should have acquired sample_lock_
02260   sample_rejected_status_ = status;
02261 }
02262 
02263 void DataReaderImpl::dispose_unregister(const ReceivedDataSample&,
02264                                         SubscriptionInstance_rch&)
02265 {
02266   if (DCPS_debug_level > 0) {
02267     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
02268   }
02269 }
02270 
02271 void DataReaderImpl::process_latency(const ReceivedDataSample& sample)
02272 {
02273   StatsMapType::iterator location
02274   = this->statistics_.find(sample.header_.publication_id_);
02275 
02276   if (location != this->statistics_.end()) {
02277     const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02278 
02279     // Only when the user has specified a latency budget or statistics
02280     // are enabled we need to calculate our latency
02281     if ((this->statistics_enabled()) ||
02282         (this->qos_.latency_budget.duration > zero)) {
02283       // This starts as the current time.
02284       ACE_Time_Value latency = ACE_OS::gettimeofday();
02285 
02286       // The time interval starts at the send end.
02287       DDS::Duration_t then = {
02288           sample.header_.source_timestamp_sec_,
02289           sample.header_.source_timestamp_nanosec_
02290       };
02291 
02292       // latency delay in ACE_Time_Value format.
02293       latency -= duration_to_time_value(then);
02294 
02295       if (this->statistics_enabled()) {
02296         location->second.add_stat(latency);
02297       }
02298 
02299       if (DCPS_debug_level > 9) {
02300         ACE_DEBUG((LM_DEBUG,
02301             ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02302             ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"),
02303             latency.sec(),
02304             latency.msec()));
02305       }
02306 
02307       if (this->qos_.latency_budget.duration > zero) {
02308         // Check latency against the budget.
02309         if (time_value_to_duration(latency) > this->qos_.latency_budget.duration) {
02310           this->notify_latency(sample.header_.publication_id_);
02311         }
02312       }
02313     }
02314   } else if (DCPS_debug_level > 0) {
02315     /// NB: This message is generated contemporaneously with a similar
02316     ///     message from writer_activity().  That message is not marked
02317     ///     as an error, so we follow that lead and leave this as an
02318     ///     informational message, guarded by debug level.  This seems
02319     ///     to be due to late samples (samples delivered after an
02320     ///     association has been torn down).  We may want to promote this
02321     ///     to a warning if other conditions causing this symptom are
02322     ///     discovered.
02323     GuidConverter reader_converter(subscription_id_);
02324     GuidConverter writer_converter(sample.header_.publication_id_);
02325     ACE_DEBUG((LM_DEBUG,
02326         ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02327         ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
02328         OPENDDS_STRING(reader_converter).c_str(),
02329         OPENDDS_STRING(writer_converter).c_str()));
02330   }
02331 }
02332 
02333 void DataReaderImpl::notify_latency(PublicationId writer)
02334 {
02335   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02336   // is given to this DataReader then narrow() fails.
02337   DataReaderListener_var listener
02338   = DataReaderListener::_narrow(this->listener_.in());
02339 
02340   if (!CORBA::is_nil(listener.in())) {
02341     WriterIdSeq writerIds;
02342     writerIds.length(1);
02343     writerIds[ 0] = writer;
02344 
02345     DDS::InstanceHandleSeq handles;
02346     this->lookup_instance_handles(writerIds, handles);
02347 
02348     if (handles.length() >= 1) {
02349       this->budget_exceeded_status_.last_instance_handle = handles[ 0];
02350 
02351     } else {
02352       this->budget_exceeded_status_.last_instance_handle = -1;
02353     }
02354 
02355     ++this->budget_exceeded_status_.total_count;
02356     ++this->budget_exceeded_status_.total_count_change;
02357 
02358     listener->on_budget_exceeded(this, this->budget_exceeded_status_);
02359 
02360     this->budget_exceeded_status_.total_count_change = 0;
02361   }
02362 }
02363 
02364 #ifndef OPENDDS_SAFETY_PROFILE
02365 void
02366 DataReaderImpl::get_latency_stats(
02367     OpenDDS::DCPS::LatencyStatisticsSeq & stats)
02368 {
02369   stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
02370   int index = 0;
02371 
02372   for (StatsMapType::const_iterator current = this->statistics_.begin();
02373       current != this->statistics_.end();
02374       ++current, ++index) {
02375     stats[ index] = current->second.get_stats();
02376     stats[ index].publication = current->first;
02377   }
02378 }
02379 #endif
02380 
02381 void
02382 DataReaderImpl::reset_latency_stats()
02383 {
02384   for (StatsMapType::iterator current = this->statistics_.begin();
02385       current != this->statistics_.end();
02386       ++current) {
02387     current->second.reset_stats();
02388   }
02389 }
02390 
02391 CORBA::Boolean
02392 DataReaderImpl::statistics_enabled()
02393 {
02394   return this->statistics_enabled_;
02395 }
02396 
02397 void
02398 DataReaderImpl::statistics_enabled(
02399     CORBA::Boolean statistics_enabled)
02400 {
02401   this->statistics_enabled_ = statistics_enabled;
02402 }
02403 
02404 void
02405 DataReaderImpl::prepare_to_delete()
02406 {
02407   this->set_deleted(true);
02408   this->stop_associating();
02409   this->send_final_acks();
02410 }
02411 
02412 SubscriptionInstance_rch
02413 DataReaderImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02414 {
02415   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, SubscriptionInstance_rch());
02416 
02417   SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
02418   if (iter == instances_.end()) {
02419     ACE_DEBUG((LM_WARNING,
02420         ACE_TEXT("(%P|%t) WARNING: ")
02421         ACE_TEXT("DataReaderImpl::get_handle_instance: ")
02422         ACE_TEXT("lookup for 0x%x failed\n"),
02423         handle));
02424     return SubscriptionInstance_rch();
02425   } // if (0 != instances_.find(handle, instance))
02426 
02427   return iter->second;
02428 }
02429 
02430 DDS::InstanceHandle_t
02431 DataReaderImpl::get_next_handle(const DDS::BuiltinTopicKey_t& key)
02432 {
02433   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02434   if (!participant)
02435     return 0;
02436 
02437   if (is_bit()) {
02438     Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_);
02439     CORBA::String_var topic = topic_servant_->get_name();
02440 
02441     RepoId id = disc->bit_key_to_repo_id(participant.in(), topic, key);
02442     return participant->id_to_handle(id);
02443 
02444   } else {
02445     return participant->id_to_handle(GUID_UNKNOWN);
02446   }
02447 }
02448 
02449 void
02450 DataReaderImpl::notify_subscription_disconnected(const WriterIdSeq& pubids)
02451 {
02452   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
02453 
02454   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02455   // is given to this DataReader then narrow() fails.
02456   DataReaderListener_var the_listener
02457   = DataReaderListener::_narrow(this->listener_.in());
02458 
02459   if (!CORBA::is_nil(the_listener.in())) {
02460     SubscriptionLostStatus status;
02461 
02462     // Since this callback may come after remove_association which removes
02463     // the writer from id_to_handle map, we can ignore this error.
02464     this->lookup_instance_handles(pubids, status.publication_handles);
02465     the_listener->on_subscription_disconnected(this, status);
02466   }
02467 }
02468 
02469 void
02470 DataReaderImpl::notify_subscription_reconnected(const WriterIdSeq& pubids)
02471 {
02472   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
02473 
02474   if (!this->is_bit_) {
02475     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02476     // is given to this DataReader then narrow() fails.
02477     DataReaderListener_var the_listener
02478     = DataReaderListener::_narrow(this->listener_.in());
02479 
02480     if (!CORBA::is_nil(the_listener.in())) {
02481       SubscriptionLostStatus status;
02482 
02483       // If it's reconnected then the reader should be in id_to_handle
02484       this->lookup_instance_handles(pubids, status.publication_handles);
02485 
02486       the_listener->on_subscription_reconnected(this,  status);
02487     }
02488   }
02489 }
02490 
02491 void
02492 DataReaderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq& handles)
02493 {
02494   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02495 
02496   if (!this->is_bit_) {
02497     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02498     // is given to this DataReader then narrow() fails.
02499     DataReaderListener_var the_listener
02500     = DataReaderListener::_narrow(this->listener_.in());
02501 
02502     if (!CORBA::is_nil(the_listener.in())) {
02503       SubscriptionLostStatus status;
02504 
02505       CORBA::ULong len = handles.length();
02506       status.publication_handles.length(len);
02507 
02508       for (CORBA::ULong i = 0; i < len; ++ i) {
02509         status.publication_handles[i] = handles[i];
02510       }
02511 
02512       the_listener->on_subscription_lost(this, status);
02513     }
02514   }
02515 }
02516 
02517 void
02518 DataReaderImpl::notify_subscription_lost(const WriterIdSeq& pubids)
02519 {
02520   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02521 
02522   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02523   // is given to this DataReader then narrow() fails.
02524   DataReaderListener_var the_listener
02525   = DataReaderListener::_narrow(this->listener_.in());
02526 
02527   if (!CORBA::is_nil(the_listener.in())) {
02528     SubscriptionLostStatus status;
02529 
02530     // Since this callback may come after remove_association which removes
02531     // the writer from id_to_handle map, we can ignore this error.
02532     this->lookup_instance_handles(pubids, status.publication_handles);
02533     the_listener->on_subscription_lost(this, status);
02534   }
02535 }
02536 
02537 
02538 void
02539 DataReaderImpl::lookup_instance_handles(const WriterIdSeq& ids,
02540     DDS::InstanceHandleSeq & hdls)
02541 {
02542   CORBA::ULong const num_wrts = ids.length();
02543 
02544   if (DCPS_debug_level > 9) {
02545     const char* separator = "";
02546     OPENDDS_STRING guids;
02547 
02548     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02549       guids += separator;
02550       guids += OPENDDS_STRING(GuidConverter(ids[i]));
02551       separator = ", ";
02552     }
02553 
02554     ACE_DEBUG((LM_DEBUG,
02555         ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
02556         ACE_TEXT("searching for handles for writer Ids: %C.\n"),
02557         guids.c_str()));
02558   }
02559 
02560   hdls.length(num_wrts);
02561 
02562   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02563   if (participant) {
02564     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02565       hdls[i] = participant->id_to_handle(ids[i]);
02566     }
02567   }
02568 }
02569 
02570 bool
02571 DataReaderImpl::filter_sample(const DataSampleHeader& header)
02572 {
02573   ACE_Time_Value now(ACE_OS::gettimeofday());
02574 
02575   // Expire historic data if QoS indicates VOLATILE.
02576   if (!always_get_history_ && header.historic_sample_
02577       && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
02578     if (DCPS_debug_level >= 8) {
02579       ACE_DEBUG((LM_DEBUG,
02580           ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
02581           ACE_TEXT("Discarded historic data.\n")));
02582     }
02583 
02584     return true;  // Data filtered.
02585   }
02586 
02587   // The LIFESPAN_DURATION_FLAG is set when sample data is sent
02588   // with a non-default LIFESPAN duration value.
02589   if (header.lifespan_duration_) {
02590     // Finite lifespan.  Check if data has expired.
02591 
02592     DDS::Time_t const tmp = {
02593         header.source_timestamp_sec_ + header.lifespan_duration_sec_,
02594         header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
02595     };
02596 
02597     // We assume that the publisher host's clock and subcriber host's
02598     // clock are synchronized (allowed by the spec).
02599     ACE_Time_Value const expiration_time(
02600         OpenDDS::DCPS::time_to_time_value(tmp));
02601 
02602     if (now >= expiration_time) {
02603       if (DCPS_debug_level >= 8) {
02604         ACE_Time_Value const diff(now - expiration_time);
02605         ACE_DEBUG((LM_DEBUG,
02606             ACE_TEXT("OpenDDS (%P|%t) Received data ")
02607             ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
02608             diff.sec(),
02609             diff.usec()));
02610       }
02611 
02612       return true;  // Data filtered.
02613     }
02614   }
02615 
02616   return false;
02617 }
02618 
02619 bool
02620 
02621 DataReaderImpl::ownership_filter_instance(const SubscriptionInstance_rch& instance,
02622   const PublicationId& pubid)
02623 {
02624 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02625   if (this->is_exclusive_ownership_) {
02626 
02627     WriterMapType::iterator iter = writers_.find(pubid);
02628 
02629     if (iter == writers_.end()) {
02630       if (DCPS_debug_level > 4) {
02631         // This may not be an error since it could happen that the sample
02632         // is delivered to the datareader after the write is dis-associated
02633         // with this datareader.
02634         GuidConverter reader_converter(subscription_id_);
02635         GuidConverter writer_converter(pubid);
02636         ACE_DEBUG((LM_DEBUG,
02637                    ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02638                    ACE_TEXT("reader %C is not associated with writer %C.\n"),
02639                    OPENDDS_STRING(reader_converter).c_str(),
02640                    OPENDDS_STRING(writer_converter).c_str()));
02641       }
02642       return true;
02643     }
02644 
02645 
02646     // Evaulate the owner of the instance if not selected and filter
02647     // current message if it's not from owner writer.
02648     if ( instance->instance_state_.get_owner () == GUID_UNKNOWN
02649         || ! iter->second->is_owner_evaluated (instance->instance_handle_)) {
02650       OwnershipManagerPtr owner_manager = this->ownership_manager();
02651 
02652       bool is_owner = owner_manager && owner_manager->select_owner (
02653         instance->instance_handle_,
02654         iter->second->writer_id_,
02655         iter->second->writer_qos_.ownership_strength.value,
02656         &instance->instance_state_);
02657       iter->second->set_owner_evaluated (instance->instance_handle_, true);
02658 
02659       if (! is_owner) {
02660         if (DCPS_debug_level >= 1) {
02661           GuidConverter reader_converter(subscription_id_);
02662           GuidConverter writer_converter(pubid);
02663           GuidConverter owner_converter (instance->instance_state_.get_owner ());
02664           ACE_DEBUG((LM_DEBUG,
02665                      ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02666                      ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
02667                      OPENDDS_STRING(reader_converter).c_str(),
02668                      OPENDDS_STRING(writer_converter).c_str(),
02669                      OPENDDS_STRING(owner_converter).c_str()));
02670         }
02671         return true;
02672       }
02673     }
02674     else if (! (instance->instance_state_.get_owner () == pubid)) {
02675       if (DCPS_debug_level >= 1) {
02676         GuidConverter reader_converter(subscription_id_);
02677         GuidConverter writer_converter(pubid);
02678         GuidConverter owner_converter (instance->instance_state_.get_owner ());
02679         ACE_DEBUG((LM_DEBUG,
02680                    ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02681                    ACE_TEXT("reader %C writer %C is not owner %C\n"),
02682                    OPENDDS_STRING(reader_converter).c_str(),
02683                    OPENDDS_STRING(writer_converter).c_str(),
02684                    OPENDDS_STRING(owner_converter).c_str()));
02685       }
02686       return true;
02687     }
02688   }
02689 #else
02690   ACE_UNUSED_ARG(pubid);
02691   ACE_UNUSED_ARG(instance);
02692 #endif
02693   return false;
02694 }
02695 
02696 bool
02697 DataReaderImpl::time_based_filter_instance(const SubscriptionInstance_rch& instance, ACE_Time_Value& filter_time_expired)
02698 {
02699   ACE_Time_Value now(ACE_OS::gettimeofday());
02700 
02701   // TIME_BASED_FILTER processing; expire data samples
02702   // if minimum separation is not met for instance.
02703   const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02704 
02705   if (qos_.time_based_filter.minimum_separation > zero) {
02706     filter_time_expired = now - instance->last_accepted_;
02707     DDS::Duration_t separation = time_value_to_duration(filter_time_expired);
02708 
02709     if (separation < qos_.time_based_filter.minimum_separation) {
02710       return true;  // Data filtered.
02711     }
02712   }
02713 
02714   instance->last_accepted_ = now;
02715 
02716   return false;
02717 }
02718 
02719 bool DataReaderImpl::is_bit() const
02720 {
02721   return this->is_bit_;
02722 }
02723 
02724 bool
02725 DataReaderImpl::has_zero_copies()
02726 {
02727   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02728       guard,
02729       this->sample_lock_,
02730       true /* assume we have loans */);
02731   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true);
02732 
02733   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
02734       iter != instances_.end();
02735       ++iter) {
02736     SubscriptionInstance_rch ptr = iter->second;
02737 
02738     for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
02739         item != 0; item = item->next_data_sample_) {
02740       if (item->zero_copy_cnt_ > 0) {
02741         return true;
02742       }
02743     }
02744   }
02745 
02746   return false;
02747 }
02748 
02749 void DataReaderImpl::notify_liveliness_change()
02750 {
02751   // N.B. writers_lock_ should already be acquired when
02752   //      this method is called.
02753 
02754   DDS::DataReaderListener_var listener
02755   = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
02756 
02757   if (!CORBA::is_nil(listener.in())) {
02758     listener->on_liveliness_changed(this, liveliness_changed_status_);
02759 
02760     liveliness_changed_status_.alive_count_change = 0;
02761     liveliness_changed_status_.not_alive_count_change = 0;
02762   }
02763   notify_status_condition();
02764 
02765   if (DCPS_debug_level > 9) {
02766     OPENDDS_STRING output_str;
02767     output_str += "subscription ";
02768     output_str += OPENDDS_STRING(GuidConverter(subscription_id_));
02769     output_str += ", listener at: 0x";
02770     output_str += to_dds_string(this->listener_.in ());
02771 
02772     for (WriterMapType::iterator current = this->writers_.begin();
02773         current != this->writers_.end();
02774         ++current) {
02775       RepoId id = current->first;
02776       output_str += "\n\tNOTIFY: writer[ ";
02777       output_str += OPENDDS_STRING(GuidConverter(id));
02778       output_str += "] == ";
02779       output_str += current->second->get_state_str();
02780     }
02781 
02782     output_str + "\n";
02783     ACE_DEBUG((LM_DEBUG,
02784         ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
02785         ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
02786         ACE_TEXT("\tNOTIFY: %C\n"),
02787         listener.in (),
02788         listener_mask_,
02789         output_str.c_str()));
02790   }
02791 }
02792 
02793 void DataReaderImpl::post_read_or_take()
02794 {
02795   set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02796   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
02797   if (subscriber)
02798     subscriber->set_status_changed_flag(
02799       DDS::DATA_ON_READERS_STATUS, false);
02800 }
02801 
02802 void DataReaderImpl::reschedule_deadline()
02803 {
02804   if (this->watchdog_.in()) {
02805     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02806     for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02807         iter != this->instances_.end();
02808         ++iter) {
02809       if (iter->second->deadline_timer_id_ != -1) {
02810         if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
02811           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"),
02812               ACE_TEXT("reset_timer_interval")));
02813         }
02814       }
02815     }
02816   }
02817 }
02818 
02819 ACE_Reactor_Timer_Interface*
02820 DataReaderImpl::get_reactor()
02821 {
02822   return this->reactor_;
02823 }
02824 
02825 OpenDDS::DCPS::RepoId
02826 DataReaderImpl::get_topic_id()
02827 {
02828   return this->topic_servant_->get_id();
02829 }
02830 
02831 OpenDDS::DCPS::RepoId
02832 DataReaderImpl::get_dp_id()
02833 {
02834   return dp_id_;
02835 }
02836 
02837 void
02838 DataReaderImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02839 {
02840   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02841   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02842 
02843   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02844       end = instances_.end(); iter != end; ++iter) {
02845     instance_handles.push_back(iter->first);
02846   }
02847 }
02848 
02849 void
02850 DataReaderImpl::get_writer_states(WriterStatePairVec& writer_states)
02851 {
02852   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02853       read_guard,
02854       this->writers_lock_);
02855   for (WriterMapType::iterator iter = writers_.begin();
02856       iter != writers_.end();
02857       ++iter) {
02858     writer_states.push_back(WriterStatePair(iter->first,
02859         iter->second->get_state()));
02860   }
02861 }
02862 
02863 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02864 void
02865 DataReaderImpl::update_ownership_strength (const PublicationId& pub_id,
02866     const CORBA::Long& ownership_strength)
02867 {
02868   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02869       read_guard,
02870       this->writers_lock_);
02871   for (WriterMapType::iterator iter = writers_.begin();
02872       iter != writers_.end();
02873       ++iter) {
02874     if (iter->second->writer_id_ == pub_id) {
02875       if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) {
02876         if (DCPS_debug_level >= 1) {
02877           GuidConverter reader_converter(this->subscription_id_);
02878           GuidConverter writer_converter(pub_id);
02879           ACE_DEBUG((LM_DEBUG,
02880               ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
02881               ACE_TEXT("local %C update remote %C strength from %d to %d \n"),
02882               OPENDDS_STRING(reader_converter).c_str(),
02883               OPENDDS_STRING(writer_converter).c_str(),
02884               iter->second->writer_qos_.ownership_strength, ownership_strength));
02885         }
02886         iter->second->writer_qos_.ownership_strength.value = ownership_strength;
02887         iter->second->clear_owner_evaluated ();
02888       }
02889       break;
02890     }
02891   }
02892 }
02893 #endif
02894 
02895 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02896 bool DataReaderImpl::verify_coherent_changes_completion (WriterInfo* writer)
02897 {
02898   if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS
02899       || ! this->subqos_.presentation.coherent_access) {
02900     this->accept_coherent (writer->writer_id_, writer->publisher_id_);
02901     this->coherent_changes_completed (this);
02902     return true;
02903   }
02904 
02905   // verify current coherent changes from single writer
02906   Coherent_State state = writer->coherent_change_received();
02907   if (writer->group_coherent_) { // GROUP coherent
02908     RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
02909     if (subscriber && state != NOT_COMPLETED_YET) {
02910       // verify if all readers received complete coherent changes in a group.
02911       subscriber->coherent_change_received (
02912           writer->publisher_id_, this, state);
02913     }
02914   }
02915   else {  // TOPIC coherent
02916     if (state == COMPLETED) {
02917       this->accept_coherent (writer->writer_id_, writer->publisher_id_);
02918     }
02919     else if (state == REJECTED) {
02920       this->reject_coherent (writer->writer_id_, writer->publisher_id_);
02921     }
02922     else {// NOT_COMPLETED
02923       return false;
02924     }
02925 
02926     // decision made: either COMPLETED or REJECTED
02927     writer->reset_coherent_info ();
02928   }
02929 
02930   return state == COMPLETED;
02931 }
02932 
02933 
02934 void DataReaderImpl::accept_coherent (PublicationId& writer_id,
02935     RepoId& publisher_id)
02936 {
02937   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
02938     GuidConverter reader (this->subscription_id_);
02939     GuidConverter writer (writer_id);
02940     GuidConverter publisher (publisher_id);
02941     ACE_DEBUG((LM_DEBUG,
02942         ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
02943         ACE_TEXT(" reader %C writer %C publisher %C \n"),
02944         OPENDDS_STRING(reader).c_str(),
02945         OPENDDS_STRING(writer).c_str(),
02946         OPENDDS_STRING(publisher).c_str()));
02947   }
02948 
02949   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02950   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02951 
02952   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02953       iter != this->instances_.end(); ++iter) {
02954     iter->second->rcvd_strategy_->accept_coherent(
02955         writer_id, publisher_id);
02956   }
02957 }
02958 
02959 
02960 void DataReaderImpl::reject_coherent (PublicationId& writer_id,
02961     RepoId& publisher_id)
02962 {
02963   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
02964     GuidConverter reader (this->subscription_id_);
02965     GuidConverter writer (writer_id);
02966     GuidConverter publisher (publisher_id);
02967     ACE_DEBUG((LM_DEBUG,
02968         ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
02969         ACE_TEXT(" reader %C writer %C publisher %C \n"),
02970         OPENDDS_STRING(reader).c_str(),
02971         OPENDDS_STRING(writer).c_str(),
02972         OPENDDS_STRING(publisher).c_str()));
02973   }
02974 
02975   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02976   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02977 
02978   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02979       iter != this->instances_.end(); ++iter) {
02980     iter->second->rcvd_strategy_->reject_coherent(
02981         writer_id, publisher_id);
02982   }
02983   this->reset_coherent_info (writer_id, publisher_id);
02984 }
02985 
02986 
02987 void DataReaderImpl::reset_coherent_info (const PublicationId& writer_id,
02988     const RepoId& publisher_id)
02989 {
02990   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
02991 
02992   WriterMapType::iterator itEnd = this->writers_.end();
02993   for (WriterMapType::iterator it = this->writers_.begin();
02994       it != itEnd; ++it) {
02995     if (it->second->writer_id_ == writer_id
02996         && it->second->publisher_id_ == publisher_id) {
02997       it->second->reset_coherent_info();
02998     }
02999   }
03000 }
03001 
03002 
03003 void
03004 DataReaderImpl::coherent_change_received (RepoId publisher_id, Coherent_State& result)
03005 {
03006   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03007 
03008   result = COMPLETED;
03009   for (WriterMapType::iterator iter = writers_.begin();
03010       iter != writers_.end();
03011       ++iter) {
03012 
03013     if (iter->second->publisher_id_ == publisher_id) {
03014       const Coherent_State state = iter->second->coherent_change_received();
03015       if (state == NOT_COMPLETED_YET) {
03016         result = NOT_COMPLETED_YET;
03017         break;
03018       }
03019       else if (state == REJECTED) {
03020         result = REJECTED;
03021       }
03022     }
03023   }
03024 }
03025 
03026 
03027 void
03028 DataReaderImpl::coherent_changes_completed (DataReaderImpl* reader)
03029 {
03030   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
03031   if (!subscriber)
03032     return;
03033 
03034   subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
03035   this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
03036 
03037   ::DDS::SubscriberListener_var sub_listener =
03038       subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS);
03039   if (!CORBA::is_nil(sub_listener.in()))
03040   {
03041     if (reader == this) {
03042       // Release the sample_lock before listener callback.
03043       ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03044       sub_listener->on_data_on_readers(subscriber.in());
03045     }
03046 
03047     this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03048     subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03049   }
03050   else
03051   {
03052     subscriber->notify_status_condition();
03053 
03054     ::DDS::DataReaderListener_var listener =
03055         this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
03056 
03057     if (!CORBA::is_nil(listener.in()))
03058     {
03059       if (reader == this) {
03060         // Release the sample_lock before listener callback.
03061         ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03062         listener->on_data_available(this);
03063       } else {
03064         listener->on_data_available(this);
03065       }
03066 
03067       set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03068       subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03069     }
03070     else
03071     {
03072       this->notify_status_condition();
03073     }
03074   }
03075 }
03076 
03077 
03078 void DataReaderImpl::begin_access()
03079 {
03080   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03081   this->coherent_ = true;
03082 }
03083 
03084 
03085 void DataReaderImpl::end_access()
03086 {
03087   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03088   this->coherent_ = false;
03089   this->group_coherent_ordered_data_.reset();
03090   this->post_read_or_take();
03091 }
03092 
03093 
03094 void DataReaderImpl::get_ordered_data (GroupRakeData& data,
03095     DDS::SampleStateMask sample_states,
03096     DDS::ViewStateMask view_states,
03097     DDS::InstanceStateMask instance_states)
03098 {
03099   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03100   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03101 
03102   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
03103       iter != instances_.end(); ++iter) {
03104     SubscriptionInstance_rch ptr = iter->second;
03105     if ((ptr->instance_state_.view_state() & view_states) &&
03106         (ptr->instance_state_.instance_state() & instance_states)) {
03107       size_t i(0);
03108       for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
03109           item != 0; item = item->next_data_sample_) {
03110         if ((item->sample_state_ & sample_states) && !item->coherent_change_) {
03111           data.insert_sample(item, ptr, ++i);
03112           this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i);
03113         }
03114       }
03115     }
03116   }
03117 }
03118 
03119 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
03120 
03121 void
03122 DataReaderImpl::set_subscriber_qos(
03123     const DDS::SubscriberQos & qos)
03124 {
03125   this->subqos_ = qos;
03126 }
03127 
03128 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
03129 void
03130 DataReaderImpl::enable_filtering(ContentFilteredTopicImpl* cft)
03131 {
03132   cft->add_reader(*this);
03133   content_filtered_topic_ = cft;
03134 }
03135 
03136 DDS::ContentFilteredTopic_ptr
03137 DataReaderImpl::get_cf_topic() const
03138 {
03139   return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get());
03140 }
03141 #endif
03142 
03143 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
03144 
03145 void
03146 DataReaderImpl::update_subscription_params(const DDS::StringSeq& params) const
03147 {
03148   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
03149   disco->update_subscription_params(domain_id_,
03150       dp_id_,
03151       subscription_id_,
03152       params);
03153 }
03154 #endif
03155 
03156 void
03157 DataReaderImpl::reset_ownership (::DDS::InstanceHandle_t instance)
03158 {
03159   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03160   for (WriterMapType::iterator iter = writers_.begin();
03161       iter != writers_.end();
03162       ++iter) {
03163     iter->second->set_owner_evaluated(instance, false);
03164   }
03165 }
03166 
03167 void
03168 DataReaderImpl::resume_sample_processing(const PublicationId& pub_id)
03169 {
03170   OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
03171   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03172   WriterMapType::iterator where = writers_.find(pub_id);
03173   if (writers_.end() != where) {
03174     WriterInfo& info = *where->second;
03175     // Stop filtering these
03176     if (info.waiting_for_end_historic_samples_) {
03177       end_historic_sweeper_->cancel_timer(where->second);
03178       if (!info.historic_samples_.empty()) {
03179         info.last_historic_seq_ = info.historic_samples_.rbegin()->first;
03180       }
03181       to_deliver.swap(info.historic_samples_);
03182       write_guard.release();
03183       deliver_historic(to_deliver);
03184     }
03185   }
03186 }
03187 
03188 bool DataReaderImpl::check_historic(const ReceivedDataSample& sample)
03189 {
03190   ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
03191   WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
03192   if (iter != writers_.end()) {
03193     const SequenceNumber& seq = sample.header_.sequence_;
03194     if (iter->second->waiting_for_end_historic_samples_) {
03195       iter->second->historic_samples_.insert(std::make_pair(seq, sample));
03196       return false;
03197     }
03198     if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
03199         && !sample.header_.historic_sample_
03200         && seq <= iter->second->last_historic_seq_) {
03201       // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
03202       return false;
03203     }
03204   }
03205   return true;
03206 }
03207 
03208 void DataReaderImpl::deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples)
03209 {
03210   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
03211   const iter_t end = samples.end();
03212   for (iter_t iter = samples.begin(); iter != end; ++iter) {
03213     iter->second.header_.historic_sample_ = true;
03214     data_received(iter->second);
03215   }
03216 }
03217 
03218 void
03219 DataReaderImpl::add_link(const DataLink_rch& link, const RepoId& peer)
03220 {
03221   if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
03222 
03223     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
03224 
03225     WriterMapType::iterator it = writers_.find(peer);
03226     if (it != writers_.end()) {
03227       // Schedule timer if necessary
03228       //   - only need to check reader qos - we know the writer must be >= reader
03229       end_historic_sweeper_->schedule_timer(it->second);
03230     }
03231   }
03232   TransportClient::add_link(link, peer);
03233   TransportImpl& impl = link->impl();
03234   OPENDDS_STRING type = impl.transport_type();
03235 
03236   if (type == "rtps_udp" || type == "multicast") {
03237     resume_sample_processing(peer);
03238   }
03239 }
03240 
03241 void
03242 DataReaderImpl::register_for_writer(const RepoId& participant,
03243                                     const RepoId& readerid,
03244                                     const RepoId& writerid,
03245                                     const TransportLocatorSeq& locators,
03246                                     DiscoveryListener* listener)
03247 {
03248   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
03249 }
03250 
03251 void
03252 DataReaderImpl::unregister_for_writer(const RepoId& participant,
03253                                       const RepoId& readerid,
03254                                       const RepoId& writerid)
03255 {
03256   TransportClient::unregister_for_writer(participant, readerid, writerid);
03257 }
03258 
03259 void DataReaderImpl::accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance)
03260 {
03261   bool accepted = true;
03262 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03263   bool verify_coherent = false;
03264 #endif
03265   RcHandle<WriterInfo> writer;
03266 
03267   if (header.publication_id_.entityId.entityKind != ENTITYKIND_OPENDDS_NIL_WRITER) {
03268     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
03269 
03270     WriterMapType::iterator where = writers_.find(header.publication_id_);
03271 
03272     if (where != writers_.end()) {
03273       if (header.coherent_change_) {
03274 
03275 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03276         // Received coherent change
03277         where->second->group_coherent_ = header.group_coherent_;
03278         where->second->publisher_id_ = header.publisher_id_;
03279         ++where->second->coherent_samples_;
03280         verify_coherent = true;
03281 #endif
03282         writer = where->second;
03283       }
03284     }
03285     else {
03286       GuidConverter subscriptionBuffer(subscription_id_);
03287       GuidConverter publicationBuffer(header.publication_id_);
03288       ACE_DEBUG((LM_WARNING,
03289         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
03290         ACE_TEXT("subscription %C failed to find ")
03291         ACE_TEXT("publication data for %C.\n"),
03292         OPENDDS_STRING(subscriptionBuffer).c_str(),
03293         OPENDDS_STRING(publicationBuffer).c_str()));
03294     }
03295   }
03296 
03297 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03298   if (verify_coherent) {
03299     accepted = verify_coherent_changes_completion(writer.in());
03300   }
03301 #endif
03302 
03303   if (instance && watchdog_.in()) {
03304     instance->last_sample_tv_ = instance->cur_sample_tv_;
03305     instance->cur_sample_tv_ = ACE_OS::gettimeofday();
03306 
03307     // Watchdog can't be called with sample_lock_ due to reactor deadlock
03308     ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03309     if (is_new_instance) {
03310       watchdog_->schedule_timer(instance);
03311     } else {
03312       watchdog_->execute(instance, false);
03313     }
03314   }
03315 
03316   if (accepted) {
03317     notify_read_conditions();
03318   }
03319 }
03320 
03321 #if defined(OPENDDS_SECURITY)
03322 DDS::Security::ParticipantCryptoHandle DataReaderImpl::get_crypto_handle() const
03323 {
03324   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
03325   return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
03326 }
03327 #endif
03328 
03329 EndHistoricSamplesMissedSweeper::EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
03330                                                                  ACE_thread_t owner,
03331                                                                  DataReaderImpl* reader)
03332   : ReactorInterceptor (reactor, owner)
03333   , reader_(*reader)
03334 { }
03335 
03336 EndHistoricSamplesMissedSweeper::~EndHistoricSamplesMissedSweeper()
03337 { }
03338 
03339 void EndHistoricSamplesMissedSweeper::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03340 {
03341   info->waiting_for_end_historic_samples_ = true;
03342   ScheduleCommand c(this, info);
03343   execute_or_enqueue(c);
03344 }
03345 
03346 void EndHistoricSamplesMissedSweeper::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03347 {
03348   info->waiting_for_end_historic_samples_ = false;
03349   CancelCommand c(this, info);
03350   execute_or_enqueue(c);
03351 }
03352 
03353 int EndHistoricSamplesMissedSweeper::handle_timeout(
03354     const ACE_Time_Value& ,
03355     const void* arg)
03356 {
03357 
03358   WriterInfo* const info =
03359     const_cast<WriterInfo*>(reinterpret_cast<const WriterInfo*>(arg));
03360   const PublicationId pub_id = info->writer_id_;
03361 
03362   {
03363     ACE_Guard<ACE_Thread_Mutex> guard(this->mutex_);
03364     info_set_.erase(rchandle_from(info));
03365   }
03366 
03367   RcHandle<DataReaderImpl> reader = reader_.lock();
03368   if (!reader)
03369     return 0;
03370 
03371   if (DCPS_debug_level >= 1) {
03372     GuidConverter sub_repo(reader->get_repo_id());
03373     GuidConverter pub_repo(pub_id);
03374     ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
03375                OPENDDS_STRING(sub_repo).c_str(),
03376                OPENDDS_STRING(pub_repo).c_str()));
03377   }
03378 
03379   reader->resume_sample_processing(pub_id);
03380   return 0;
03381 }
03382 
03383 void EndHistoricSamplesMissedSweeper::ScheduleCommand::execute()
03384 {
03385   static const ACE_Time_Value ten_seconds(10);
03386 
03387   const void* arg = reinterpret_cast<const void*>(info_.in());
03388 
03389   info_->historic_samples_timer_ = sweeper_->reactor()->schedule_timer(sweeper_,
03390                                                                        arg,
03391                                                                        ten_seconds);
03392   sweeper_->info_set_.insert(info_);
03393   if (DCPS_debug_level) {
03394     ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", info_->historic_samples_timer_));
03395   }
03396 }
03397 
03398 void EndHistoricSamplesMissedSweeper::CancelCommand::execute()
03399 {
03400   if (info_->historic_samples_timer_ != WriterInfo::NO_TIMER) {
03401     sweeper_->reactor()->cancel_timer(info_->historic_samples_timer_);
03402     if (DCPS_debug_level) {
03403       ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", info_->historic_samples_timer_));
03404     }
03405     info_->historic_samples_timer_ = WriterInfo::NO_TIMER;
03406     sweeper_->info_set_.erase(info_);
03407   }
03408 }
03409 
03410 } // namespace DCPS
03411 } // namespace OpenDDS
03412 
03413 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1