00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DataReaderImpl.h"
00010 #include "tao/ORB_Core.h"
00011 #include "SubscriptionInstance.h"
00012 #include "ReceivedDataElementList.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "FeatureDisabledQosCheck.h"
00017 #include "GuidConverter.h"
00018 #include "TopicImpl.h"
00019 #include "Serializer.h"
00020 #include "SubscriberImpl.h"
00021 #include "Transient_Kludge.h"
00022 #include "Util.h"
00023 #include "RequestedDeadlineWatchdog.h"
00024 #include "QueryConditionImpl.h"
00025 #include "ReadConditionImpl.h"
00026 #include "MonitorFactory.h"
00027 #include "dds/DCPS/transport/framework/EntryExit.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029 #include "dds/DdsDcpsCoreC.h"
00030 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00031 #include "dds/DCPS/SafetyProfileStreams.h"
00032 #if !defined (DDS_HAS_MINIMUM_BIT)
00033 #include "BuiltInTopicUtils.h"
00034 #include "dds/DdsDcpsCoreTypeSupportC.h"
00035 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00036
00037 #include "ace/Reactor.h"
00038 #include "ace/Auto_Ptr.h"
00039 #include "ace/OS_NS_sys_time.h"
00040
00041 #include <cstdio>
00042 #include <stdexcept>
00043
00044 #if !defined (__ACE_INLINE__)
00045 # include "DataReaderImpl.inl"
00046 #endif
00047
00048 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00049
00050 namespace OpenDDS {
00051 namespace DCPS {
00052
00053 DataReaderImpl::DataReaderImpl()
00054 : qos_(TheServiceParticipant->initial_DataReaderQos()),
00055 reverse_sample_lock_(sample_lock_),
00056 topic_servant_(0),
00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00058 is_exclusive_ownership_ (false),
00059 #endif
00060 coherent_(false),
00061 subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00062 topic_desc_(0),
00063 listener_mask_(DEFAULT_STATUS_MASK),
00064 domain_id_(0),
00065 end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00066 remove_association_sweeper_(make_rch<RemoveAssociationSweeper<DataReaderImpl> >(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00067 n_chunks_(TheServiceParticipant->n_chunks()),
00068 reverse_pub_handle_lock_(publication_handle_lock_),
00069 reactor_(0),
00070 liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00071 last_deadline_missed_total_count_(0),
00072 is_bit_(false),
00073 always_get_history_(false),
00074 statistics_enabled_(false),
00075 raw_latency_buffer_size_(0),
00076 raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00077 monitor_(0),
00078 periodic_monitor_(0),
00079 transport_disabled_(false)
00080 {
00081 reactor_ = TheServiceParticipant->timer();
00082
00083 liveliness_changed_status_.alive_count = 0;
00084 liveliness_changed_status_.not_alive_count = 0;
00085 liveliness_changed_status_.alive_count_change = 0;
00086 liveliness_changed_status_.not_alive_count_change = 0;
00087 liveliness_changed_status_.last_publication_handle =
00088 DDS::HANDLE_NIL;
00089
00090 requested_deadline_missed_status_.total_count = 0;
00091 requested_deadline_missed_status_.total_count_change = 0;
00092 requested_deadline_missed_status_.last_instance_handle =
00093 DDS::HANDLE_NIL;
00094
00095 requested_incompatible_qos_status_.total_count = 0;
00096 requested_incompatible_qos_status_.total_count_change = 0;
00097 requested_incompatible_qos_status_.last_policy_id = 0;
00098 requested_incompatible_qos_status_.policies.length(0);
00099
00100 subscription_match_status_.total_count = 0;
00101 subscription_match_status_.total_count_change = 0;
00102 subscription_match_status_.current_count = 0;
00103 subscription_match_status_.current_count_change = 0;
00104 subscription_match_status_.last_publication_handle =
00105 DDS::HANDLE_NIL;
00106
00107 sample_lost_status_.total_count = 0;
00108 sample_lost_status_.total_count_change = 0;
00109
00110 sample_rejected_status_.total_count = 0;
00111 sample_rejected_status_.total_count_change = 0;
00112 sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
00113 sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
00114
00115 this->budget_exceeded_status_.total_count = 0;
00116 this->budget_exceeded_status_.total_count_change = 0;
00117 this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
00118
00119 monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this);
00120 periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this);
00121 }
00122
00123
00124
00125 DataReaderImpl::~DataReaderImpl()
00126 {
00127 DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6);
00128
00129 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00130 OwnershipManagerPtr owner_manager = this->ownership_manager();
00131 if (owner_manager) {
00132 owner_manager->unregister_reader(topic_servant_->type_name(), this);
00133 }
00134 #endif
00135
00136 }
00137
00138
00139 void
00140 DataReaderImpl::cleanup()
00141 {
00142
00143
00144
00145 set_listener(0, NO_STATUS_MASK);
00146
00147
00148 }
00149
00150 void DataReaderImpl::init(
00151 TopicDescriptionImpl* a_topic_desc,
00152 const DDS::DataReaderQos &qos,
00153 DDS::DataReaderListener_ptr a_listener,
00154 const DDS::StatusMask & mask,
00155 DomainParticipantImpl* participant,
00156 SubscriberImpl* subscriber)
00157 {
00158 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00159 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00160 topic_servant_ = a_topic;
00161 }
00162
00163 CORBA::String_var topic_name = a_topic_desc->get_name();
00164
00165 #if !defined (DDS_HAS_MINIMUM_BIT)
00166 is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00167 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00168 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00169 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00170 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00171
00172 qos_ = qos;
00173
00174 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00175 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00176 #endif
00177
00178 listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00179 listener_mask_ = mask;
00180
00181
00182
00183 participant_servant_ = *participant;
00184
00185 domain_id_ = participant->get_domain_id();
00186
00187 subscriber_servant_ = *subscriber;
00188
00189 if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
00190 ACE_DEBUG((LM_WARNING,
00191 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
00192 ACE_TEXT("failed to get SubscriberQos\n")));
00193 }
00194 }
00195
00196 DDS::InstanceHandle_t
00197 DataReaderImpl::get_instance_handle()
00198 {
00199 using namespace OpenDDS::DCPS;
00200 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00201 if (participant)
00202 return participant->id_to_handle(subscription_id_);
00203 return 0;
00204 }
00205
00206 void
00207 DataReaderImpl::add_association(const RepoId& yourId,
00208 const WriterAssociation& writer,
00209 bool active)
00210 {
00211 if (DCPS_debug_level) {
00212 GuidConverter reader_converter(yourId);
00213 GuidConverter writer_converter(writer.writerId);
00214 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
00215 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00216 OPENDDS_STRING(reader_converter).c_str(),
00217 OPENDDS_STRING(writer_converter).c_str()));
00218 }
00219
00220 if (entity_deleted_.value()) {
00221 if (DCPS_debug_level) {
00222 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
00223 ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00224 }
00225 return;
00226 }
00227
00228
00229
00230
00231 if (GUID_UNKNOWN == subscription_id_) {
00232 subscription_id_ = yourId;
00233 }
00234
00235
00236 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00237
00238
00239
00240
00241
00242
00243 {
00244
00245 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
00246
00247 const PublicationId& writer_id = writer.writerId;
00248 RcHandle<WriterInfo> info = make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos);
00249 std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
00250
00251 WriterMapType::value_type(
00252 writer_id,
00253 info));
00254
00255
00256
00257 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00258 info->waiting_for_end_historic_samples_ = true;
00259 }
00260
00261 this->statistics_.insert(
00262 StatsMapType::value_type(
00263 writer_id,
00264 WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
00265
00266
00267 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00268
00269 }
00270
00271 if (DCPS_debug_level > 4) {
00272 GuidConverter converter(writer_id);
00273 ACE_DEBUG((LM_DEBUG,
00274 "(%P|%t) DataReaderImpl::add_association: "
00275 "inserted writer %C.return %d \n",
00276 OPENDDS_STRING(converter).c_str(), bpair.second));
00277
00278 WriterMapType::iterator iter = writers_.find(writer_id);
00279 if (iter != writers_.end()) {
00280
00281
00282
00283 GuidConverter reader_converter(subscription_id_);
00284 GuidConverter writer_converter(writer_id);
00285 ACE_DEBUG((LM_DEBUG,
00286 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00287 ACE_TEXT("reader %C is associated with writer %C.\n"),
00288 OPENDDS_STRING(reader_converter).c_str(),
00289 OPENDDS_STRING(writer_converter).c_str()));
00290 }
00291 }
00292 }
00293
00294
00295
00296
00297
00298 AssociationData data;
00299 data.remote_id_ = writer.writerId;
00300 data.remote_data_ = writer.writerTransInfo;
00301 data.publication_transport_priority_ =
00302 writer.writerQos.transport_priority.value;
00303 data.remote_reliable_ =
00304 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00305 data.remote_durable_ =
00306 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00307
00308
00309
00310
00311
00312 guard.release();
00313
00314 if (!associate(data, active)) {
00315 if (DCPS_debug_level) {
00316 ACE_ERROR((LM_ERROR,
00317 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00318 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00319 }
00320 }
00321 }
00322
00323 void
00324 DataReaderImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00325 {
00326 if (!(flags & ASSOC_OK)) {
00327 if (DCPS_debug_level) {
00328 const GuidConverter conv(remote_id);
00329 ACE_ERROR((LM_ERROR,
00330 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00331 ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00332 OPENDDS_STRING(conv).c_str()));
00333 }
00334 return;
00335 }
00336
00337 const bool active = flags & ASSOC_ACTIVE;
00338 {
00339
00340 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00341
00342
00343 if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00344 if (DCPS_debug_level >= 5) {
00345 GuidConverter converter(subscription_id_);
00346 ACE_DEBUG((LM_DEBUG,
00347 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00348 ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00349 OPENDDS_STRING(converter).c_str()));
00350 }
00351
00352 liveliness_timer_->check_liveliness();
00353 }
00354 }
00355
00356
00357 if (!is_bit_) {
00358
00359 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00360
00361 if (!participant)
00362 return;
00363
00364 DDS::InstanceHandle_t handle = participant->id_to_handle(remote_id);
00365
00366
00367
00368 {
00369 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00370
00371
00372 id_to_handle_map_.insert(
00373 RepoIdToHandleMap::value_type(remote_id, handle));
00374
00375 if (DCPS_debug_level > 4) {
00376 GuidConverter converter(remote_id);
00377 ACE_DEBUG((LM_DEBUG,
00378 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00379 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00380 OPENDDS_STRING(converter).c_str(),
00381 handle));
00382 }
00383
00384
00385
00386
00387 const int matchedPublications = static_cast<int>(id_to_handle_map_.size());
00388 subscription_match_status_.current_count_change =
00389 matchedPublications - subscription_match_status_.current_count;
00390 subscription_match_status_.current_count = matchedPublications;
00391
00392 ++subscription_match_status_.total_count;
00393 ++subscription_match_status_.total_count_change;
00394
00395 subscription_match_status_.last_publication_handle = handle;
00396
00397 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00398
00399 DDS::DataReaderListener_var listener =
00400 listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00401
00402 if (!CORBA::is_nil(listener)) {
00403 listener->on_subscription_matched(this, subscription_match_status_);
00404
00405
00406
00407
00408
00409 subscription_match_status_.total_count_change = 0;
00410 subscription_match_status_.current_count_change = 0;
00411 }
00412
00413 notify_status_condition();
00414 }
00415
00416 {
00417 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00418 ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00419
00420 if(!writers_.count(remote_id)){
00421 return;
00422 }
00423 writers_[remote_id]->handle_ = handle;
00424 }
00425 }
00426
00427 if (!active) {
00428 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00429
00430 disco->association_complete(domain_id_, dp_id_,
00431 subscription_id_, remote_id);
00432 }
00433
00434 if (monitor_) {
00435 monitor_->report();
00436 }
00437 }
00438
00439 void
00440 DataReaderImpl::association_complete(const RepoId& )
00441 {
00442
00443
00444 }
00445
00446 void
00447 DataReaderImpl::remove_associations(const WriterIdSeq& writers,
00448 bool notify_lost)
00449 {
00450 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
00451
00452 if (writers.length() == 0) {
00453 return;
00454 }
00455
00456 if (DCPS_debug_level >= 1) {
00457 GuidConverter reader_converter(subscription_id_);
00458 GuidConverter writer_converter(writers[0]);
00459 ACE_DEBUG((LM_DEBUG,
00460 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
00461 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00462 is_bit_,
00463 OPENDDS_STRING(reader_converter).c_str(),
00464 OPENDDS_STRING(writer_converter).c_str(),
00465 writers.length()));
00466 }
00467 if (!this->entity_deleted_.value()) {
00468
00469 this->stop_associating(writers.get_buffer(), writers.length());
00470
00471
00472
00473 WriterIdSeq non_active_writers;
00474 {
00475 CORBA::ULong wr_len = writers.length();
00476 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00477
00478 for (CORBA::ULong i = 0; i < wr_len; i++) {
00479 PublicationId writer_id = writers[i];
00480
00481 WriterMapType::iterator it = this->writers_.find(writer_id);
00482 if (it != this->writers_.end() &&
00483 it->second->active()) {
00484 remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00485 } else {
00486 push_back(non_active_writers, writer_id);
00487 }
00488 }
00489 }
00490 remove_associations_i(non_active_writers, notify_lost);
00491 } else {
00492 remove_associations_i(writers, notify_lost);
00493 }
00494 }
00495
00496 void
00497 DataReaderImpl::remove_publication(const PublicationId& pub_id)
00498 {
00499 WriterIdSeq writers;
00500 bool notify = false;
00501 {
00502 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00503 WriterMapType::iterator where = writers_.find(pub_id);
00504 if (writers_.end() != where) {
00505 notify = where->second->notify_lost_;
00506 push_back(writers, pub_id);
00507 }
00508 }
00509 remove_associations_i(writers, notify);
00510 }
00511
00512 void
00513 DataReaderImpl::remove_associations_i(const WriterIdSeq& writers,
00514 bool notify_lost)
00515 {
00516 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
00517
00518 if (writers.length() == 0) {
00519 return;
00520 }
00521
00522 if (DCPS_debug_level >= 1) {
00523 GuidConverter reader_converter(subscription_id_);
00524 GuidConverter writer_converter(writers[0]);
00525 ACE_DEBUG((LM_DEBUG,
00526 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00527 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00528 is_bit_,
00529 OPENDDS_STRING(reader_converter).c_str(),
00530 OPENDDS_STRING(writer_converter).c_str(),
00531 writers.length()));
00532 }
00533 DDS::InstanceHandleSeq handles;
00534
00535 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00536
00537
00538
00539
00540 WriterIdSeq updated_writers;
00541
00542 CORBA::ULong wr_len;
00543
00544
00545
00546
00547
00548 {
00549 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00550
00551 wr_len = writers.length();
00552
00553 for (CORBA::ULong i = 0; i < wr_len; i++) {
00554 PublicationId writer_id = writers[i];
00555
00556 WriterMapType::iterator it = this->writers_.find(writer_id);
00557
00558 if (it != this->writers_.end()) {
00559 it->second->removed();
00560 end_historic_sweeper_->cancel_timer(it->second);
00561 remove_association_sweeper_->cancel_timer(it->second);
00562 }
00563
00564 if (this->writers_.erase(writer_id) == 0) {
00565 if (DCPS_debug_level >= 1) {
00566 GuidConverter converter(writer_id);
00567 ACE_DEBUG((LM_DEBUG,
00568 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00569 ACE_TEXT("the writer local %C was already removed.\n"),
00570 OPENDDS_STRING(converter).c_str()));
00571 }
00572
00573 } else {
00574 push_back(updated_writers, writer_id);
00575 }
00576 }
00577 }
00578
00579 wr_len = updated_writers.length();
00580
00581
00582 if (wr_len == 0) {
00583 return;
00584 }
00585
00586 if (!is_bit_) {
00587
00588 this->lookup_instance_handles(updated_writers, handles);
00589
00590 for (CORBA::ULong i = 0; i < wr_len; ++i) {
00591 id_to_handle_map_.erase(updated_writers[i]);
00592 }
00593 }
00594
00595 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00596 {
00597 this->disassociate(updated_writers[i]);
00598 }
00599 }
00600
00601
00602 if (!this->is_bit_) {
00603
00604 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00605 this->subscription_match_status_.current_count_change
00606 = matchedPublications - this->subscription_match_status_.current_count;
00607
00608
00609 if (this->subscription_match_status_.current_count_change != 0) {
00610 this->subscription_match_status_.current_count = matchedPublications;
00611
00612
00613
00614
00615 this->subscription_match_status_.last_publication_handle
00616 = handles[ wr_len - 1];
00617
00618 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00619
00620 DDS::DataReaderListener_var listener
00621 = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00622
00623 if (!CORBA::is_nil(listener.in())) {
00624 listener->on_subscription_matched(this, this->subscription_match_status_);
00625
00626
00627 this->subscription_match_status_.total_count_change = 0;
00628 this->subscription_match_status_.current_count_change = 0;
00629 }
00630 notify_status_condition();
00631 }
00632 }
00633
00634
00635
00636
00637 if (notify_lost) {
00638 this->notify_subscription_lost(handles);
00639 }
00640
00641 if (this->monitor_) {
00642 this->monitor_->report();
00643 }
00644 }
00645
00646 void
00647 DataReaderImpl::remove_all_associations()
00648 {
00649 DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
00650
00651 this->stop_associating();
00652
00653 OpenDDS::DCPS::WriterIdSeq writers;
00654 int size;
00655
00656 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00657
00658 {
00659 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00660
00661 size = static_cast<int>(writers_.size());
00662 writers.length(size);
00663
00664 WriterMapType::iterator curr_writer = writers_.begin();
00665 WriterMapType::iterator end_writer = writers_.end();
00666
00667 int i = 0;
00668
00669 while (curr_writer != end_writer) {
00670 writers[i++] = curr_writer->first;
00671 ++curr_writer;
00672 }
00673 }
00674
00675 try {
00676 CORBA::Boolean dont_notify_lost = 0;
00677
00678 if (0 < size) {
00679 remove_associations(writers, dont_notify_lost);
00680 }
00681
00682 } catch (const CORBA::Exception&) {
00683 ACE_DEBUG((LM_WARNING,
00684 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00685 ACE_TEXT("caught exception from remove_associations.\n")));
00686 }
00687 }
00688
00689 void
00690 DataReaderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00691 {
00692 DDS::DataReaderListener_var listener =
00693 listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
00694
00695 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00696 guard,
00697 this->publication_handle_lock_);
00698
00699
00700 if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00701
00702 return;
00703 }
00704
00705 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00706 true);
00707
00708
00709 requested_incompatible_qos_status_.total_count = status.total_count;
00710 requested_incompatible_qos_status_.total_count_change +=
00711 status.count_since_last_send;
00712 requested_incompatible_qos_status_.last_policy_id =
00713 status.last_policy_id;
00714 requested_incompatible_qos_status_.policies = status.policies;
00715
00716 if (!CORBA::is_nil(listener.in())) {
00717 listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_);
00718
00719
00720
00721
00722
00723
00724 requested_incompatible_qos_status_.total_count_change = 0;
00725 }
00726
00727 notify_status_condition();
00728 }
00729
00730 void
00731 DataReaderImpl::inconsistent_topic()
00732 {
00733 topic_servant_->inconsistent_topic();
00734 }
00735
00736 void
00737 DataReaderImpl::signal_liveliness(const RepoId& remote_participant)
00738 {
00739 RepoId prefix = remote_participant;
00740 prefix.entityId = EntityId_t();
00741
00742 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00743
00744 typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair;
00745 typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
00746 WriterSet writers;
00747
00748 {
00749 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00750 for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00751 limit = writers_.end();
00752 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00753 ++pos) {
00754 writers.push_back(std::make_pair(pos->first, pos->second));
00755 }
00756 }
00757
00758 ACE_Time_Value when = ACE_OS::gettimeofday();
00759 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00760 pos != limit;
00761 ++pos) {
00762 pos->second->received_activity(when);
00763 }
00764
00765 if (!writers.empty()) {
00766 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00767 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00768 pos != limit;
00769 ++pos) {
00770 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00771 iter != instances_.end();
00772 ++iter) {
00773 SubscriptionInstance_rch ptr = iter->second;
00774 ptr->instance_state_.lively(pos->first);
00775 }
00776 }
00777 }
00778 }
00779
00780 DDS::ReadCondition_ptr DataReaderImpl::create_readcondition(
00781 DDS::SampleStateMask sample_states,
00782 DDS::ViewStateMask view_states,
00783 DDS::InstanceStateMask instance_states)
00784 {
00785 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00786 DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
00787 view_states, instance_states);
00788 read_conditions_.insert(rc);
00789 return rc._retn();
00790 }
00791
00792 #ifndef OPENDDS_NO_QUERY_CONDITION
00793 DDS::QueryCondition_ptr DataReaderImpl::create_querycondition(
00794 DDS::SampleStateMask sample_states,
00795 DDS::ViewStateMask view_states,
00796 DDS::InstanceStateMask instance_states,
00797 const char* query_expression,
00798 const DDS::StringSeq& query_parameters)
00799 {
00800 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00801 try {
00802 DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
00803 view_states, instance_states, query_expression);
00804 if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) {
00805 return 0;
00806 }
00807 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
00808 read_conditions_.insert(rc);
00809 return qc._retn();
00810 } catch (const std::exception& e) {
00811 if (DCPS_debug_level) {
00812 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ")
00813 ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
00814 e.what()));
00815 }
00816 }
00817 return 0;
00818 }
00819 #endif
00820
00821 bool DataReaderImpl::has_readcondition(DDS::ReadCondition_ptr a_condition)
00822 {
00823
00824 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00825 return read_conditions_.find(rc) != read_conditions_.end();
00826 }
00827
00828 DDS::ReturnCode_t DataReaderImpl::delete_readcondition(
00829 DDS::ReadCondition_ptr a_condition)
00830 {
00831 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00832 DDS::RETCODE_OUT_OF_RESOURCES);
00833 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00834 return read_conditions_.erase(rc)
00835 ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
00836 }
00837
00838 DDS::ReturnCode_t DataReaderImpl::delete_contained_entities()
00839 {
00840 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00841 DDS::RETCODE_OUT_OF_RESOURCES);
00842 read_conditions_.clear();
00843 return DDS::RETCODE_OK;
00844 }
00845
00846 DDS::ReturnCode_t DataReaderImpl::set_qos(
00847 const DDS::DataReaderQos & qos)
00848 {
00849
00850 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00851 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00852 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00853
00854 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00855 if (qos_ == qos)
00856 return DDS::RETCODE_OK;
00857
00858 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00859 return DDS::RETCODE_IMMUTABLE_POLICY;
00860
00861 } else {
00862 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00863 DDS::SubscriberQos subscriberQos;
00864
00865 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
00866 bool status = false;
00867 if (subscriber) {
00868 subscriber->get_qos(subscriberQos);
00869 status =
00870 disco->update_subscription_qos(
00871 domain_id_,
00872 dp_id_,
00873 this->subscription_id_,
00874 qos,
00875 subscriberQos);
00876 }
00877 if (!status) {
00878 ACE_ERROR_RETURN((LM_ERROR,
00879 ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
00880 ACE_TEXT("qos not updated. \n")),
00881 DDS::RETCODE_ERROR);
00882 }
00883 }
00884
00885
00886 qos_change(qos);
00887 qos_ = qos;
00888
00889 return DDS::RETCODE_OK;
00890
00891 } else {
00892 return DDS::RETCODE_INCONSISTENT_POLICY;
00893 }
00894 }
00895
00896 void DataReaderImpl::qos_change(const DDS::DataReaderQos & qos)
00897 {
00898
00899 if (qos_.deadline.period.sec != qos.deadline.period.sec ||
00900 qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00901 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
00902 qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00903
00904 this->watchdog_ = make_rch<RequestedDeadlineWatchdog>(
00905 ref(this->sample_lock_),
00906 qos.deadline,
00907 ref(*this),
00908 ref(this->requested_deadline_missed_status_),
00909 ref(this->last_deadline_missed_total_count_));
00910
00911 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
00912 qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00913 watchdog_->cancel_all();
00914 watchdog_.reset();
00915 } else {
00916 watchdog_->reset_interval(
00917 duration_to_time_value(qos.deadline.period));
00918 }
00919 }
00920 }
00921
00922 DDS::ReturnCode_t
00923 DataReaderImpl::get_qos(
00924 DDS::DataReaderQos & qos)
00925 {
00926 qos = qos_;
00927 return DDS::RETCODE_OK;
00928 }
00929
00930 DDS::ReturnCode_t DataReaderImpl::set_listener(
00931 DDS::DataReaderListener_ptr a_listener,
00932 DDS::StatusMask mask)
00933 {
00934 listener_mask_ = mask;
00935
00936 listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00937 return DDS::RETCODE_OK;
00938 }
00939
00940 DDS::DataReaderListener_ptr DataReaderImpl::get_listener()
00941 {
00942 return DDS::DataReaderListener::_duplicate(listener_.in());
00943 }
00944
00945 DDS::TopicDescription_ptr DataReaderImpl::get_topicdescription()
00946 {
00947 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00948 if (content_filtered_topic_) {
00949 return DDS::TopicDescription::_duplicate(content_filtered_topic_.get());
00950 }
00951 #endif
00952 return DDS::TopicDescription::_duplicate(topic_desc_.in());
00953 }
00954
00955 DDS::Subscriber_ptr DataReaderImpl::get_subscriber()
00956 {
00957 return get_subscriber_servant()._retn();
00958 }
00959
00960 DDS::ReturnCode_t
00961 DataReaderImpl::get_sample_rejected_status(
00962 DDS::SampleRejectedStatus & status)
00963 {
00964 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
00965
00966 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false);
00967 status = sample_rejected_status_;
00968 sample_rejected_status_.total_count_change = 0;
00969 return DDS::RETCODE_OK;
00970 }
00971
00972 DDS::ReturnCode_t
00973 DataReaderImpl::get_liveliness_changed_status(
00974 DDS::LivelinessChangedStatus & status)
00975 {
00976 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
00977
00978 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS,
00979 false);
00980 status = liveliness_changed_status_;
00981
00982 liveliness_changed_status_.alive_count_change = 0;
00983 liveliness_changed_status_.not_alive_count_change = 0;
00984
00985 return DDS::RETCODE_OK;
00986 }
00987
00988 DDS::ReturnCode_t
00989 DataReaderImpl::get_requested_deadline_missed_status(
00990 DDS::RequestedDeadlineMissedStatus & status)
00991 {
00992 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
00993
00994 set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
00995 false);
00996
00997 this->requested_deadline_missed_status_.total_count_change =
00998 this->requested_deadline_missed_status_.total_count
00999 - this->last_deadline_missed_total_count_;
01000
01001
01002
01003
01004
01005 this->last_deadline_missed_total_count_ =
01006 this->requested_deadline_missed_status_.total_count;
01007
01008 status = requested_deadline_missed_status_;
01009
01010 return DDS::RETCODE_OK;
01011 }
01012
01013 DDS::ReturnCode_t
01014 DataReaderImpl::get_requested_incompatible_qos_status(
01015 DDS::RequestedIncompatibleQosStatus & status)
01016 {
01017
01018 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01019 this->publication_handle_lock_);
01020
01021 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
01022 false);
01023 status = requested_incompatible_qos_status_;
01024 requested_incompatible_qos_status_.total_count_change = 0;
01025
01026 return DDS::RETCODE_OK;
01027 }
01028
01029 DDS::ReturnCode_t
01030 DataReaderImpl::get_subscription_matched_status(
01031 DDS::SubscriptionMatchedStatus & status)
01032 {
01033
01034 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01035 this->publication_handle_lock_);
01036
01037 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false);
01038 status = subscription_match_status_;
01039 subscription_match_status_.total_count_change = 0;
01040 subscription_match_status_.current_count_change = 0;
01041
01042 return DDS::RETCODE_OK;
01043 }
01044
01045 DDS::ReturnCode_t
01046 DataReaderImpl::get_sample_lost_status(
01047 DDS::SampleLostStatus & status)
01048 {
01049 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01050
01051 set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false);
01052 status = sample_lost_status_;
01053 sample_lost_status_.total_count_change = 0;
01054 return DDS::RETCODE_OK;
01055 }
01056
01057 DDS::ReturnCode_t
01058 DataReaderImpl::wait_for_historical_data(
01059 const DDS::Duration_t & )
01060 {
01061
01062 return 0;
01063 }
01064
01065 DDS::ReturnCode_t
01066 DataReaderImpl::get_matched_publications(
01067 DDS::InstanceHandleSeq & publication_handles)
01068 {
01069 if (enabled_ == false) {
01070 ACE_ERROR_RETURN((LM_ERROR,
01071 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
01072 ACE_TEXT(" Entity is not enabled. \n")),
01073 DDS::RETCODE_NOT_ENABLED);
01074 }
01075
01076 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01077 guard,
01078 this->publication_handle_lock_,
01079 DDS::RETCODE_ERROR);
01080
01081
01082 int index = 0;
01083 publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01084
01085 for (RepoIdToHandleMap::iterator
01086 current = this->id_to_handle_map_.begin();
01087 current != this->id_to_handle_map_.end();
01088 ++current, ++index) {
01089 publication_handles[ index] = current->second;
01090 }
01091
01092 return DDS::RETCODE_OK;
01093 }
01094
01095 #if !defined (DDS_HAS_MINIMUM_BIT)
01096 DDS::ReturnCode_t
01097 DataReaderImpl::get_matched_publication_data(
01098 DDS::PublicationBuiltinTopicData & publication_data,
01099 DDS::InstanceHandle_t publication_handle)
01100 {
01101 if (enabled_ == false) {
01102 ACE_ERROR_RETURN((LM_ERROR,
01103 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
01104 ACE_TEXT("get_matched_publication_data: ")
01105 ACE_TEXT("Entity is not enabled. \n")),
01106 DDS::RETCODE_NOT_ENABLED);
01107 }
01108
01109
01110 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01111 guard,
01112 this->publication_handle_lock_,
01113 DDS::RETCODE_ERROR);
01114
01115 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01116
01117 if (!participant)
01118 return DDS::RETCODE_ERROR;
01119
01120 DDS::PublicationBuiltinTopicDataSeq data;
01121 const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
01122 participant.in(),
01123 BUILT_IN_PUBLICATION_TOPIC,
01124 publication_handle,
01125 data);
01126
01127 if (ret == DDS::RETCODE_OK) {
01128 publication_data = data[0];
01129 }
01130
01131 return ret;
01132 }
01133 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01134
01135 DDS::ReturnCode_t
01136 DataReaderImpl::enable()
01137 {
01138
01139
01140
01141
01142
01143
01144 if (this->is_enabled()) {
01145 return DDS::RETCODE_OK;
01146 }
01147
01148 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01149 if (!subscriber) {
01150 return DDS::RETCODE_ERROR;
01151 }
01152
01153 if (!subscriber->is_enabled()) {
01154 return DDS::RETCODE_PRECONDITION_NOT_MET;
01155 }
01156
01157 RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
01158 if (participant) {
01159 dp_id_ = participant->get_id();
01160 }
01161
01162 if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
01163
01164
01165 depth_ = qos_.resource_limits.max_samples_per_instance;
01166
01167 } else {
01168 depth_ = qos_.history.depth;
01169 }
01170
01171 if (depth_ == DDS::LENGTH_UNLIMITED) {
01172
01173
01174
01175
01176
01177
01178 depth_ = 2147483647L;
01179 }
01180
01181 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01182 n_chunks_ = qos_.resource_limits.max_samples;
01183 }
01184
01185
01186
01187
01188 this->enable_specific();
01189
01190
01191
01192 rd_allocator_.reset(new ReceivedDataAllocator(n_chunks_));
01193
01194 if (DCPS_debug_level >= 2)
01195 ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
01196 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01197 rd_allocator_.get(), n_chunks_));
01198
01199 if ((qos_.liveliness.lease_duration.sec !=
01200 DDS::DURATION_INFINITE_SEC) &&
01201 (qos_.liveliness.lease_duration.nanosec !=
01202 DDS::DURATION_INFINITE_NSEC)) {
01203 liveliness_lease_duration_ =
01204 duration_to_time_value(qos_.liveliness.lease_duration);
01205 }
01206
01207
01208
01209 DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01210
01211 if (!this->watchdog_
01212 && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01213 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
01214 this->watchdog_ = make_rch<RequestedDeadlineWatchdog>(
01215 ref(this->sample_lock_),
01216 this->qos_.deadline,
01217 ref(*this),
01218 ref(this->requested_deadline_missed_status_),
01219 ref(this->last_deadline_missed_total_count_));
01220 }
01221
01222 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01223 disco->pre_reader(this);
01224
01225 this->set_enabled();
01226
01227 if (topic_servant_ && !transport_disabled_) {
01228
01229 try {
01230 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
01231 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01232 } catch (const Transport::Exception&) {
01233 ACE_ERROR((LM_ERROR,
01234 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01235 ACE_TEXT("Transport Exception.\n")));
01236 return DDS::RETCODE_ERROR;
01237
01238 }
01239
01240 const TransportLocatorSeq& trans_conf_info = this->connection_info();
01241
01242 CORBA::String_var filterClassName = "";
01243 CORBA::String_var filterExpression = "";
01244 DDS::StringSeq exprParams;
01245 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01246
01247 if (content_filtered_topic_) {
01248 filterClassName = content_filtered_topic_->get_filter_class_name();
01249 filterExpression = content_filtered_topic_->get_filter_expression();
01250 content_filtered_topic_->get_expression_parameters(exprParams);
01251 }
01252
01253 #endif
01254
01255 DDS::SubscriberQos sub_qos;
01256 subscriber->get_qos(sub_qos);
01257
01258 this->subscription_id_ =
01259 disco->add_subscription(this->domain_id_,
01260 this->dp_id_,
01261 this->topic_servant_->get_id(),
01262 this,
01263 this->qos_,
01264 trans_conf_info,
01265 sub_qos,
01266 filterClassName,
01267 filterExpression,
01268 exprParams);
01269
01270 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01271 ACE_ERROR((LM_WARNING,
01272 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::enable, ")
01273 ACE_TEXT("add_subscription returned invalid id.\n")));
01274 return DDS::RETCODE_ERROR;
01275 }
01276 }
01277
01278 if (topic_servant_) {
01279 const CORBA::String_var name = topic_servant_->get_name();
01280 DDS::ReturnCode_t return_value =
01281 subscriber->reader_enabled(name.in(), this);
01282
01283 if (this->monitor_) {
01284 this->monitor_->report();
01285 }
01286
01287 return return_value;
01288 } else {
01289 return DDS::RETCODE_OK;
01290 }
01291 }
01292
01293 void
01294 DataReaderImpl::writer_activity(const DataSampleHeader& header)
01295 {
01296
01297
01298 RcHandle<WriterInfo> writer;
01299
01300
01301
01302
01303
01304 {
01305 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01306 WriterMapType::iterator iter = writers_.find(header.publication_id_);
01307
01308 if (iter != writers_.end()) {
01309 writer = iter->second;
01310
01311 } else if (DCPS_debug_level > 4) {
01312
01313
01314
01315 GuidConverter reader_converter(subscription_id_);
01316 GuidConverter writer_converter(header.publication_id_);
01317 ACE_DEBUG((LM_DEBUG,
01318 ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
01319 ACE_TEXT("reader %C is not associated with writer %C.\n"),
01320 OPENDDS_STRING(reader_converter).c_str(),
01321 OPENDDS_STRING(writer_converter).c_str()));
01322 }
01323 }
01324
01325 if (!writer.is_nil()) {
01326 ACE_Time_Value when = ACE_OS::gettimeofday();
01327 writer->received_activity(when);
01328
01329 if ((header.message_id_ == SAMPLE_DATA) ||
01330 (header.message_id_ == INSTANCE_REGISTRATION) ||
01331 (header.message_id_ == UNREGISTER_INSTANCE) ||
01332 (header.message_id_ == DISPOSE_INSTANCE) ||
01333 (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
01334
01335 const SequenceNumber defaultSN;
01336 SequenceRange resetRange(defaultSN, header.sequence_);
01337
01338 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01339 if (header.coherent_change_) {
01340 if (writer->coherent_samples_ == 0) {
01341 writer->coherent_sample_sequence_.reset();
01342 writer->coherent_sample_sequence_.insert(resetRange);
01343 }
01344 else {
01345 writer->coherent_sample_sequence_.insert(header.sequence_);
01346 }
01347 }
01348 #endif
01349 }
01350 }
01351 }
01352
01353 void
01354 DataReaderImpl::data_received(const ReceivedDataSample& sample)
01355 {
01356 DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
01357
01358
01359
01360 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01361
01362 if (get_deleted()) return;
01363
01364 if (DCPS_debug_level > 9) {
01365 GuidConverter converter(subscription_id_);
01366 ACE_DEBUG((LM_DEBUG,
01367 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01368 ACE_TEXT("%C received sample: %C.\n"),
01369 OPENDDS_STRING(converter).c_str(),
01370 to_string(sample.header_).c_str()));
01371 }
01372
01373 switch (sample.header_.message_id_) {
01374 case SAMPLE_DATA:
01375 case INSTANCE_REGISTRATION: {
01376 if (!check_historic(sample)) break;
01377
01378 DataSampleHeader const & header = sample.header_;
01379
01380 this->writer_activity(header);
01381
01382
01383 if (this->filter_sample(header)) break;
01384
01385
01386 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01387 if (subscriber)
01388 subscriber->data_received(this);
01389
01390
01391 if (header.message_id_ == SAMPLE_DATA) {
01392 this->process_latency(sample);
01393 }
01394
01395
01396
01397
01398 SubscriptionInstance_rch instance;
01399 bool is_new_instance = false;
01400 bool filtered = false;
01401 if (sample.header_.key_fields_only_) {
01402 dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING);
01403 } else {
01404 dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING);
01405 }
01406
01407
01408 if (DCPS_debug_level >= 8) {
01409 GuidConverter reader_converter(subscription_id_);
01410 GuidConverter writer_converter(header.publication_id_);
01411
01412 ACE_DEBUG ((LM_DEBUG,
01413 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
01414 ACE_TEXT("instance %d is_new_instance %d filtered %d \n"),
01415 OPENDDS_STRING(reader_converter).c_str(),
01416 OPENDDS_STRING(writer_converter).c_str(),
01417 instance ? instance->instance_handle_ : 0,
01418 is_new_instance, filtered));
01419 }
01420
01421 if (filtered) break;
01422
01423 if (instance) accept_sample_processing(instance, header, is_new_instance);
01424 }
01425 break;
01426
01427 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01428 case END_COHERENT_CHANGES: {
01429 CoherentChangeControl control;
01430
01431 this->writer_activity(sample.header_);
01432
01433 Serializer serializer(
01434 sample.sample_.get(), sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER);
01435 if (!(serializer >> control)) {
01436 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
01437 ACE_TEXT("deserialization coherent change control failed.\n")));
01438 return;
01439 }
01440
01441 if (DCPS_debug_level > 0) {
01442 std::stringstream buffer;
01443 buffer << control << std::endl;
01444
01445 ACE_DEBUG((LM_DEBUG,
01446 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01447 ACE_TEXT("END_COHERENT_CHANGES %C\n"),
01448 buffer.str().c_str()));
01449 }
01450
01451 RcHandle<WriterInfo> writer;
01452 {
01453 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01454
01455 WriterMapType::iterator it =
01456 this->writers_.find(sample.header_.publication_id_);
01457
01458 if (it == this->writers_.end()) {
01459 GuidConverter sub_id(this->subscription_id_);
01460 GuidConverter pub_id(sample.header_.publication_id_);
01461 ACE_DEBUG((LM_WARNING,
01462 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01463 ACE_TEXT(" subscription %C failed to find ")
01464 ACE_TEXT(" publication data for %C!\n"),
01465 OPENDDS_STRING(sub_id).c_str(),
01466 OPENDDS_STRING(pub_id).c_str()));
01467 return;
01468 }
01469 else {
01470 writer = it->second;
01471 }
01472 it->second->set_group_info (control);
01473 }
01474
01475 if (this->verify_coherent_changes_completion(writer.in())) {
01476 this->notify_read_conditions();
01477 }
01478 }
01479 break;
01480 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
01481
01482 case DATAWRITER_LIVELINESS: {
01483 if (DCPS_debug_level >= 4) {
01484 GuidConverter reader_converter(subscription_id_);
01485 GuidConverter writer_converter(sample.header_.publication_id_);
01486 ACE_DEBUG((LM_DEBUG,
01487 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01488 ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
01489 OPENDDS_STRING(reader_converter).c_str(),
01490 OPENDDS_STRING(writer_converter).c_str()));
01491 }
01492 this->writer_activity(sample.header_);
01493
01494
01495 {
01496 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01497 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01498 iter != instances_.end();
01499 ++iter) {
01500 iter->second->instance_state_.lively(sample.header_.publication_id_);
01501 }
01502 }
01503
01504 }
01505 break;
01506
01507 case DISPOSE_INSTANCE: {
01508 if (!check_historic(sample)) break;
01509 this->writer_activity(sample.header_);
01510 SubscriptionInstance_rch instance;
01511
01512 if (this->watchdog_.in()) {
01513
01514
01515
01516 ReceivedDataSample dup(sample);
01517 this->lookup_instance(dup, instance);
01518 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01519 OwnershipManagerPtr owner_manager = this->ownership_manager();
01520
01521 if (! this->is_exclusive_ownership_
01522 || (owner_manager
01523 && (instance)
01524 && (owner_manager->is_owner (instance->instance_handle_,
01525 sample.header_.publication_id_)))) {
01526 #endif
01527 this->watchdog_->cancel_timer(instance);
01528 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01529 }
01530 #endif
01531 }
01532 instance.reset();
01533 this->dispose_unregister(sample, instance);
01534 }
01535 this->notify_read_conditions();
01536 break;
01537
01538 case UNREGISTER_INSTANCE: {
01539 if (!check_historic(sample)) break;
01540 this->writer_activity(sample.header_);
01541 SubscriptionInstance_rch instance;
01542
01543 if (this->watchdog_.in()) {
01544
01545
01546
01547 ReceivedDataSample dup(sample);
01548 this->lookup_instance(dup, instance);
01549 if (instance) {
01550 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01551 if (! this->is_exclusive_ownership_
01552 || (this->is_exclusive_ownership_
01553 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01554 #endif
01555 this->watchdog_->cancel_timer(instance);
01556 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01557 }
01558 #endif
01559 }
01560 }
01561 instance.reset();
01562 this->dispose_unregister(sample, instance);
01563 }
01564 this->notify_read_conditions();
01565 break;
01566
01567 case DISPOSE_UNREGISTER_INSTANCE: {
01568 if (!check_historic(sample)) break;
01569 this->writer_activity(sample.header_);
01570 SubscriptionInstance_rch instance;
01571
01572 if (this->watchdog_.in()) {
01573
01574
01575
01576 ReceivedDataSample dup(sample);
01577 this->lookup_instance(dup, instance);
01578 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01579 OwnershipManagerPtr owner_manager = this->ownership_manager();
01580 if (! this->is_exclusive_ownership_
01581 || (owner_manager
01582 && (instance)
01583 && (owner_manager->is_owner (instance->instance_handle_,
01584 sample.header_.publication_id_)))
01585 || (this->is_exclusive_ownership_
01586 && (instance)
01587 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01588 #endif
01589 if (instance) {
01590 this->watchdog_->cancel_timer(instance);
01591 }
01592 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01593 }
01594 #endif
01595 }
01596 instance.reset();
01597 this->dispose_unregister(sample, instance);
01598 }
01599 this->notify_read_conditions();
01600 break;
01601
01602 case END_HISTORIC_SAMPLES: {
01603 if (sample.header_.message_length_ >= sizeof(RepoId)) {
01604 Serializer ser(sample.sample_.get());
01605 RepoId readerId = GUID_UNKNOWN;
01606 if (!(ser >> readerId)) {
01607 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
01608 ACE_TEXT("deserialization reader failed.\n")));
01609 return;
01610 }
01611 if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) {
01612 break;
01613 }
01614 }
01615 if (DCPS_debug_level > 4) {
01616 ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
01617 }
01618
01619 guard.release();
01620 this->resume_sample_processing(sample.header_.publication_id_);
01621 if (DCPS_debug_level > 4) {
01622 GuidConverter pub_id(sample.header_.publication_id_);
01623 ACE_DEBUG((
01624 LM_INFO,
01625 "(%P|%t) Resumed sample processing for durable writer %C\n",
01626 OPENDDS_STRING(pub_id).c_str()));
01627 }
01628 break;
01629 }
01630
01631 default:
01632 ACE_ERROR((LM_ERROR,
01633 "(%P|%t) ERROR: DataReaderImpl::data_received"
01634 "unexpected message_id = %d\n",
01635 sample.header_.message_id_));
01636 break;
01637 }
01638 }
01639
01640 RcHandle<EntityImpl>
01641 DataReaderImpl::parent() const
01642 {
01643 return this->subscriber_servant_.lock();
01644 }
01645
01646 bool
01647 DataReaderImpl::check_transport_qos(const TransportInst& ti)
01648 {
01649 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01650 return ti.is_reliable();
01651 }
01652 return true;
01653 }
01654
01655 void DataReaderImpl::notify_read_conditions()
01656 {
01657
01658 ReadConditionSet local_read_conditions = read_conditions_;
01659 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01660
01661 for (ReadConditionSet::iterator it = local_read_conditions.begin(),
01662 end = local_read_conditions.end(); it != end; ++it) {
01663 ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in());
01664 if (ci) {
01665 ci->signal_all();
01666 } else {
01667 ACE_ERROR((LM_ERROR,
01668 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
01669 ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n")));
01670 }
01671 }
01672 }
01673
01674 RcHandle<SubscriberImpl>
01675 DataReaderImpl::get_subscriber_servant()
01676 {
01677 return subscriber_servant_.lock();
01678 }
01679
01680 RepoId DataReaderImpl::get_subscription_id() const
01681 {
01682 return subscription_id_;
01683 }
01684
01685 bool DataReaderImpl::have_sample_states(
01686 DDS::SampleStateMask sample_states) const
01687 {
01688
01689
01690 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
01691
01692 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01693 iter != instances_.end();
01694 ++iter) {
01695 SubscriptionInstance_rch ptr = iter->second;
01696
01697 for (ReceivedDataElement *item = ptr->rcvd_samples_.head_;
01698 item != 0; item = item->next_data_sample_) {
01699 if (item->sample_state_ & sample_states) {
01700 return true;
01701 }
01702 }
01703 }
01704
01705 return false;
01706 }
01707
01708 bool
01709 DataReaderImpl::have_view_states(DDS::ViewStateMask view_states) const
01710 {
01711
01712
01713 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01714
01715 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01716 iter != instances_.end();
01717 ++iter) {
01718 SubscriptionInstance_rch ptr = iter->second;
01719
01720 if (ptr->instance_state_.view_state() & view_states) {
01721 return true;
01722 }
01723 }
01724
01725 return false;
01726 }
01727
01728 bool DataReaderImpl::have_instance_states(
01729 DDS::InstanceStateMask instance_states) const
01730 {
01731
01732
01733 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01734
01735 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01736 iter != instances_.end();
01737 ++iter) {
01738 SubscriptionInstance_rch ptr = iter->second;
01739
01740 if (ptr->instance_state_.instance_state() & instance_states) {
01741 return true;
01742 }
01743 }
01744
01745 return false;
01746 }
01747
01748
01749
01750 bool DataReaderImpl::contains_sample(DDS::SampleStateMask sample_states,
01751 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
01752 {
01753 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
01754 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01755
01756 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
01757 end = instances_.end(); iter != end; ++iter) {
01758 SubscriptionInstance& inst = *iter->second;
01759
01760 if ((inst.instance_state_.view_state() & view_states) &&
01761 (inst.instance_state_.instance_state() & instance_states)) {
01762 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
01763 item = item->next_data_sample_) {
01764 if (item->sample_state_ & sample_states
01765 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01766 && !item->coherent_change_
01767 #endif
01768 ) {
01769 return true;
01770 }
01771 }
01772 }
01773 }
01774
01775 return false;
01776 }
01777
01778 DDS::DataReaderListener_ptr
01779 DataReaderImpl::listener_for(DDS::StatusKind kind)
01780 {
01781
01782
01783
01784 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01785 if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) {
01786 return subscriber->listener_for(kind);
01787
01788 } else {
01789 return DDS::DataReaderListener::_duplicate(listener_.in());
01790 }
01791 }
01792
01793 void DataReaderImpl::sample_info(DDS::SampleInfo & sample_info,
01794 const ReceivedDataElement *ptr)
01795 {
01796
01797 sample_info.sample_rank = 0;
01798
01799
01800
01801
01802
01803
01804
01805 sample_info.generation_rank =
01806 (sample_info.disposed_generation_count +
01807 sample_info.no_writers_generation_count) -
01808 sample_info.generation_rank;
01809
01810
01811
01812
01813
01814
01815
01816 sample_info.absolute_generation_rank =
01817 (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
01818 static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
01819 sample_info.absolute_generation_rank;
01820
01821 sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
01822 }
01823
01824 CORBA::Long DataReaderImpl::total_samples() const
01825 {
01826
01827 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
01828
01829 CORBA::Long count(0);
01830
01831 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01832 iter != instances_.end();
01833 ++iter) {
01834 SubscriptionInstance_rch ptr = iter->second;
01835
01836 count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_);
01837 }
01838
01839 return count;
01840 }
01841
01842 int
01843 DataReaderImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value& tv,
01844 const void * )
01845 {
01846 check_liveliness_i(false, tv);
01847 return 0;
01848 }
01849
01850 void
01851 DataReaderImpl::LivelinessTimer::check_liveliness_i(bool cancel,
01852 const ACE_Time_Value& now)
01853 {
01854
01855
01856 RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
01857 if (! data_reader) {
01858 this->reactor()->purge_pending_notifications(this);
01859 return;
01860 }
01861
01862 long local_timer_id = liveliness_timer_id_;
01863 bool timer_was_reset = false;
01864
01865 if (local_timer_id != -1 && cancel) {
01866 if (DCPS_debug_level >= 5) {
01867 GuidConverter converter(data_reader->get_subscription_id());
01868 ACE_DEBUG((LM_DEBUG,
01869 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
01870 ACE_TEXT(" canceling timer for reader %C.\n"),
01871 OPENDDS_STRING(converter).c_str()));
01872 }
01873
01874
01875
01876 if (this->reactor()->cancel_timer(local_timer_id) == -1) {
01877
01878
01879
01880 ACE_DEBUG((LM_DEBUG,
01881 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
01882 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
01883 }
01884
01885 timer_was_reset = true;
01886 }
01887
01888
01889 ACE_Time_Value smallest(ACE_Time_Value::max_time);
01890 int alive_writers = 0;
01891
01892
01893
01894
01895
01896
01897
01898
01899
01900
01901 if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
01902 liveliness_timer_id_ = -1;
01903 }
01904
01905 ACE_Time_Value next_absolute;
01906
01907
01908 {
01909 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
01910 read_guard,
01911 data_reader->writers_lock_);
01912
01913 for (WriterMapType::iterator iter = data_reader->writers_.begin();
01914 iter != data_reader->writers_.end();
01915 ++iter) {
01916
01917
01918 next_absolute = iter->second->check_activity(now);
01919
01920 if (next_absolute != ACE_Time_Value::max_time) {
01921 alive_writers++;
01922
01923 if (next_absolute < smallest) {
01924 smallest = next_absolute;
01925 }
01926 }
01927 }
01928 }
01929
01930 if (!alive_writers) {
01931
01932
01933 liveliness_timer_id_ = -1;
01934 }
01935
01936 if (DCPS_debug_level >= 5) {
01937 GuidConverter converter(data_reader->get_subscription_id());
01938 ACE_DEBUG((LM_DEBUG,
01939 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
01940 ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
01941 OPENDDS_STRING(converter).c_str(),
01942 alive_writers,
01943 !cancel));
01944 }
01945
01946
01947 if (alive_writers) {
01948 ACE_Time_Value relative;
01949
01950
01951 if (now < smallest) {
01952 relative = smallest - now;
01953 } else {
01954 relative = ACE_Time_Value(0,1);
01955 }
01956
01957 liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative);
01958
01959 if (liveliness_timer_id_ == -1) {
01960 ACE_ERROR((LM_ERROR,
01961 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
01962 ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer")));
01963 }
01964 }
01965 }
01966
01967 void
01968 DataReaderImpl::release_instance(DDS::InstanceHandle_t handle)
01969 {
01970 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01971 OwnershipManagerPtr owner_manager = this->ownership_manager();
01972 if (owner_manager) {
01973 owner_manager->remove_writers (handle);
01974 }
01975 #endif
01976
01977 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01978 SubscriptionInstance_rch instance = this->get_handle_instance(handle);
01979
01980 if (!instance) {
01981 ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
01982 "could not find the instance by handle 0x%x\n", handle));
01983 return;
01984 }
01985
01986 this->purge_data(instance);
01987
01988 {
01989 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01990 instances_.erase(handle);
01991 }
01992
01993 this->release_instance_i(handle);
01994 if (this->monitor_) {
01995 this->monitor_->report();
01996 }
01997 }
01998
01999
02000 OpenDDS::DCPS::WriterStats::WriterStats(
02001 int amount,
02002 DataCollector<double>::OnFull type) : stats_(amount, type)
02003 {
02004 }
02005
02006 void OpenDDS::DCPS::WriterStats::add_stat(const ACE_Time_Value& delay)
02007 {
02008 double datum = static_cast<double>(delay.sec());
02009 datum += delay.usec() / 1000000.0;
02010 this->stats_.add(datum);
02011 }
02012
02013 OpenDDS::DCPS::LatencyStatistics OpenDDS::DCPS::WriterStats::get_stats() const
02014 {
02015 LatencyStatistics value;
02016
02017 value.publication = GUID_UNKNOWN;
02018 value.n = this->stats_.n();
02019 value.maximum = this->stats_.maximum();
02020 value.minimum = this->stats_.minimum();
02021 value.mean = this->stats_.mean();
02022 value.variance = this->stats_.var();
02023
02024 return value;
02025 }
02026
02027 void OpenDDS::DCPS::WriterStats::reset_stats()
02028 {
02029 this->stats_.reset();
02030 }
02031
02032 #ifndef OPENDDS_SAFETY_PROFILE
02033 std::ostream& OpenDDS::DCPS::WriterStats::raw_data(std::ostream& str) const
02034 {
02035 str << std::dec << this->stats_.size()
02036 << " samples out of " << this->stats_.n() << std::endl;
02037 return str << this->stats_;
02038 }
02039 #endif //OPENDDS_SAFETY_PROFILE
02040
02041 void
02042 DataReaderImpl::writer_removed(WriterInfo& info)
02043 {
02044 if (DCPS_debug_level >= 5) {
02045 GuidConverter reader_converter(subscription_id_);
02046 GuidConverter writer_converter(info.writer_id_);
02047 ACE_DEBUG((LM_DEBUG,
02048 ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
02049 ACE_TEXT("reader %C from writer %C.\n"),
02050 OPENDDS_STRING(reader_converter).c_str(),
02051 OPENDDS_STRING(writer_converter).c_str()));
02052 }
02053
02054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02055 OwnershipManagerPtr owner_manager = this->ownership_manager();
02056 if (owner_manager) {
02057 owner_manager->remove_writer (info.writer_id_);
02058 info.clear_owner_evaluated ();
02059 }
02060 #endif
02061
02062 bool liveliness_changed = false;
02063
02064 if (info.state_ == WriterInfo::ALIVE) {
02065 -- liveliness_changed_status_.alive_count;
02066 -- liveliness_changed_status_.alive_count_change;
02067 liveliness_changed = true;
02068 }
02069
02070 if (info.state_ == WriterInfo::DEAD) {
02071 -- liveliness_changed_status_.not_alive_count;
02072 -- liveliness_changed_status_.not_alive_count_change;
02073 liveliness_changed = true;
02074 }
02075
02076 liveliness_changed_status_.last_publication_handle = info.handle_;
02077 instances_liveliness_update(info, ACE_OS::gettimeofday());
02078
02079 if (liveliness_changed) {
02080 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02081 this->notify_liveliness_change();
02082 }
02083 }
02084
02085 void
02086 DataReaderImpl::writer_became_alive(WriterInfo& info,
02087 const ACE_Time_Value& )
02088 {
02089 if (DCPS_debug_level >= 5) {
02090 GuidConverter reader_converter(subscription_id_);
02091 GuidConverter writer_converter(info.writer_id_);
02092 ACE_DEBUG((LM_DEBUG,
02093 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
02094 ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02095 OPENDDS_STRING(reader_converter).c_str(),
02096 OPENDDS_STRING(writer_converter).c_str(),
02097 info.get_state_str().c_str()));
02098 }
02099
02100
02101
02102
02103
02104 bool liveliness_changed = false;
02105
02106 if (info.state_ != WriterInfo::ALIVE) {
02107 liveliness_changed_status_.alive_count++;
02108 liveliness_changed_status_.alive_count_change++;
02109 liveliness_changed = true;
02110 }
02111
02112 if (info.state_ == WriterInfo::DEAD) {
02113 liveliness_changed_status_.not_alive_count--;
02114 liveliness_changed_status_.not_alive_count_change--;
02115 liveliness_changed = true;
02116 }
02117
02118 liveliness_changed_status_.last_publication_handle = info.handle_;
02119
02120 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02121
02122 if (liveliness_changed_status_.alive_count < 0) {
02123 ACE_ERROR((LM_ERROR,
02124 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02125 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02126 liveliness_changed_status_.alive_count));
02127 return;
02128 }
02129
02130 if (liveliness_changed_status_.not_alive_count < 0) {
02131 ACE_ERROR((LM_ERROR,
02132 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02133 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"),
02134 liveliness_changed_status_.not_alive_count));
02135 return;
02136 }
02137
02138
02139
02140 info.state_ = WriterInfo::ALIVE;
02141
02142 if (this->monitor_) {
02143 this->monitor_->report();
02144 }
02145
02146
02147 if (liveliness_changed) {
02148
02149
02150
02151 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02152 this->notify_liveliness_change();
02153 }
02154
02155
02156 liveliness_timer_->check_liveliness();
02157 }
02158
02159 void
02160 DataReaderImpl::writer_became_dead(WriterInfo & info,
02161 const ACE_Time_Value& when)
02162 {
02163 if (DCPS_debug_level >= 5) {
02164 GuidConverter reader_converter(subscription_id_);
02165 GuidConverter writer_converter(info.writer_id_);
02166 ACE_DEBUG((LM_DEBUG,
02167 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
02168 ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02169
02170 OPENDDS_STRING(reader_converter).c_str(),
02171 OPENDDS_STRING(writer_converter).c_str(),
02172 info.get_state_str().c_str()));
02173 }
02174
02175 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02176 OwnershipManagerPtr owner_manager = this->ownership_manager();
02177 if (owner_manager) {
02178 owner_manager->remove_writer (info.writer_id_);
02179 info.clear_owner_evaluated ();
02180 }
02181 #endif
02182
02183
02184 bool liveliness_changed = false;
02185
02186 if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) {
02187 liveliness_changed_status_.not_alive_count++;
02188 liveliness_changed_status_.not_alive_count_change++;
02189 liveliness_changed = true;
02190 }
02191
02192 if (info.state_ == WriterInfo::ALIVE) {
02193 liveliness_changed_status_.alive_count--;
02194 liveliness_changed_status_.alive_count_change--;
02195 liveliness_changed_status_.not_alive_count++;
02196 liveliness_changed_status_.not_alive_count_change++;
02197 liveliness_changed = true;
02198 }
02199
02200 liveliness_changed_status_.last_publication_handle = info.handle_;
02201
02202
02203 info.state_ = WriterInfo::DEAD;
02204
02205 if (this->monitor_) {
02206 this->monitor_->report();
02207 }
02208
02209 if (liveliness_changed_status_.alive_count < 0) {
02210 ACE_ERROR((LM_ERROR,
02211 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02212 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02213 liveliness_changed_status_.alive_count));
02214 return;
02215 }
02216
02217 if (liveliness_changed_status_.not_alive_count < 0) {
02218 ACE_ERROR((LM_ERROR,
02219 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02220 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"),
02221 liveliness_changed_status_.not_alive_count));
02222 return;
02223 }
02224
02225 instances_liveliness_update(info, when);
02226
02227
02228 if (liveliness_changed) {
02229 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02230 this->notify_liveliness_change();
02231 }
02232 }
02233
02234 void
02235 DataReaderImpl::instances_liveliness_update(WriterInfo& info,
02236 const ACE_Time_Value& when)
02237 {
02238 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02239 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02240 next = iter; iter != instances_.end(); iter = next) {
02241 ++next;
02242 iter->second->instance_state_.writer_became_dead(
02243 info.writer_id_, liveliness_changed_status_.alive_count, when);
02244 }
02245 }
02246
02247 void
02248 DataReaderImpl::set_sample_lost_status(
02249 const DDS::SampleLostStatus& status)
02250 {
02251
02252 sample_lost_status_ = status;
02253 }
02254
02255 void
02256 DataReaderImpl::set_sample_rejected_status(
02257 const DDS::SampleRejectedStatus& status)
02258 {
02259
02260 sample_rejected_status_ = status;
02261 }
02262
02263 void DataReaderImpl::dispose_unregister(const ReceivedDataSample&,
02264 SubscriptionInstance_rch&)
02265 {
02266 if (DCPS_debug_level > 0) {
02267 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
02268 }
02269 }
02270
02271 void DataReaderImpl::process_latency(const ReceivedDataSample& sample)
02272 {
02273 StatsMapType::iterator location
02274 = this->statistics_.find(sample.header_.publication_id_);
02275
02276 if (location != this->statistics_.end()) {
02277 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02278
02279
02280
02281 if ((this->statistics_enabled()) ||
02282 (this->qos_.latency_budget.duration > zero)) {
02283
02284 ACE_Time_Value latency = ACE_OS::gettimeofday();
02285
02286
02287 DDS::Duration_t then = {
02288 sample.header_.source_timestamp_sec_,
02289 sample.header_.source_timestamp_nanosec_
02290 };
02291
02292
02293 latency -= duration_to_time_value(then);
02294
02295 if (this->statistics_enabled()) {
02296 location->second.add_stat(latency);
02297 }
02298
02299 if (DCPS_debug_level > 9) {
02300 ACE_DEBUG((LM_DEBUG,
02301 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02302 ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"),
02303 latency.sec(),
02304 latency.msec()));
02305 }
02306
02307 if (this->qos_.latency_budget.duration > zero) {
02308
02309 if (time_value_to_duration(latency) > this->qos_.latency_budget.duration) {
02310 this->notify_latency(sample.header_.publication_id_);
02311 }
02312 }
02313 }
02314 } else if (DCPS_debug_level > 0) {
02315
02316
02317
02318
02319
02320
02321
02322
02323 GuidConverter reader_converter(subscription_id_);
02324 GuidConverter writer_converter(sample.header_.publication_id_);
02325 ACE_DEBUG((LM_DEBUG,
02326 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02327 ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
02328 OPENDDS_STRING(reader_converter).c_str(),
02329 OPENDDS_STRING(writer_converter).c_str()));
02330 }
02331 }
02332
02333 void DataReaderImpl::notify_latency(PublicationId writer)
02334 {
02335
02336
02337 DataReaderListener_var listener
02338 = DataReaderListener::_narrow(this->listener_.in());
02339
02340 if (!CORBA::is_nil(listener.in())) {
02341 WriterIdSeq writerIds;
02342 writerIds.length(1);
02343 writerIds[ 0] = writer;
02344
02345 DDS::InstanceHandleSeq handles;
02346 this->lookup_instance_handles(writerIds, handles);
02347
02348 if (handles.length() >= 1) {
02349 this->budget_exceeded_status_.last_instance_handle = handles[ 0];
02350
02351 } else {
02352 this->budget_exceeded_status_.last_instance_handle = -1;
02353 }
02354
02355 ++this->budget_exceeded_status_.total_count;
02356 ++this->budget_exceeded_status_.total_count_change;
02357
02358 listener->on_budget_exceeded(this, this->budget_exceeded_status_);
02359
02360 this->budget_exceeded_status_.total_count_change = 0;
02361 }
02362 }
02363
02364 #ifndef OPENDDS_SAFETY_PROFILE
02365 void
02366 DataReaderImpl::get_latency_stats(
02367 OpenDDS::DCPS::LatencyStatisticsSeq & stats)
02368 {
02369 stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
02370 int index = 0;
02371
02372 for (StatsMapType::const_iterator current = this->statistics_.begin();
02373 current != this->statistics_.end();
02374 ++current, ++index) {
02375 stats[ index] = current->second.get_stats();
02376 stats[ index].publication = current->first;
02377 }
02378 }
02379 #endif
02380
02381 void
02382 DataReaderImpl::reset_latency_stats()
02383 {
02384 for (StatsMapType::iterator current = this->statistics_.begin();
02385 current != this->statistics_.end();
02386 ++current) {
02387 current->second.reset_stats();
02388 }
02389 }
02390
02391 CORBA::Boolean
02392 DataReaderImpl::statistics_enabled()
02393 {
02394 return this->statistics_enabled_;
02395 }
02396
02397 void
02398 DataReaderImpl::statistics_enabled(
02399 CORBA::Boolean statistics_enabled)
02400 {
02401 this->statistics_enabled_ = statistics_enabled;
02402 }
02403
02404 void
02405 DataReaderImpl::prepare_to_delete()
02406 {
02407 this->set_deleted(true);
02408 this->stop_associating();
02409 this->send_final_acks();
02410 }
02411
02412 SubscriptionInstance_rch
02413 DataReaderImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02414 {
02415 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, SubscriptionInstance_rch());
02416
02417 SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
02418 if (iter == instances_.end()) {
02419 ACE_DEBUG((LM_WARNING,
02420 ACE_TEXT("(%P|%t) WARNING: ")
02421 ACE_TEXT("DataReaderImpl::get_handle_instance: ")
02422 ACE_TEXT("lookup for 0x%x failed\n"),
02423 handle));
02424 return SubscriptionInstance_rch();
02425 }
02426
02427 return iter->second;
02428 }
02429
02430 DDS::InstanceHandle_t
02431 DataReaderImpl::get_next_handle(const DDS::BuiltinTopicKey_t& key)
02432 {
02433 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02434 if (!participant)
02435 return 0;
02436
02437 if (is_bit()) {
02438 Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_);
02439 CORBA::String_var topic = topic_servant_->get_name();
02440
02441 RepoId id = disc->bit_key_to_repo_id(participant.in(), topic, key);
02442 return participant->id_to_handle(id);
02443
02444 } else {
02445 return participant->id_to_handle(GUID_UNKNOWN);
02446 }
02447 }
02448
02449 void
02450 DataReaderImpl::notify_subscription_disconnected(const WriterIdSeq& pubids)
02451 {
02452 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
02453
02454
02455
02456 DataReaderListener_var the_listener
02457 = DataReaderListener::_narrow(this->listener_.in());
02458
02459 if (!CORBA::is_nil(the_listener.in())) {
02460 SubscriptionLostStatus status;
02461
02462
02463
02464 this->lookup_instance_handles(pubids, status.publication_handles);
02465 the_listener->on_subscription_disconnected(this, status);
02466 }
02467 }
02468
02469 void
02470 DataReaderImpl::notify_subscription_reconnected(const WriterIdSeq& pubids)
02471 {
02472 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
02473
02474 if (!this->is_bit_) {
02475
02476
02477 DataReaderListener_var the_listener
02478 = DataReaderListener::_narrow(this->listener_.in());
02479
02480 if (!CORBA::is_nil(the_listener.in())) {
02481 SubscriptionLostStatus status;
02482
02483
02484 this->lookup_instance_handles(pubids, status.publication_handles);
02485
02486 the_listener->on_subscription_reconnected(this, status);
02487 }
02488 }
02489 }
02490
02491 void
02492 DataReaderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq& handles)
02493 {
02494 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02495
02496 if (!this->is_bit_) {
02497
02498
02499 DataReaderListener_var the_listener
02500 = DataReaderListener::_narrow(this->listener_.in());
02501
02502 if (!CORBA::is_nil(the_listener.in())) {
02503 SubscriptionLostStatus status;
02504
02505 CORBA::ULong len = handles.length();
02506 status.publication_handles.length(len);
02507
02508 for (CORBA::ULong i = 0; i < len; ++ i) {
02509 status.publication_handles[i] = handles[i];
02510 }
02511
02512 the_listener->on_subscription_lost(this, status);
02513 }
02514 }
02515 }
02516
02517 void
02518 DataReaderImpl::notify_subscription_lost(const WriterIdSeq& pubids)
02519 {
02520 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02521
02522
02523
02524 DataReaderListener_var the_listener
02525 = DataReaderListener::_narrow(this->listener_.in());
02526
02527 if (!CORBA::is_nil(the_listener.in())) {
02528 SubscriptionLostStatus status;
02529
02530
02531
02532 this->lookup_instance_handles(pubids, status.publication_handles);
02533 the_listener->on_subscription_lost(this, status);
02534 }
02535 }
02536
02537
02538 void
02539 DataReaderImpl::lookup_instance_handles(const WriterIdSeq& ids,
02540 DDS::InstanceHandleSeq & hdls)
02541 {
02542 CORBA::ULong const num_wrts = ids.length();
02543
02544 if (DCPS_debug_level > 9) {
02545 const char* separator = "";
02546 OPENDDS_STRING guids;
02547
02548 for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02549 guids += separator;
02550 guids += OPENDDS_STRING(GuidConverter(ids[i]));
02551 separator = ", ";
02552 }
02553
02554 ACE_DEBUG((LM_DEBUG,
02555 ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
02556 ACE_TEXT("searching for handles for writer Ids: %C.\n"),
02557 guids.c_str()));
02558 }
02559
02560 hdls.length(num_wrts);
02561
02562 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02563 if (participant) {
02564 for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02565 hdls[i] = participant->id_to_handle(ids[i]);
02566 }
02567 }
02568 }
02569
02570 bool
02571 DataReaderImpl::filter_sample(const DataSampleHeader& header)
02572 {
02573 ACE_Time_Value now(ACE_OS::gettimeofday());
02574
02575
02576 if (!always_get_history_ && header.historic_sample_
02577 && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
02578 if (DCPS_debug_level >= 8) {
02579 ACE_DEBUG((LM_DEBUG,
02580 ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
02581 ACE_TEXT("Discarded historic data.\n")));
02582 }
02583
02584 return true;
02585 }
02586
02587
02588
02589 if (header.lifespan_duration_) {
02590
02591
02592 DDS::Time_t const tmp = {
02593 header.source_timestamp_sec_ + header.lifespan_duration_sec_,
02594 header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
02595 };
02596
02597
02598
02599 ACE_Time_Value const expiration_time(
02600 OpenDDS::DCPS::time_to_time_value(tmp));
02601
02602 if (now >= expiration_time) {
02603 if (DCPS_debug_level >= 8) {
02604 ACE_Time_Value const diff(now - expiration_time);
02605 ACE_DEBUG((LM_DEBUG,
02606 ACE_TEXT("OpenDDS (%P|%t) Received data ")
02607 ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
02608 diff.sec(),
02609 diff.usec()));
02610 }
02611
02612 return true;
02613 }
02614 }
02615
02616 return false;
02617 }
02618
02619 bool
02620
02621 DataReaderImpl::ownership_filter_instance(const SubscriptionInstance_rch& instance,
02622 const PublicationId& pubid)
02623 {
02624 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02625 if (this->is_exclusive_ownership_) {
02626
02627 WriterMapType::iterator iter = writers_.find(pubid);
02628
02629 if (iter == writers_.end()) {
02630 if (DCPS_debug_level > 4) {
02631
02632
02633
02634 GuidConverter reader_converter(subscription_id_);
02635 GuidConverter writer_converter(pubid);
02636 ACE_DEBUG((LM_DEBUG,
02637 ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02638 ACE_TEXT("reader %C is not associated with writer %C.\n"),
02639 OPENDDS_STRING(reader_converter).c_str(),
02640 OPENDDS_STRING(writer_converter).c_str()));
02641 }
02642 return true;
02643 }
02644
02645
02646
02647
02648 if ( instance->instance_state_.get_owner () == GUID_UNKNOWN
02649 || ! iter->second->is_owner_evaluated (instance->instance_handle_)) {
02650 OwnershipManagerPtr owner_manager = this->ownership_manager();
02651
02652 bool is_owner = owner_manager && owner_manager->select_owner (
02653 instance->instance_handle_,
02654 iter->second->writer_id_,
02655 iter->second->writer_qos_.ownership_strength.value,
02656 &instance->instance_state_);
02657 iter->second->set_owner_evaluated (instance->instance_handle_, true);
02658
02659 if (! is_owner) {
02660 if (DCPS_debug_level >= 1) {
02661 GuidConverter reader_converter(subscription_id_);
02662 GuidConverter writer_converter(pubid);
02663 GuidConverter owner_converter (instance->instance_state_.get_owner ());
02664 ACE_DEBUG((LM_DEBUG,
02665 ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02666 ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
02667 OPENDDS_STRING(reader_converter).c_str(),
02668 OPENDDS_STRING(writer_converter).c_str(),
02669 OPENDDS_STRING(owner_converter).c_str()));
02670 }
02671 return true;
02672 }
02673 }
02674 else if (! (instance->instance_state_.get_owner () == pubid)) {
02675 if (DCPS_debug_level >= 1) {
02676 GuidConverter reader_converter(subscription_id_);
02677 GuidConverter writer_converter(pubid);
02678 GuidConverter owner_converter (instance->instance_state_.get_owner ());
02679 ACE_DEBUG((LM_DEBUG,
02680 ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02681 ACE_TEXT("reader %C writer %C is not owner %C\n"),
02682 OPENDDS_STRING(reader_converter).c_str(),
02683 OPENDDS_STRING(writer_converter).c_str(),
02684 OPENDDS_STRING(owner_converter).c_str()));
02685 }
02686 return true;
02687 }
02688 }
02689 #else
02690 ACE_UNUSED_ARG(pubid);
02691 ACE_UNUSED_ARG(instance);
02692 #endif
02693 return false;
02694 }
02695
02696 bool
02697 DataReaderImpl::time_based_filter_instance(const SubscriptionInstance_rch& instance, ACE_Time_Value& filter_time_expired)
02698 {
02699 ACE_Time_Value now(ACE_OS::gettimeofday());
02700
02701
02702
02703 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02704
02705 if (qos_.time_based_filter.minimum_separation > zero) {
02706 filter_time_expired = now - instance->last_accepted_;
02707 DDS::Duration_t separation = time_value_to_duration(filter_time_expired);
02708
02709 if (separation < qos_.time_based_filter.minimum_separation) {
02710 return true;
02711 }
02712 }
02713
02714 instance->last_accepted_ = now;
02715
02716 return false;
02717 }
02718
02719 bool DataReaderImpl::is_bit() const
02720 {
02721 return this->is_bit_;
02722 }
02723
02724 bool
02725 DataReaderImpl::has_zero_copies()
02726 {
02727 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02728 guard,
02729 this->sample_lock_,
02730 true );
02731 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true);
02732
02733 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
02734 iter != instances_.end();
02735 ++iter) {
02736 SubscriptionInstance_rch ptr = iter->second;
02737
02738 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
02739 item != 0; item = item->next_data_sample_) {
02740 if (item->zero_copy_cnt_ > 0) {
02741 return true;
02742 }
02743 }
02744 }
02745
02746 return false;
02747 }
02748
02749 void DataReaderImpl::notify_liveliness_change()
02750 {
02751
02752
02753
02754 DDS::DataReaderListener_var listener
02755 = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
02756
02757 if (!CORBA::is_nil(listener.in())) {
02758 listener->on_liveliness_changed(this, liveliness_changed_status_);
02759
02760 liveliness_changed_status_.alive_count_change = 0;
02761 liveliness_changed_status_.not_alive_count_change = 0;
02762 }
02763 notify_status_condition();
02764
02765 if (DCPS_debug_level > 9) {
02766 OPENDDS_STRING output_str;
02767 output_str += "subscription ";
02768 output_str += OPENDDS_STRING(GuidConverter(subscription_id_));
02769 output_str += ", listener at: 0x";
02770 output_str += to_dds_string(this->listener_.in ());
02771
02772 for (WriterMapType::iterator current = this->writers_.begin();
02773 current != this->writers_.end();
02774 ++current) {
02775 RepoId id = current->first;
02776 output_str += "\n\tNOTIFY: writer[ ";
02777 output_str += OPENDDS_STRING(GuidConverter(id));
02778 output_str += "] == ";
02779 output_str += current->second->get_state_str();
02780 }
02781
02782 output_str + "\n";
02783 ACE_DEBUG((LM_DEBUG,
02784 ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
02785 ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
02786 ACE_TEXT("\tNOTIFY: %C\n"),
02787 listener.in (),
02788 listener_mask_,
02789 output_str.c_str()));
02790 }
02791 }
02792
02793 void DataReaderImpl::post_read_or_take()
02794 {
02795 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02796 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
02797 if (subscriber)
02798 subscriber->set_status_changed_flag(
02799 DDS::DATA_ON_READERS_STATUS, false);
02800 }
02801
02802 void DataReaderImpl::reschedule_deadline()
02803 {
02804 if (this->watchdog_.in()) {
02805 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02806 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02807 iter != this->instances_.end();
02808 ++iter) {
02809 if (iter->second->deadline_timer_id_ != -1) {
02810 if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
02811 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"),
02812 ACE_TEXT("reset_timer_interval")));
02813 }
02814 }
02815 }
02816 }
02817 }
02818
02819 ACE_Reactor_Timer_Interface*
02820 DataReaderImpl::get_reactor()
02821 {
02822 return this->reactor_;
02823 }
02824
02825 OpenDDS::DCPS::RepoId
02826 DataReaderImpl::get_topic_id()
02827 {
02828 return this->topic_servant_->get_id();
02829 }
02830
02831 OpenDDS::DCPS::RepoId
02832 DataReaderImpl::get_dp_id()
02833 {
02834 return dp_id_;
02835 }
02836
02837 void
02838 DataReaderImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02839 {
02840 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02841 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02842
02843 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02844 end = instances_.end(); iter != end; ++iter) {
02845 instance_handles.push_back(iter->first);
02846 }
02847 }
02848
02849 void
02850 DataReaderImpl::get_writer_states(WriterStatePairVec& writer_states)
02851 {
02852 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02853 read_guard,
02854 this->writers_lock_);
02855 for (WriterMapType::iterator iter = writers_.begin();
02856 iter != writers_.end();
02857 ++iter) {
02858 writer_states.push_back(WriterStatePair(iter->first,
02859 iter->second->get_state()));
02860 }
02861 }
02862
02863 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02864 void
02865 DataReaderImpl::update_ownership_strength (const PublicationId& pub_id,
02866 const CORBA::Long& ownership_strength)
02867 {
02868 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02869 read_guard,
02870 this->writers_lock_);
02871 for (WriterMapType::iterator iter = writers_.begin();
02872 iter != writers_.end();
02873 ++iter) {
02874 if (iter->second->writer_id_ == pub_id) {
02875 if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) {
02876 if (DCPS_debug_level >= 1) {
02877 GuidConverter reader_converter(this->subscription_id_);
02878 GuidConverter writer_converter(pub_id);
02879 ACE_DEBUG((LM_DEBUG,
02880 ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
02881 ACE_TEXT("local %C update remote %C strength from %d to %d \n"),
02882 OPENDDS_STRING(reader_converter).c_str(),
02883 OPENDDS_STRING(writer_converter).c_str(),
02884 iter->second->writer_qos_.ownership_strength, ownership_strength));
02885 }
02886 iter->second->writer_qos_.ownership_strength.value = ownership_strength;
02887 iter->second->clear_owner_evaluated ();
02888 }
02889 break;
02890 }
02891 }
02892 }
02893 #endif
02894
02895 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02896 bool DataReaderImpl::verify_coherent_changes_completion (WriterInfo* writer)
02897 {
02898 if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS
02899 || ! this->subqos_.presentation.coherent_access) {
02900 this->accept_coherent (writer->writer_id_, writer->publisher_id_);
02901 this->coherent_changes_completed (this);
02902 return true;
02903 }
02904
02905
02906 Coherent_State state = writer->coherent_change_received();
02907 if (writer->group_coherent_) {
02908 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
02909 if (subscriber && state != NOT_COMPLETED_YET) {
02910
02911 subscriber->coherent_change_received (
02912 writer->publisher_id_, this, state);
02913 }
02914 }
02915 else {
02916 if (state == COMPLETED) {
02917 this->accept_coherent (writer->writer_id_, writer->publisher_id_);
02918 }
02919 else if (state == REJECTED) {
02920 this->reject_coherent (writer->writer_id_, writer->publisher_id_);
02921 }
02922 else {
02923 return false;
02924 }
02925
02926
02927 writer->reset_coherent_info ();
02928 }
02929
02930 return state == COMPLETED;
02931 }
02932
02933
02934 void DataReaderImpl::accept_coherent (PublicationId& writer_id,
02935 RepoId& publisher_id)
02936 {
02937 if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
02938 GuidConverter reader (this->subscription_id_);
02939 GuidConverter writer (writer_id);
02940 GuidConverter publisher (publisher_id);
02941 ACE_DEBUG((LM_DEBUG,
02942 ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
02943 ACE_TEXT(" reader %C writer %C publisher %C \n"),
02944 OPENDDS_STRING(reader).c_str(),
02945 OPENDDS_STRING(writer).c_str(),
02946 OPENDDS_STRING(publisher).c_str()));
02947 }
02948
02949 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02950 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02951
02952 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02953 iter != this->instances_.end(); ++iter) {
02954 iter->second->rcvd_strategy_->accept_coherent(
02955 writer_id, publisher_id);
02956 }
02957 }
02958
02959
02960 void DataReaderImpl::reject_coherent (PublicationId& writer_id,
02961 RepoId& publisher_id)
02962 {
02963 if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
02964 GuidConverter reader (this->subscription_id_);
02965 GuidConverter writer (writer_id);
02966 GuidConverter publisher (publisher_id);
02967 ACE_DEBUG((LM_DEBUG,
02968 ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
02969 ACE_TEXT(" reader %C writer %C publisher %C \n"),
02970 OPENDDS_STRING(reader).c_str(),
02971 OPENDDS_STRING(writer).c_str(),
02972 OPENDDS_STRING(publisher).c_str()));
02973 }
02974
02975 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02976 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02977
02978 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02979 iter != this->instances_.end(); ++iter) {
02980 iter->second->rcvd_strategy_->reject_coherent(
02981 writer_id, publisher_id);
02982 }
02983 this->reset_coherent_info (writer_id, publisher_id);
02984 }
02985
02986
02987 void DataReaderImpl::reset_coherent_info (const PublicationId& writer_id,
02988 const RepoId& publisher_id)
02989 {
02990 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
02991
02992 WriterMapType::iterator itEnd = this->writers_.end();
02993 for (WriterMapType::iterator it = this->writers_.begin();
02994 it != itEnd; ++it) {
02995 if (it->second->writer_id_ == writer_id
02996 && it->second->publisher_id_ == publisher_id) {
02997 it->second->reset_coherent_info();
02998 }
02999 }
03000 }
03001
03002
03003 void
03004 DataReaderImpl::coherent_change_received (RepoId publisher_id, Coherent_State& result)
03005 {
03006 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03007
03008 result = COMPLETED;
03009 for (WriterMapType::iterator iter = writers_.begin();
03010 iter != writers_.end();
03011 ++iter) {
03012
03013 if (iter->second->publisher_id_ == publisher_id) {
03014 const Coherent_State state = iter->second->coherent_change_received();
03015 if (state == NOT_COMPLETED_YET) {
03016 result = NOT_COMPLETED_YET;
03017 break;
03018 }
03019 else if (state == REJECTED) {
03020 result = REJECTED;
03021 }
03022 }
03023 }
03024 }
03025
03026
03027 void
03028 DataReaderImpl::coherent_changes_completed (DataReaderImpl* reader)
03029 {
03030 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
03031 if (!subscriber)
03032 return;
03033
03034 subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
03035 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
03036
03037 ::DDS::SubscriberListener_var sub_listener =
03038 subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS);
03039 if (!CORBA::is_nil(sub_listener.in()))
03040 {
03041 if (reader == this) {
03042
03043 ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03044 sub_listener->on_data_on_readers(subscriber.in());
03045 }
03046
03047 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03048 subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03049 }
03050 else
03051 {
03052 subscriber->notify_status_condition();
03053
03054 ::DDS::DataReaderListener_var listener =
03055 this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
03056
03057 if (!CORBA::is_nil(listener.in()))
03058 {
03059 if (reader == this) {
03060
03061 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03062 listener->on_data_available(this);
03063 } else {
03064 listener->on_data_available(this);
03065 }
03066
03067 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03068 subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03069 }
03070 else
03071 {
03072 this->notify_status_condition();
03073 }
03074 }
03075 }
03076
03077
03078 void DataReaderImpl::begin_access()
03079 {
03080 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03081 this->coherent_ = true;
03082 }
03083
03084
03085 void DataReaderImpl::end_access()
03086 {
03087 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03088 this->coherent_ = false;
03089 this->group_coherent_ordered_data_.reset();
03090 this->post_read_or_take();
03091 }
03092
03093
03094 void DataReaderImpl::get_ordered_data (GroupRakeData& data,
03095 DDS::SampleStateMask sample_states,
03096 DDS::ViewStateMask view_states,
03097 DDS::InstanceStateMask instance_states)
03098 {
03099 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03100 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03101
03102 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
03103 iter != instances_.end(); ++iter) {
03104 SubscriptionInstance_rch ptr = iter->second;
03105 if ((ptr->instance_state_.view_state() & view_states) &&
03106 (ptr->instance_state_.instance_state() & instance_states)) {
03107 size_t i(0);
03108 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
03109 item != 0; item = item->next_data_sample_) {
03110 if ((item->sample_state_ & sample_states) && !item->coherent_change_) {
03111 data.insert_sample(item, ptr, ++i);
03112 this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i);
03113 }
03114 }
03115 }
03116 }
03117 }
03118
03119 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
03120
03121 void
03122 DataReaderImpl::set_subscriber_qos(
03123 const DDS::SubscriberQos & qos)
03124 {
03125 this->subqos_ = qos;
03126 }
03127
03128 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
03129 void
03130 DataReaderImpl::enable_filtering(ContentFilteredTopicImpl* cft)
03131 {
03132 cft->add_reader(*this);
03133 content_filtered_topic_ = cft;
03134 }
03135
03136 DDS::ContentFilteredTopic_ptr
03137 DataReaderImpl::get_cf_topic() const
03138 {
03139 return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get());
03140 }
03141 #endif
03142
03143 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
03144
03145 void
03146 DataReaderImpl::update_subscription_params(const DDS::StringSeq& params) const
03147 {
03148 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
03149 disco->update_subscription_params(domain_id_,
03150 dp_id_,
03151 subscription_id_,
03152 params);
03153 }
03154 #endif
03155
03156 void
03157 DataReaderImpl::reset_ownership (::DDS::InstanceHandle_t instance)
03158 {
03159 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03160 for (WriterMapType::iterator iter = writers_.begin();
03161 iter != writers_.end();
03162 ++iter) {
03163 iter->second->set_owner_evaluated(instance, false);
03164 }
03165 }
03166
03167 void
03168 DataReaderImpl::resume_sample_processing(const PublicationId& pub_id)
03169 {
03170 OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
03171 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03172 WriterMapType::iterator where = writers_.find(pub_id);
03173 if (writers_.end() != where) {
03174 WriterInfo& info = *where->second;
03175
03176 if (info.waiting_for_end_historic_samples_) {
03177 end_historic_sweeper_->cancel_timer(where->second);
03178 if (!info.historic_samples_.empty()) {
03179 info.last_historic_seq_ = info.historic_samples_.rbegin()->first;
03180 }
03181 to_deliver.swap(info.historic_samples_);
03182 write_guard.release();
03183 deliver_historic(to_deliver);
03184 }
03185 }
03186 }
03187
03188 bool DataReaderImpl::check_historic(const ReceivedDataSample& sample)
03189 {
03190 ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
03191 WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
03192 if (iter != writers_.end()) {
03193 const SequenceNumber& seq = sample.header_.sequence_;
03194 if (iter->second->waiting_for_end_historic_samples_) {
03195 iter->second->historic_samples_.insert(std::make_pair(seq, sample));
03196 return false;
03197 }
03198 if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
03199 && !sample.header_.historic_sample_
03200 && seq <= iter->second->last_historic_seq_) {
03201
03202 return false;
03203 }
03204 }
03205 return true;
03206 }
03207
03208 void DataReaderImpl::deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples)
03209 {
03210 typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
03211 const iter_t end = samples.end();
03212 for (iter_t iter = samples.begin(); iter != end; ++iter) {
03213 iter->second.header_.historic_sample_ = true;
03214 data_received(iter->second);
03215 }
03216 }
03217
03218 void
03219 DataReaderImpl::add_link(const DataLink_rch& link, const RepoId& peer)
03220 {
03221 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
03222
03223 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
03224
03225 WriterMapType::iterator it = writers_.find(peer);
03226 if (it != writers_.end()) {
03227
03228
03229 end_historic_sweeper_->schedule_timer(it->second);
03230 }
03231 }
03232 TransportClient::add_link(link, peer);
03233 TransportImpl& impl = link->impl();
03234 OPENDDS_STRING type = impl.transport_type();
03235
03236 if (type == "rtps_udp" || type == "multicast") {
03237 resume_sample_processing(peer);
03238 }
03239 }
03240
03241 void
03242 DataReaderImpl::register_for_writer(const RepoId& participant,
03243 const RepoId& readerid,
03244 const RepoId& writerid,
03245 const TransportLocatorSeq& locators,
03246 DiscoveryListener* listener)
03247 {
03248 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
03249 }
03250
03251 void
03252 DataReaderImpl::unregister_for_writer(const RepoId& participant,
03253 const RepoId& readerid,
03254 const RepoId& writerid)
03255 {
03256 TransportClient::unregister_for_writer(participant, readerid, writerid);
03257 }
03258
03259 void DataReaderImpl::accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance)
03260 {
03261 bool accepted = true;
03262 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03263 bool verify_coherent = false;
03264 #endif
03265 RcHandle<WriterInfo> writer;
03266
03267 if (header.publication_id_.entityId.entityKind != ENTITYKIND_OPENDDS_NIL_WRITER) {
03268 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
03269
03270 WriterMapType::iterator where = writers_.find(header.publication_id_);
03271
03272 if (where != writers_.end()) {
03273 if (header.coherent_change_) {
03274
03275 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03276
03277 where->second->group_coherent_ = header.group_coherent_;
03278 where->second->publisher_id_ = header.publisher_id_;
03279 ++where->second->coherent_samples_;
03280 verify_coherent = true;
03281 #endif
03282 writer = where->second;
03283 }
03284 }
03285 else {
03286 GuidConverter subscriptionBuffer(subscription_id_);
03287 GuidConverter publicationBuffer(header.publication_id_);
03288 ACE_DEBUG((LM_WARNING,
03289 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
03290 ACE_TEXT("subscription %C failed to find ")
03291 ACE_TEXT("publication data for %C.\n"),
03292 OPENDDS_STRING(subscriptionBuffer).c_str(),
03293 OPENDDS_STRING(publicationBuffer).c_str()));
03294 }
03295 }
03296
03297 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03298 if (verify_coherent) {
03299 accepted = verify_coherent_changes_completion(writer.in());
03300 }
03301 #endif
03302
03303 if (instance && watchdog_.in()) {
03304 instance->last_sample_tv_ = instance->cur_sample_tv_;
03305 instance->cur_sample_tv_ = ACE_OS::gettimeofday();
03306
03307
03308 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03309 if (is_new_instance) {
03310 watchdog_->schedule_timer(instance);
03311 } else {
03312 watchdog_->execute(instance, false);
03313 }
03314 }
03315
03316 if (accepted) {
03317 notify_read_conditions();
03318 }
03319 }
03320
03321 #if defined(OPENDDS_SECURITY)
03322 DDS::Security::ParticipantCryptoHandle DataReaderImpl::get_crypto_handle() const
03323 {
03324 RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
03325 return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
03326 }
03327 #endif
03328
03329 EndHistoricSamplesMissedSweeper::EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
03330 ACE_thread_t owner,
03331 DataReaderImpl* reader)
03332 : ReactorInterceptor (reactor, owner)
03333 , reader_(*reader)
03334 { }
03335
03336 EndHistoricSamplesMissedSweeper::~EndHistoricSamplesMissedSweeper()
03337 { }
03338
03339 void EndHistoricSamplesMissedSweeper::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03340 {
03341 info->waiting_for_end_historic_samples_ = true;
03342 ScheduleCommand c(this, info);
03343 execute_or_enqueue(c);
03344 }
03345
03346 void EndHistoricSamplesMissedSweeper::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03347 {
03348 info->waiting_for_end_historic_samples_ = false;
03349 CancelCommand c(this, info);
03350 execute_or_enqueue(c);
03351 }
03352
03353 int EndHistoricSamplesMissedSweeper::handle_timeout(
03354 const ACE_Time_Value& ,
03355 const void* arg)
03356 {
03357
03358 WriterInfo* const info =
03359 const_cast<WriterInfo*>(reinterpret_cast<const WriterInfo*>(arg));
03360 const PublicationId pub_id = info->writer_id_;
03361
03362 {
03363 ACE_Guard<ACE_Thread_Mutex> guard(this->mutex_);
03364 info_set_.erase(rchandle_from(info));
03365 }
03366
03367 RcHandle<DataReaderImpl> reader = reader_.lock();
03368 if (!reader)
03369 return 0;
03370
03371 if (DCPS_debug_level >= 1) {
03372 GuidConverter sub_repo(reader->get_repo_id());
03373 GuidConverter pub_repo(pub_id);
03374 ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
03375 OPENDDS_STRING(sub_repo).c_str(),
03376 OPENDDS_STRING(pub_repo).c_str()));
03377 }
03378
03379 reader->resume_sample_processing(pub_id);
03380 return 0;
03381 }
03382
03383 void EndHistoricSamplesMissedSweeper::ScheduleCommand::execute()
03384 {
03385 static const ACE_Time_Value ten_seconds(10);
03386
03387 const void* arg = reinterpret_cast<const void*>(info_.in());
03388
03389 info_->historic_samples_timer_ = sweeper_->reactor()->schedule_timer(sweeper_,
03390 arg,
03391 ten_seconds);
03392 sweeper_->info_set_.insert(info_);
03393 if (DCPS_debug_level) {
03394 ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", info_->historic_samples_timer_));
03395 }
03396 }
03397
03398 void EndHistoricSamplesMissedSweeper::CancelCommand::execute()
03399 {
03400 if (info_->historic_samples_timer_ != WriterInfo::NO_TIMER) {
03401 sweeper_->reactor()->cancel_timer(info_->historic_samples_timer_);
03402 if (DCPS_debug_level) {
03403 ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", info_->historic_samples_timer_));
03404 }
03405 info_->historic_samples_timer_ = WriterInfo::NO_TIMER;
03406 sweeper_->info_set_.erase(info_);
03407 }
03408 }
03409
03410 }
03411 }
03412
03413 OPENDDS_END_VERSIONED_NAMESPACE_DECL