00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DataReaderImpl.h"
00010 #include "tao/ORB_Core.h"
00011 #include "SubscriptionInstance.h"
00012 #include "ReceivedDataElementList.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "FeatureDisabledQosCheck.h"
00017 #include "GuidConverter.h"
00018 #include "TopicImpl.h"
00019 #include "Serializer.h"
00020 #include "SubscriberImpl.h"
00021 #include "Transient_Kludge.h"
00022 #include "Util.h"
00023 #include "RequestedDeadlineWatchdog.h"
00024 #include "QueryConditionImpl.h"
00025 #include "ReadConditionImpl.h"
00026 #include "MonitorFactory.h"
00027 #include "dds/DCPS/transport/framework/EntryExit.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029 #include "dds/DdsDcpsCoreC.h"
00030 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00031 #include "dds/DCPS/SafetyProfileStreams.h"
00032 #if !defined (DDS_HAS_MINIMUM_BIT)
00033 #include "BuiltInTopicUtils.h"
00034 #include "dds/DdsDcpsCoreTypeSupportC.h"
00035 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00036
00037 #include "ace/Reactor.h"
00038 #include "ace/Auto_Ptr.h"
00039 #include "ace/OS_NS_sys_time.h"
00040
00041 #include <cstdio>
00042 #include <stdexcept>
00043
00044 #if !defined (__ACE_INLINE__)
00045 # include "DataReaderImpl.inl"
00046 #endif
00047
00048 namespace OpenDDS {
00049 namespace DCPS {
00050
00051 DataReaderImpl::DataReaderImpl()
00052 : rd_allocator_(0),
00053 qos_(TheServiceParticipant->initial_DataReaderQos()),
00054 reverse_sample_lock_(sample_lock_),
00055 participant_servant_(0),
00056 topic_servant_(0),
00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00058 is_exclusive_ownership_ (false),
00059 #endif
00060 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00061 owner_manager_ (0),
00062 #endif
00063 coherent_(false),
00064 subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00065 topic_desc_(0),
00066 listener_mask_(DEFAULT_STATUS_MASK),
00067 domain_id_(0),
00068 subscriber_servant_(0),
00069 end_historic_sweeper_(new EndHistoricSamplesMissedSweeper(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00070 remove_association_sweeper_(new RemoveAssociationSweeper<DataReaderImpl>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00071 n_chunks_(TheServiceParticipant->n_chunks()),
00072 reverse_pub_handle_lock_(publication_handle_lock_),
00073 reactor_(0),
00074 liveliness_timer_(new LivelinessTimer(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00075 last_deadline_missed_total_count_(0),
00076 watchdog_(),
00077 is_bit_(false),
00078 initialized_(false),
00079 always_get_history_(false),
00080 statistics_enabled_(false),
00081 raw_latency_buffer_size_(0),
00082 raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00083 monitor_(0),
00084 periodic_monitor_(0),
00085 transport_disabled_(false)
00086 {
00087 reactor_ = TheServiceParticipant->timer();
00088
00089 liveliness_changed_status_.alive_count = 0;
00090 liveliness_changed_status_.not_alive_count = 0;
00091 liveliness_changed_status_.alive_count_change = 0;
00092 liveliness_changed_status_.not_alive_count_change = 0;
00093 liveliness_changed_status_.last_publication_handle =
00094 DDS::HANDLE_NIL;
00095
00096 requested_deadline_missed_status_.total_count = 0;
00097 requested_deadline_missed_status_.total_count_change = 0;
00098 requested_deadline_missed_status_.last_instance_handle =
00099 DDS::HANDLE_NIL;
00100
00101 requested_incompatible_qos_status_.total_count = 0;
00102 requested_incompatible_qos_status_.total_count_change = 0;
00103 requested_incompatible_qos_status_.last_policy_id = 0;
00104 requested_incompatible_qos_status_.policies.length(0);
00105
00106 subscription_match_status_.total_count = 0;
00107 subscription_match_status_.total_count_change = 0;
00108 subscription_match_status_.current_count = 0;
00109 subscription_match_status_.current_count_change = 0;
00110 subscription_match_status_.last_publication_handle =
00111 DDS::HANDLE_NIL;
00112
00113 sample_lost_status_.total_count = 0;
00114 sample_lost_status_.total_count_change = 0;
00115
00116 sample_rejected_status_.total_count = 0;
00117 sample_rejected_status_.total_count_change = 0;
00118 sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
00119 sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
00120
00121 this->budget_exceeded_status_.total_count = 0;
00122 this->budget_exceeded_status_.total_count_change = 0;
00123 this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
00124
00125 monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this);
00126 periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this);
00127 }
00128
00129
00130
00131 DataReaderImpl::~DataReaderImpl()
00132 {
00133 DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6);
00134
00135 {
00136 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00137 read_guard,
00138 this->writers_lock_);
00139
00140 WriterMapType::iterator writer;
00141 for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00142 end_historic_sweeper_->cancel_timer(writer->second);
00143 remove_association_sweeper_->cancel_timer(writer->second);
00144 }
00145 }
00146
00147 end_historic_sweeper_->wait();
00148 end_historic_sweeper_->destroy();
00149
00150 remove_association_sweeper_->wait();
00151 remove_association_sweeper_->destroy();
00152
00153 liveliness_timer_->cancel_timer();
00154 liveliness_timer_->wait();
00155 liveliness_timer_->destroy();
00156
00157 if (initialized_) {
00158 delete rd_allocator_;
00159 }
00160 }
00161
00162
00163 void
00164 DataReaderImpl::cleanup()
00165 {
00166 {
00167
00168 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00169 guard,
00170 this->sample_lock_);
00171
00172 liveliness_timer_->cancel_timer();
00173 }
00174 liveliness_timer_->wait();
00175
00176
00177 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00178 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00179 iter != instances_.end();
00180 ++iter) {
00181 SubscriptionInstance *ptr = iter->second;
00182 if (this->watchdog_ && ptr->deadline_timer_id_ != -1) {
00183 this->watchdog_->cancel_timer(ptr);
00184 }
00185 }
00186 }
00187
00188 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00189 if (owner_manager_) {
00190 owner_manager_->unregister_reader(topic_servant_->type_name(), this);
00191 }
00192 #endif
00193
00194 if (topic_servant_) {
00195 topic_servant_->remove_entity_ref();
00196 topic_servant_->_remove_ref();
00197 }
00198
00199 dr_local_objref_ = DDS::DataReader::_nil();
00200
00201 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00202 if (!CORBA::is_nil(content_filtered_topic_.in())) {
00203 ContentFilteredTopicImpl* cft =
00204 dynamic_cast<ContentFilteredTopicImpl*>(content_filtered_topic_.in());
00205 cft->remove_reader(*this);
00206 cft->update_reader_count(false);
00207 content_filtered_topic_ = DDS::ContentFilteredTopic::_nil ();
00208 }
00209 #endif
00210
00211 {
00212 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00213 read_guard,
00214 this->writers_lock_);
00215
00216 WriterMapType::iterator writer;
00217 for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00218 end_historic_sweeper_->cancel_timer(writer->second);
00219 remove_association_sweeper_->cancel_timer(writer->second);
00220 }
00221 }
00222
00223 end_historic_sweeper_->wait();
00224 remove_association_sweeper_->wait();
00225 }
00226
00227 void DataReaderImpl::init(
00228 TopicDescriptionImpl* a_topic_desc,
00229 const DDS::DataReaderQos & qos,
00230 DDS::DataReaderListener_ptr a_listener,
00231 const DDS::StatusMask & mask,
00232 DomainParticipantImpl* participant,
00233 SubscriberImpl* subscriber,
00234 DDS::DataReader_ptr dr_objref)
00235 {
00236 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00237 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00238 topic_servant_ = a_topic;
00239 topic_servant_->_add_ref();
00240
00241 topic_servant_->add_entity_ref();
00242 }
00243
00244 CORBA::String_var topic_name = a_topic_desc->get_name();
00245
00246 #if !defined (DDS_HAS_MINIMUM_BIT)
00247 is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00248 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00249 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00250 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00251 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00252
00253 qos_ = qos;
00254
00255 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00256 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00257 #endif
00258
00259 listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00260 listener_mask_ = mask;
00261
00262
00263
00264 participant_servant_ = participant;
00265
00266 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00267 if (is_exclusive_ownership_) {
00268 owner_manager_ = participant_servant_->ownership_manager ();
00269 }
00270 #endif
00271
00272 domain_id_ = participant_servant_->get_domain_id();
00273
00274
00275
00276 subscriber_servant_ = subscriber;
00277 dr_local_objref_ = DDS::DataReader::_duplicate(dr_objref);
00278
00279 if (this->subscriber_servant_->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
00280 ACE_DEBUG((LM_WARNING,
00281 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
00282 ACE_TEXT("failed to get SubscriberQos\n")));
00283 }
00284
00285 initialized_ = true;
00286 }
00287
00288 DDS::InstanceHandle_t
00289 DataReaderImpl::get_instance_handle()
00290 {
00291 return this->participant_servant_->id_to_handle(subscription_id_);
00292 }
00293
00294 void
00295 DataReaderImpl::add_association(const RepoId& yourId,
00296 const WriterAssociation& writer,
00297 bool active)
00298 {
00299 if (DCPS_debug_level) {
00300 GuidConverter reader_converter(yourId);
00301 GuidConverter writer_converter(writer.writerId);
00302 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
00303 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00304 OPENDDS_STRING(reader_converter).c_str(),
00305 OPENDDS_STRING(writer_converter).c_str()));
00306 }
00307
00308 if (entity_deleted_.value()) {
00309 if (DCPS_debug_level) {
00310 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
00311 ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00312 }
00313 return;
00314 }
00315
00316
00317
00318
00319 if (GUID_UNKNOWN == subscription_id_) {
00320 subscription_id_ = yourId;
00321 }
00322
00323
00324 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00325
00326
00327
00328
00329
00330
00331 {
00332
00333 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
00334
00335 const PublicationId& writer_id = writer.writerId;
00336 RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos);
00337 std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
00338
00339 WriterMapType::value_type(
00340 writer_id,
00341 info));
00342
00343
00344
00345 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00346 info->waiting_for_end_historic_samples_ = true;
00347 }
00348
00349 this->statistics_.insert(
00350 StatsMapType::value_type(
00351 writer_id,
00352 WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
00353
00354
00355 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00356
00357 }
00358
00359 if (DCPS_debug_level > 4) {
00360 GuidConverter converter(writer_id);
00361 ACE_DEBUG((LM_DEBUG,
00362 "(%P|%t) DataReaderImpl::add_association: "
00363 "inserted writer %C.return %d \n",
00364 OPENDDS_STRING(converter).c_str(), bpair.second));
00365
00366 WriterMapType::iterator iter = writers_.find(writer_id);
00367 if (iter != writers_.end()) {
00368
00369
00370
00371 GuidConverter reader_converter(subscription_id_);
00372 GuidConverter writer_converter(writer_id);
00373 ACE_DEBUG((LM_DEBUG,
00374 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00375 ACE_TEXT("reader %C is associated with writer %C.\n"),
00376 OPENDDS_STRING(reader_converter).c_str(),
00377 OPENDDS_STRING(writer_converter).c_str()));
00378 }
00379 }
00380 }
00381
00382
00383
00384
00385
00386 AssociationData data;
00387 data.remote_id_ = writer.writerId;
00388 data.remote_data_ = writer.writerTransInfo;
00389 data.publication_transport_priority_ =
00390 writer.writerQos.transport_priority.value;
00391 data.remote_reliable_ =
00392 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00393 data.remote_durable_ =
00394 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00395
00396
00397
00398
00399
00400 guard.release();
00401
00402 if (!associate(data, active)) {
00403 if (DCPS_debug_level) {
00404 ACE_DEBUG((LM_ERROR,
00405 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00406 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00407 }
00408 }
00409 }
00410
00411 void
00412 DataReaderImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00413 {
00414 if (!(flags & ASSOC_OK)) {
00415 if (DCPS_debug_level) {
00416 const GuidConverter conv(remote_id);
00417 ACE_DEBUG((LM_ERROR,
00418 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00419 ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00420 OPENDDS_STRING(conv).c_str()));
00421 }
00422 return;
00423 }
00424
00425 const bool active = flags & ASSOC_ACTIVE;
00426 {
00427
00428 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00429
00430
00431 if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00432 if (DCPS_debug_level >= 5) {
00433 GuidConverter converter(subscription_id_);
00434 ACE_DEBUG((LM_DEBUG,
00435 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00436 ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00437 OPENDDS_STRING(converter).c_str()));
00438 }
00439
00440 liveliness_timer_->check_liveliness();
00441 }
00442 }
00443
00444
00445 if (!is_bit_) {
00446
00447 DDS::InstanceHandle_t handle = participant_servant_->id_to_handle(remote_id);
00448
00449
00450
00451 {
00452 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00453
00454
00455 id_to_handle_map_.insert(
00456 RepoIdToHandleMap::value_type(remote_id, handle));
00457
00458 if (DCPS_debug_level > 4) {
00459 GuidConverter converter(remote_id);
00460 ACE_DEBUG((LM_DEBUG,
00461 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00462 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00463 OPENDDS_STRING(converter).c_str(),
00464 handle));
00465 }
00466
00467
00468
00469
00470 const int matchedPublications = static_cast<int>(id_to_handle_map_.size());
00471 subscription_match_status_.current_count_change =
00472 matchedPublications - subscription_match_status_.current_count;
00473 subscription_match_status_.current_count = matchedPublications;
00474
00475 ++subscription_match_status_.total_count;
00476 ++subscription_match_status_.total_count_change;
00477
00478 subscription_match_status_.last_publication_handle = handle;
00479
00480 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00481
00482 DDS::DataReaderListener_var listener =
00483 listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00484
00485 if (!CORBA::is_nil(listener)) {
00486 listener->on_subscription_matched(dr_local_objref_,
00487 subscription_match_status_);
00488
00489
00490
00491
00492
00493 subscription_match_status_.total_count_change = 0;
00494 subscription_match_status_.current_count_change = 0;
00495 }
00496
00497 notify_status_condition();
00498 }
00499
00500 {
00501 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00502 ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00503
00504 if(!writers_.count(remote_id)){
00505 return;
00506 }
00507 writers_[remote_id]->handle_ = handle;
00508 }
00509 }
00510
00511 if (!active) {
00512 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00513
00514 disco->association_complete(domain_id_, participant_servant_->get_id(),
00515 subscription_id_, remote_id);
00516 }
00517
00518 if (monitor_) {
00519 monitor_->report();
00520 }
00521 }
00522
00523 void
00524 DataReaderImpl::association_complete(const RepoId& )
00525 {
00526
00527
00528 }
00529
00530 void
00531 DataReaderImpl::remove_associations(const WriterIdSeq& writers,
00532 bool notify_lost)
00533 {
00534 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
00535
00536 if (writers.length() == 0) {
00537 return;
00538 }
00539
00540 if (DCPS_debug_level >= 1) {
00541 GuidConverter reader_converter(subscription_id_);
00542 GuidConverter writer_converter(writers[0]);
00543 ACE_DEBUG((LM_DEBUG,
00544 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
00545 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00546 is_bit_,
00547 OPENDDS_STRING(reader_converter).c_str(),
00548 OPENDDS_STRING(writer_converter).c_str(),
00549 writers.length()));
00550 }
00551 if (!this->entity_deleted_.value()) {
00552
00553 this->stop_associating(writers.get_buffer(), writers.length());
00554
00555
00556
00557 WriterIdSeq non_active_writers;
00558 {
00559 CORBA::ULong wr_len = writers.length();
00560 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00561
00562 for (CORBA::ULong i = 0; i < wr_len; i++) {
00563 PublicationId writer_id = writers[i];
00564
00565 WriterMapType::iterator it = this->writers_.find(writer_id);
00566 if (it != this->writers_.end() &&
00567 it->second->active(TheServiceParticipant->pending_timeout())) {
00568 remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00569 } else {
00570 push_back(non_active_writers, writer_id);
00571 }
00572 }
00573 }
00574 remove_associations_i(non_active_writers, notify_lost);
00575 } else {
00576 remove_associations_i(writers, notify_lost);
00577 }
00578 }
00579
00580 void
00581 DataReaderImpl::remove_or_reschedule(const PublicationId& pub_id)
00582 {
00583 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00584 WriterMapType::iterator where = writers_.find(pub_id);
00585 if (writers_.end() != where) {
00586 WriterInfo& info = *where->second;
00587 WriterIdSeq writers;
00588 push_back(writers, pub_id);
00589 bool notify = info.notify_lost_;
00590 if (info.removal_deadline_ < ACE_OS::gettimeofday()) {
00591 write_guard.release();
00592 remove_associations_i(writers, notify);
00593 } else {
00594 write_guard.release();
00595 remove_associations(writers, notify);
00596 }
00597 }
00598 }
00599
00600 void
00601 DataReaderImpl::remove_associations_i(const WriterIdSeq& writers,
00602 bool notify_lost)
00603 {
00604 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
00605
00606 if (writers.length() == 0) {
00607 return;
00608 }
00609
00610 if (DCPS_debug_level >= 1) {
00611 GuidConverter reader_converter(subscription_id_);
00612 GuidConverter writer_converter(writers[0]);
00613 ACE_DEBUG((LM_DEBUG,
00614 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00615 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00616 is_bit_,
00617 OPENDDS_STRING(reader_converter).c_str(),
00618 OPENDDS_STRING(writer_converter).c_str(),
00619 writers.length()));
00620 }
00621 DDS::InstanceHandleSeq handles;
00622
00623 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00624
00625
00626
00627
00628 WriterIdSeq updated_writers;
00629
00630 CORBA::ULong wr_len;
00631
00632
00633
00634
00635
00636 {
00637 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00638
00639 wr_len = writers.length();
00640
00641 for (CORBA::ULong i = 0; i < wr_len; i++) {
00642 PublicationId writer_id = writers[i];
00643
00644 WriterMapType::iterator it = this->writers_.find(writer_id);
00645
00646 if (it != this->writers_.end()) {
00647 it->second->removed();
00648 end_historic_sweeper_->cancel_timer(it->second);
00649 remove_association_sweeper_->cancel_timer(it->second);
00650 }
00651
00652 if (this->writers_.erase(writer_id) == 0) {
00653 if (DCPS_debug_level >= 1) {
00654 GuidConverter converter(writer_id);
00655 ACE_DEBUG((LM_DEBUG,
00656 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00657 ACE_TEXT("the writer local %C was already removed.\n"),
00658 OPENDDS_STRING(converter).c_str()));
00659 }
00660
00661 } else {
00662 push_back(updated_writers, writer_id);
00663 }
00664 }
00665 }
00666
00667 wr_len = updated_writers.length();
00668
00669
00670 if (wr_len == 0) {
00671 return;
00672 }
00673
00674 if (!is_bit_) {
00675
00676
00677 if (this->lookup_instance_handles(updated_writers, handles) == false) {
00678 if (DCPS_debug_level > 4) {
00679 ACE_DEBUG((LM_DEBUG,
00680 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00681 ACE_TEXT("lookup_instance_handles failed.\n")));
00682 }
00683 }
00684
00685 for (CORBA::ULong i = 0; i < wr_len; ++i) {
00686 id_to_handle_map_.erase(updated_writers[i]);
00687 }
00688 }
00689
00690 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00691 {
00692 this->disassociate(updated_writers[i]);
00693 }
00694 }
00695
00696
00697 if (!this->is_bit_) {
00698
00699 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00700 this->subscription_match_status_.current_count_change
00701 = matchedPublications - this->subscription_match_status_.current_count;
00702
00703
00704 if (this->subscription_match_status_.current_count_change != 0) {
00705 this->subscription_match_status_.current_count = matchedPublications;
00706
00707
00708
00709
00710 this->subscription_match_status_.last_publication_handle
00711 = handles[ wr_len - 1];
00712
00713 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00714
00715 DDS::DataReaderListener_var listener
00716 = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00717
00718 if (!CORBA::is_nil(listener.in())) {
00719 listener->on_subscription_matched(
00720 dr_local_objref_.in(),
00721 this->subscription_match_status_);
00722
00723
00724 this->subscription_match_status_.total_count_change = 0;
00725 this->subscription_match_status_.current_count_change = 0;
00726 }
00727 notify_status_condition();
00728 }
00729 }
00730
00731
00732
00733
00734 if (notify_lost) {
00735 this->notify_subscription_lost(handles);
00736 }
00737
00738 if (this->monitor_) {
00739 this->monitor_->report();
00740 }
00741 }
00742
00743 void
00744 DataReaderImpl::remove_all_associations()
00745 {
00746 DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
00747
00748 this->stop_associating();
00749
00750 OpenDDS::DCPS::WriterIdSeq writers;
00751 int size;
00752
00753 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00754
00755 {
00756 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00757
00758 size = static_cast<int>(writers_.size());
00759 writers.length(size);
00760
00761 WriterMapType::iterator curr_writer = writers_.begin();
00762 WriterMapType::iterator end_writer = writers_.end();
00763
00764 int i = 0;
00765
00766 while (curr_writer != end_writer) {
00767 writers[i++] = curr_writer->first;
00768 ++curr_writer;
00769 }
00770 }
00771
00772 try {
00773 CORBA::Boolean dont_notify_lost = 0;
00774
00775 if (0 < size) {
00776 remove_associations(writers, dont_notify_lost);
00777 }
00778
00779 } catch (const CORBA::Exception&) {
00780 ACE_DEBUG((LM_WARNING,
00781 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00782 ACE_TEXT("caught exception from remove_associations.\n")));
00783 }
00784 }
00785
00786 void
00787 DataReaderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00788 {
00789 DDS::DataReaderListener_var listener =
00790 listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
00791
00792 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00793 guard,
00794 this->publication_handle_lock_);
00795
00796
00797 if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00798
00799 return;
00800 }
00801
00802 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00803 true);
00804
00805
00806 requested_incompatible_qos_status_.total_count = status.total_count;
00807 requested_incompatible_qos_status_.total_count_change +=
00808 status.count_since_last_send;
00809 requested_incompatible_qos_status_.last_policy_id =
00810 status.last_policy_id;
00811 requested_incompatible_qos_status_.policies = status.policies;
00812
00813 if (!CORBA::is_nil(listener.in())) {
00814 listener->on_requested_incompatible_qos(dr_local_objref_.in(),
00815 requested_incompatible_qos_status_);
00816
00817
00818
00819
00820
00821
00822 requested_incompatible_qos_status_.total_count_change = 0;
00823 }
00824
00825 notify_status_condition();
00826 }
00827
00828 void
00829 DataReaderImpl::inconsistent_topic()
00830 {
00831 topic_servant_->inconsistent_topic();
00832 }
00833
00834 void
00835 DataReaderImpl::signal_liveliness(const RepoId& remote_participant)
00836 {
00837 RepoId prefix = remote_participant;
00838 prefix.entityId = EntityId_t();
00839
00840 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00841
00842 typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair;
00843 typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
00844 WriterSet writers;
00845
00846 {
00847 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00848 for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00849 limit = writers_.end();
00850 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00851 ++pos) {
00852 writers.push_back(std::make_pair(pos->first, pos->second));
00853 }
00854 }
00855
00856 ACE_Time_Value when = ACE_OS::gettimeofday();
00857 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00858 pos != limit;
00859 ++pos) {
00860 pos->second->received_activity(when);
00861 }
00862
00863 if (!writers.empty()) {
00864 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00865 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00866 pos != limit;
00867 ++pos) {
00868 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00869 iter != instances_.end();
00870 ++iter) {
00871 SubscriptionInstance *ptr = iter->second;
00872 ptr->instance_state_.lively(pos->first);
00873 }
00874 }
00875 }
00876 }
00877
00878 DDS::ReadCondition_ptr DataReaderImpl::create_readcondition(
00879 DDS::SampleStateMask sample_states,
00880 DDS::ViewStateMask view_states,
00881 DDS::InstanceStateMask instance_states)
00882 {
00883 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00884 DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
00885 view_states, instance_states);
00886 read_conditions_.insert(rc);
00887 return rc._retn();
00888 }
00889
00890 #ifndef OPENDDS_NO_QUERY_CONDITION
00891 DDS::QueryCondition_ptr DataReaderImpl::create_querycondition(
00892 DDS::SampleStateMask sample_states,
00893 DDS::ViewStateMask view_states,
00894 DDS::InstanceStateMask instance_states,
00895 const char* query_expression,
00896 const DDS::StringSeq& query_parameters)
00897 {
00898 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00899 try {
00900 DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
00901 view_states, instance_states, query_expression, query_parameters);
00902 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
00903 read_conditions_.insert(rc);
00904 return qc._retn();
00905 } catch (const std::exception& e) {
00906 if (DCPS_debug_level) {
00907 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ")
00908 ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
00909 e.what()));
00910 }
00911 return 0;
00912 }
00913 }
00914 #endif
00915
00916 bool DataReaderImpl::has_readcondition(DDS::ReadCondition_ptr a_condition)
00917 {
00918
00919 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00920 return read_conditions_.find(rc) != read_conditions_.end();
00921 }
00922
00923 DDS::ReturnCode_t DataReaderImpl::delete_readcondition(
00924 DDS::ReadCondition_ptr a_condition)
00925 {
00926 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00927 DDS::RETCODE_OUT_OF_RESOURCES);
00928 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00929 return read_conditions_.erase(rc)
00930 ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
00931 }
00932
00933 DDS::ReturnCode_t DataReaderImpl::delete_contained_entities()
00934 {
00935 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00936 DDS::RETCODE_OUT_OF_RESOURCES);
00937 read_conditions_.clear();
00938 return DDS::RETCODE_OK;
00939 }
00940
00941 DDS::ReturnCode_t DataReaderImpl::set_qos(
00942 const DDS::DataReaderQos & qos)
00943 {
00944
00945 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00946 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00947 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00948
00949 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00950 if (qos_ == qos)
00951 return DDS::RETCODE_OK;
00952
00953 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00954 return DDS::RETCODE_IMMUTABLE_POLICY;
00955
00956 } else {
00957 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00958 DDS::SubscriberQos subscriberQos;
00959 this->subscriber_servant_->get_qos(subscriberQos);
00960 const bool status =
00961 disco->update_subscription_qos(
00962 this->participant_servant_->get_domain_id(),
00963 this->participant_servant_->get_id(),
00964 this->subscription_id_,
00965 qos,
00966 subscriberQos);
00967 if (!status) {
00968 ACE_ERROR_RETURN((LM_ERROR,
00969 ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
00970 ACE_TEXT("qos not updated. \n")),
00971 DDS::RETCODE_ERROR);
00972 }
00973 }
00974
00975
00976 if (qos_.deadline.period.sec != qos.deadline.period.sec
00977 || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00978 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00979 && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00980 this->watchdog_ =
00981 new RequestedDeadlineWatchdog(
00982 this->sample_lock_,
00983 qos.deadline,
00984 this,
00985 this->dr_local_objref_.in(),
00986 this->requested_deadline_missed_status_,
00987 this->last_deadline_missed_total_count_);
00988
00989 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00990 && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00991 this->watchdog_->cancel_all();
00992 this->watchdog_->destroy();
00993 this->watchdog_ = 0;
00994
00995 } else {
00996 this->watchdog_->reset_interval(
00997 duration_to_time_value(qos.deadline.period));
00998 }
00999 }
01000
01001 qos_ = qos;
01002
01003 return DDS::RETCODE_OK;
01004
01005 } else {
01006 return DDS::RETCODE_INCONSISTENT_POLICY;
01007 }
01008 }
01009
01010 DDS::ReturnCode_t
01011 DataReaderImpl::get_qos(
01012 DDS::DataReaderQos & qos)
01013 {
01014 qos = qos_;
01015 return DDS::RETCODE_OK;
01016 }
01017
01018 DDS::ReturnCode_t DataReaderImpl::set_listener(
01019 DDS::DataReaderListener_ptr a_listener,
01020 DDS::StatusMask mask)
01021 {
01022 listener_mask_ = mask;
01023
01024 listener_ = DDS::DataReaderListener::_duplicate(a_listener);
01025 return DDS::RETCODE_OK;
01026 }
01027
01028 DDS::DataReaderListener_ptr DataReaderImpl::get_listener()
01029 {
01030 return DDS::DataReaderListener::_duplicate(listener_.in());
01031 }
01032
01033 DDS::TopicDescription_ptr DataReaderImpl::get_topicdescription()
01034 {
01035 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01036 DDS::ContentFilteredTopic_ptr cft = this->get_cf_topic();
01037 if (cft) {
01038 return cft;
01039 }
01040 #endif
01041 return DDS::TopicDescription::_duplicate(topic_desc_.in());
01042 }
01043
01044 DDS::Subscriber_ptr DataReaderImpl::get_subscriber()
01045 {
01046 return DDS::Subscriber::_duplicate(subscriber_servant_);
01047 }
01048
01049 DDS::ReturnCode_t
01050 DataReaderImpl::get_sample_rejected_status(
01051 DDS::SampleRejectedStatus & status)
01052 {
01053 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01054
01055 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false);
01056 status = sample_rejected_status_;
01057 sample_rejected_status_.total_count_change = 0;
01058 return DDS::RETCODE_OK;
01059 }
01060
01061 DDS::ReturnCode_t
01062 DataReaderImpl::get_liveliness_changed_status(
01063 DDS::LivelinessChangedStatus & status)
01064 {
01065 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01066
01067 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS,
01068 false);
01069 status = liveliness_changed_status_;
01070
01071 liveliness_changed_status_.alive_count_change = 0;
01072 liveliness_changed_status_.not_alive_count_change = 0;
01073
01074 return DDS::RETCODE_OK;
01075 }
01076
01077 DDS::ReturnCode_t
01078 DataReaderImpl::get_requested_deadline_missed_status(
01079 DDS::RequestedDeadlineMissedStatus & status)
01080 {
01081 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01082
01083 set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
01084 false);
01085
01086 this->requested_deadline_missed_status_.total_count_change =
01087 this->requested_deadline_missed_status_.total_count
01088 - this->last_deadline_missed_total_count_;
01089
01090
01091
01092
01093
01094 this->last_deadline_missed_total_count_ =
01095 this->requested_deadline_missed_status_.total_count;
01096
01097 status = requested_deadline_missed_status_;
01098
01099 return DDS::RETCODE_OK;
01100 }
01101
01102 DDS::ReturnCode_t
01103 DataReaderImpl::get_requested_incompatible_qos_status(
01104 DDS::RequestedIncompatibleQosStatus & status)
01105 {
01106
01107 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01108 this->publication_handle_lock_);
01109
01110 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
01111 false);
01112 status = requested_incompatible_qos_status_;
01113 requested_incompatible_qos_status_.total_count_change = 0;
01114
01115 return DDS::RETCODE_OK;
01116 }
01117
01118 DDS::ReturnCode_t
01119 DataReaderImpl::get_subscription_matched_status(
01120 DDS::SubscriptionMatchedStatus & status)
01121 {
01122
01123 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01124 this->publication_handle_lock_);
01125
01126 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false);
01127 status = subscription_match_status_;
01128 subscription_match_status_.total_count_change = 0;
01129 subscription_match_status_.current_count_change = 0;
01130
01131 return DDS::RETCODE_OK;
01132 }
01133
01134 DDS::ReturnCode_t
01135 DataReaderImpl::get_sample_lost_status(
01136 DDS::SampleLostStatus & status)
01137 {
01138 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01139
01140 set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false);
01141 status = sample_lost_status_;
01142 sample_lost_status_.total_count_change = 0;
01143 return DDS::RETCODE_OK;
01144 }
01145
01146 DDS::ReturnCode_t
01147 DataReaderImpl::wait_for_historical_data(
01148 const DDS::Duration_t & )
01149 {
01150
01151 return 0;
01152 }
01153
01154 DDS::ReturnCode_t
01155 DataReaderImpl::get_matched_publications(
01156 DDS::InstanceHandleSeq & publication_handles)
01157 {
01158 if (enabled_ == false) {
01159 ACE_ERROR_RETURN((LM_ERROR,
01160 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
01161 ACE_TEXT(" Entity is not enabled. \n")),
01162 DDS::RETCODE_NOT_ENABLED);
01163 }
01164
01165 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01166 guard,
01167 this->publication_handle_lock_,
01168 DDS::RETCODE_ERROR);
01169
01170
01171 int index = 0;
01172 publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01173
01174 for (RepoIdToHandleMap::iterator
01175 current = this->id_to_handle_map_.begin();
01176 current != this->id_to_handle_map_.end();
01177 ++current, ++index) {
01178 publication_handles[ index] = current->second;
01179 }
01180
01181 return DDS::RETCODE_OK;
01182 }
01183
01184 #if !defined (DDS_HAS_MINIMUM_BIT)
01185 DDS::ReturnCode_t
01186 DataReaderImpl::get_matched_publication_data(
01187 DDS::PublicationBuiltinTopicData & publication_data,
01188 DDS::InstanceHandle_t publication_handle)
01189 {
01190 if (enabled_ == false) {
01191 ACE_ERROR_RETURN((LM_ERROR,
01192 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
01193 ACE_TEXT("get_matched_publication_data: ")
01194 ACE_TEXT("Entity is not enabled. \n")),
01195 DDS::RETCODE_NOT_ENABLED);
01196 }
01197
01198
01199 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01200 guard,
01201 this->publication_handle_lock_,
01202 DDS::RETCODE_ERROR);
01203
01204
01205 BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader,
01206 DDS::PublicationBuiltinTopicDataDataReader_var,
01207 DDS::PublicationBuiltinTopicDataSeq > hh;
01208
01209 DDS::PublicationBuiltinTopicDataSeq data;
01210
01211 DDS::ReturnCode_t ret
01212 = hh.instance_handle_to_bit_data(participant_servant_,
01213 BUILT_IN_PUBLICATION_TOPIC,
01214 publication_handle,
01215 data);
01216
01217 if (ret == DDS::RETCODE_OK) {
01218 publication_data = data[0];
01219 }
01220
01221 return ret;
01222 }
01223 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01224
01225 DDS::ReturnCode_t
01226 DataReaderImpl::enable()
01227 {
01228
01229
01230
01231
01232
01233
01234 if (this->is_enabled()) {
01235 return DDS::RETCODE_OK;
01236 }
01237
01238 if (this->subscriber_servant_->is_enabled() == false) {
01239 return DDS::RETCODE_PRECONDITION_NOT_MET;
01240 }
01241
01242 if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
01243
01244
01245 depth_ = qos_.resource_limits.max_samples_per_instance;
01246
01247 } else {
01248 depth_ = qos_.history.depth;
01249 }
01250
01251 if (depth_ == DDS::LENGTH_UNLIMITED) {
01252
01253
01254
01255
01256
01257
01258 depth_ = 2147483647L;
01259 }
01260
01261 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01262 n_chunks_ = qos_.resource_limits.max_samples;
01263 }
01264
01265
01266
01267
01268 this->enable_specific();
01269
01270
01271
01272 rd_allocator_ = new ReceivedDataAllocator(n_chunks_);
01273
01274 if (DCPS_debug_level >= 2)
01275 ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
01276 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01277 rd_allocator_, n_chunks_));
01278
01279 if ((qos_.liveliness.lease_duration.sec !=
01280 DDS::DURATION_INFINITE_SEC) &&
01281 (qos_.liveliness.lease_duration.nanosec !=
01282 DDS::DURATION_INFINITE_NSEC)) {
01283 liveliness_lease_duration_ =
01284 duration_to_time_value(qos_.liveliness.lease_duration);
01285 }
01286
01287
01288
01289 DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01290
01291 if (this->watchdog_ == 0
01292 && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01293 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
01294 this->watchdog_ =
01295 new RequestedDeadlineWatchdog(
01296 this->sample_lock_,
01297 this->qos_.deadline,
01298 this,
01299 this->dr_local_objref_.in(),
01300 this->requested_deadline_missed_status_,
01301 this->last_deadline_missed_total_count_);
01302 }
01303
01304 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01305 disco->pre_reader(this);
01306
01307 this->set_enabled();
01308
01309 if (topic_servant_ && !transport_disabled_) {
01310
01311 try {
01312 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
01313 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01314 } catch (const Transport::Exception&) {
01315 ACE_ERROR((LM_ERROR,
01316 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01317 ACE_TEXT("Transport Exception.\n")));
01318 return DDS::RETCODE_ERROR;
01319
01320 }
01321
01322 const TransportLocatorSeq& trans_conf_info = this->connection_info();
01323
01324 CORBA::String_var filterClassName = "";
01325 CORBA::String_var filterExpression = "";
01326 DDS::StringSeq exprParams;
01327 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01328 DDS::ContentFilteredTopic_var cft = this->get_cf_topic();
01329 if (cft) {
01330 OpenDDS::DCPS::ContentFilteredTopicImpl* impl =
01331 dynamic_cast<OpenDDS::DCPS::ContentFilteredTopicImpl*>(cft.in());
01332 if (impl) {
01333 filterClassName = impl->get_filter_class_name();
01334 }
01335 filterExpression = cft->get_filter_expression();
01336 cft->get_expression_parameters(exprParams);
01337 }
01338 #endif
01339
01340 DDS::SubscriberQos sub_qos;
01341 this->subscriber_servant_->get_qos(sub_qos);
01342
01343 this->subscription_id_ =
01344 disco->add_subscription(this->domain_id_,
01345 this->participant_servant_->get_id(),
01346 this->topic_servant_->get_id(),
01347 this,
01348 this->qos_,
01349 trans_conf_info,
01350 sub_qos,
01351 filterClassName,
01352 filterExpression,
01353 exprParams);
01354
01355 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01356 ACE_ERROR((LM_ERROR,
01357 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01358 ACE_TEXT("add_subscription returned invalid id.\n")));
01359 return DDS::RETCODE_ERROR;
01360 }
01361 }
01362
01363 if (topic_servant_) {
01364 const CORBA::String_var name = topic_servant_->get_name();
01365 DDS::ReturnCode_t return_value =
01366 this->subscriber_servant_->reader_enabled(name.in(), this);
01367
01368 if (this->monitor_) {
01369 this->monitor_->report();
01370 }
01371
01372 return return_value;
01373 } else {
01374 return DDS::RETCODE_OK;
01375 }
01376 }
01377
01378 void
01379 DataReaderImpl::writer_activity(const DataSampleHeader& header)
01380 {
01381
01382
01383 RcHandle<WriterInfo> writer;
01384
01385
01386
01387
01388
01389 {
01390 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01391 WriterMapType::iterator iter = writers_.find(header.publication_id_);
01392
01393 if (iter != writers_.end()) {
01394 writer = iter->second;
01395
01396 } else if (DCPS_debug_level > 4) {
01397
01398
01399
01400 GuidConverter reader_converter(subscription_id_);
01401 GuidConverter writer_converter(header.publication_id_);
01402 ACE_DEBUG((LM_DEBUG,
01403 ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
01404 ACE_TEXT("reader %C is not associated with writer %C.\n"),
01405 OPENDDS_STRING(reader_converter).c_str(),
01406 OPENDDS_STRING(writer_converter).c_str()));
01407 }
01408 }
01409
01410 if (!writer.is_nil()) {
01411 ACE_Time_Value when = ACE_OS::gettimeofday();
01412 writer->received_activity(when);
01413
01414 if ((header.message_id_ == SAMPLE_DATA) ||
01415 (header.message_id_ == INSTANCE_REGISTRATION) ||
01416 (header.message_id_ == UNREGISTER_INSTANCE) ||
01417 (header.message_id_ == DISPOSE_INSTANCE) ||
01418 (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
01419
01420 const SequenceNumber defaultSN;
01421 SequenceRange resetRange(defaultSN, header.sequence_);
01422
01423 if (writer->seen_data_ && !header.sequence_repair_) {
01424
01425
01426
01427 writer->ack_sequence(header.sequence_);
01428
01429 } else {
01430
01431
01432
01433 writer->seen_data_ = true;
01434 writer->ack_sequence_.reset();
01435 writer->ack_sequence_.insert(resetRange);
01436 }
01437
01438 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01439 if (header.coherent_change_) {
01440 if (writer->coherent_samples_ == 0) {
01441 writer->coherent_sample_sequence_.reset();
01442 writer->coherent_sample_sequence_.insert(resetRange);
01443 }
01444 else {
01445 writer->coherent_sample_sequence_.insert(header.sequence_);
01446 }
01447 }
01448 #endif
01449 }
01450 }
01451 }
01452
01453 void
01454 DataReaderImpl::data_received(const ReceivedDataSample& sample)
01455 {
01456 DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
01457
01458
01459
01460 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01461
01462 if (get_deleted()) return;
01463
01464 if (DCPS_debug_level > 9) {
01465 GuidConverter converter(subscription_id_);
01466 ACE_DEBUG((LM_DEBUG,
01467 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01468 ACE_TEXT("%C received sample: %C.\n"),
01469 OPENDDS_STRING(converter).c_str(),
01470 to_string(sample.header_).c_str()));
01471 }
01472
01473 switch (sample.header_.message_id_) {
01474 case SAMPLE_DATA:
01475 case INSTANCE_REGISTRATION: {
01476 if (!check_historic(sample)) break;
01477
01478 DataSampleHeader const & header = sample.header_;
01479
01480 this->writer_activity(header);
01481
01482
01483 if (this->filter_sample(header)) break;
01484
01485
01486 this->subscriber_servant_->data_received(this);
01487
01488
01489 if (header.message_id_ == SAMPLE_DATA) {
01490 this->process_latency(sample);
01491 }
01492
01493
01494
01495
01496 SubscriptionInstance* instance = 0;
01497 bool is_new_instance = false;
01498 bool filtered = false;
01499 if (sample.header_.key_fields_only_) {
01500 dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING);
01501 } else {
01502 dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING);
01503 }
01504
01505
01506 if (DCPS_debug_level >= 8) {
01507 GuidConverter reader_converter(subscription_id_);
01508 GuidConverter writer_converter(header.publication_id_);
01509
01510 ACE_DEBUG ((LM_DEBUG,
01511 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
01512 ACE_TEXT("instance %d is_new_instance %d filtered %d \n"),
01513 OPENDDS_STRING(reader_converter).c_str(),
01514 OPENDDS_STRING(writer_converter).c_str(),
01515 instance ? instance->instance_handle_ : 0,
01516 is_new_instance, filtered));
01517 }
01518
01519 if (filtered) break;
01520 bool accepted = true;
01521 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01522 bool verify_coherent = false;
01523 #endif
01524 RcHandle<WriterInfo> writer;
01525
01526 if (header.publication_id_.entityId.entityKind
01527 != OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER) {
01528 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01529
01530 WriterMapType::iterator where
01531 = this->writers_.find(header.publication_id_);
01532
01533 if (where != this->writers_.end()) {
01534 if (header.coherent_change_) {
01535
01536 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01537
01538 where->second->group_coherent_ = header.group_coherent_;
01539 where->second->publisher_id_ = header.publisher_id_;
01540 ++where->second->coherent_samples_;
01541 verify_coherent = true;
01542 #endif
01543 writer = where->second;
01544 }
01545 } else {
01546 GuidConverter subscriptionBuffer(this->subscription_id_);
01547 GuidConverter publicationBuffer(header.publication_id_);
01548 ACE_DEBUG((LM_WARNING,
01549 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01550 ACE_TEXT("subscription %C failed to find ")
01551 ACE_TEXT("publication data for %C.\n"),
01552 OPENDDS_STRING(subscriptionBuffer).c_str(),
01553 OPENDDS_STRING(publicationBuffer).c_str()));
01554 }
01555 }
01556
01557 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01558 if (verify_coherent) {
01559 accepted = this->verify_coherent_changes_completion(writer.in());
01560 }
01561 #endif
01562
01563 if (this->watchdog_) {
01564 instance->last_sample_tv_ = instance->cur_sample_tv_;
01565 instance->cur_sample_tv_ = ACE_OS::gettimeofday();
01566
01567
01568 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01569 if (is_new_instance) {
01570 this->watchdog_->schedule_timer(instance);
01571
01572 } else {
01573 this->watchdog_->execute(instance, false);
01574 }
01575 }
01576
01577 if (accepted) {
01578 this->notify_read_conditions();
01579 }
01580 }
01581 break;
01582
01583 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01584 case END_COHERENT_CHANGES: {
01585 CoherentChangeControl control;
01586
01587 this->writer_activity(sample.header_);
01588
01589 Serializer serializer(
01590 sample.sample_, sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER);
01591 serializer >> control;
01592
01593 if (DCPS_debug_level > 0) {
01594 std::stringstream buffer;
01595 buffer << control << std::endl;
01596
01597 ACE_DEBUG((LM_DEBUG,
01598 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01599 ACE_TEXT("END_COHERENT_CHANGES %C\n"),
01600 buffer.str().c_str()));
01601 }
01602
01603 RcHandle<WriterInfo> writer;
01604 {
01605 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01606
01607 WriterMapType::iterator it =
01608 this->writers_.find(sample.header_.publication_id_);
01609
01610 if (it == this->writers_.end()) {
01611 GuidConverter sub_id(this->subscription_id_);
01612 GuidConverter pub_id(sample.header_.publication_id_);
01613 ACE_DEBUG((LM_WARNING,
01614 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01615 ACE_TEXT(" subscription %C failed to find ")
01616 ACE_TEXT(" publication data for %C!\n"),
01617 OPENDDS_STRING(sub_id).c_str(),
01618 OPENDDS_STRING(pub_id).c_str()));
01619 return;
01620 }
01621 else {
01622 writer = it->second;
01623 }
01624 it->second->set_group_info (control);
01625 }
01626
01627 if (this->verify_coherent_changes_completion(writer.in())) {
01628 this->notify_read_conditions();
01629 }
01630 }
01631 break;
01632 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
01633
01634 case DATAWRITER_LIVELINESS: {
01635 if (DCPS_debug_level >= 4) {
01636 GuidConverter reader_converter(subscription_id_);
01637 GuidConverter writer_converter(sample.header_.publication_id_);
01638 ACE_DEBUG((LM_DEBUG,
01639 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01640 ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
01641 OPENDDS_STRING(reader_converter).c_str(),
01642 OPENDDS_STRING(writer_converter).c_str()));
01643 }
01644 this->writer_activity(sample.header_);
01645
01646
01647 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01648 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01649 iter != instances_.end();
01650 ++iter) {
01651 SubscriptionInstance *ptr = iter->second;
01652
01653 ptr->instance_state_.lively(sample.header_.publication_id_);
01654 }
01655 }
01656
01657 }
01658 break;
01659
01660 case DISPOSE_INSTANCE: {
01661 if (!check_historic(sample)) break;
01662 this->writer_activity(sample.header_);
01663 SubscriptionInstance* instance = 0;
01664
01665 if (this->watchdog_) {
01666
01667
01668
01669 ReceivedDataSample dup(sample);
01670 this->lookup_instance(dup, instance);
01671 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01672 if (! this->is_exclusive_ownership_
01673 || (this->is_exclusive_ownership_
01674 && (instance != 0 )
01675 && (this->owner_manager_->is_owner (instance->instance_handle_,
01676 sample.header_.publication_id_)))) {
01677 #endif
01678 this->watchdog_->cancel_timer(instance);
01679 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01680 }
01681 #endif
01682 }
01683 instance = 0;
01684 this->dispose_unregister(sample, instance);
01685 }
01686 this->notify_read_conditions();
01687 break;
01688
01689 case UNREGISTER_INSTANCE: {
01690 if (!check_historic(sample)) break;
01691 this->writer_activity(sample.header_);
01692 SubscriptionInstance* instance = 0;
01693
01694 if (this->watchdog_) {
01695
01696
01697
01698 ReceivedDataSample dup(sample);
01699 this->lookup_instance(dup, instance);
01700 if( instance != 0) {
01701 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01702 if (! this->is_exclusive_ownership_
01703 || (this->is_exclusive_ownership_
01704 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01705 #endif
01706 this->watchdog_->cancel_timer(instance);
01707 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01708 }
01709 #endif
01710 }
01711 }
01712 instance = 0;
01713 this->dispose_unregister(sample, instance);
01714 }
01715 this->notify_read_conditions();
01716 break;
01717
01718 case DISPOSE_UNREGISTER_INSTANCE: {
01719 if (!check_historic(sample)) break;
01720 this->writer_activity(sample.header_);
01721 SubscriptionInstance* instance = 0;
01722
01723 if (this->watchdog_) {
01724
01725
01726
01727 ReceivedDataSample dup(sample);
01728 this->lookup_instance(dup, instance);
01729 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01730 if (! this->is_exclusive_ownership_
01731 || (this->is_exclusive_ownership_
01732 && (instance != 0 )
01733 && (this->owner_manager_->is_owner (instance->instance_handle_,
01734 sample.header_.publication_id_)))
01735 || (this->is_exclusive_ownership_
01736 && (instance != 0 )
01737 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01738 #endif
01739 this->watchdog_->cancel_timer(instance);
01740 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01741 }
01742 #endif
01743 }
01744 instance = 0;
01745 this->dispose_unregister(sample, instance);
01746 }
01747 this->notify_read_conditions();
01748 break;
01749
01750 case END_HISTORIC_SAMPLES: {
01751 if (sample.header_.message_length_ >= sizeof(RepoId)) {
01752 Serializer ser(sample.sample_);
01753 RepoId readerId = GUID_UNKNOWN;
01754 ser >> readerId;
01755 if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) {
01756 break;
01757 }
01758 }
01759 if (DCPS_debug_level > 4) {
01760 ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
01761 }
01762
01763 guard.release();
01764 this->resume_sample_processing(sample.header_.publication_id_);
01765 if (DCPS_debug_level > 4) {
01766 GuidConverter pub_id(sample.header_.publication_id_);
01767 ACE_DEBUG((
01768 LM_INFO,
01769 "(%P|%t) Resumed sample processing for durable writer %C\n",
01770 OPENDDS_STRING(pub_id).c_str()));
01771 }
01772 break;
01773 }
01774
01775 default:
01776 ACE_ERROR((LM_ERROR,
01777 "(%P|%t) ERROR: DataReaderImpl::data_received"
01778 "unexpected message_id = %d\n",
01779 sample.header_.message_id_));
01780 break;
01781 }
01782 }
01783
01784 EntityImpl*
01785 DataReaderImpl::parent() const
01786 {
01787 return this->subscriber_servant_;
01788 }
01789
01790 bool
01791 DataReaderImpl::check_transport_qos(const TransportInst& ti)
01792 {
01793 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01794 return ti.is_reliable();
01795 }
01796 return true;
01797 }
01798
01799 void DataReaderImpl::notify_read_conditions()
01800 {
01801
01802 ReadConditionSet local_read_conditions = read_conditions_;
01803 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01804
01805 for (ReadConditionSet::iterator it = local_read_conditions.begin(),
01806 end = local_read_conditions.end(); it != end; ++it) {
01807 dynamic_cast<ConditionImpl*>(it->in())->signal_all();
01808 }
01809 }
01810
01811 SubscriberImpl* DataReaderImpl::get_subscriber_servant()
01812 {
01813 return subscriber_servant_;
01814 }
01815
01816 RepoId DataReaderImpl::get_subscription_id() const
01817 {
01818 return subscription_id_;
01819 }
01820
01821 char *
01822 DataReaderImpl::get_topic_name() const
01823 {
01824 return topic_servant_->get_name();
01825 }
01826
01827 bool DataReaderImpl::have_sample_states(
01828 DDS::SampleStateMask sample_states) const
01829 {
01830
01831
01832 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
01833
01834 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01835 iter != instances_.end();
01836 ++iter) {
01837 SubscriptionInstance *ptr = iter->second;
01838
01839 for (ReceivedDataElement *item = ptr->rcvd_samples_.head_;
01840 item != 0; item = item->next_data_sample_) {
01841 if (item->sample_state_ & sample_states) {
01842 return true;
01843 }
01844 }
01845 }
01846
01847 return false;
01848 }
01849
01850 bool
01851 DataReaderImpl::have_view_states(DDS::ViewStateMask view_states) const
01852 {
01853
01854
01855 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01856
01857 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01858 iter != instances_.end();
01859 ++iter) {
01860 SubscriptionInstance *ptr = iter->second;
01861
01862 if (ptr->instance_state_.view_state() & view_states) {
01863 return true;
01864 }
01865 }
01866
01867 return false;
01868 }
01869
01870 bool DataReaderImpl::have_instance_states(
01871 DDS::InstanceStateMask instance_states) const
01872 {
01873
01874
01875 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01876
01877 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01878 iter != instances_.end();
01879 ++iter) {
01880 SubscriptionInstance *ptr = iter->second;
01881
01882 if (ptr->instance_state_.instance_state() & instance_states) {
01883 return true;
01884 }
01885 }
01886
01887 return false;
01888 }
01889
01890
01891
01892 bool DataReaderImpl::contains_sample(DDS::SampleStateMask sample_states,
01893 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
01894 {
01895 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
01896 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01897
01898 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
01899 end = instances_.end(); iter != end; ++iter) {
01900 SubscriptionInstance& inst = *iter->second;
01901
01902 if ((inst.instance_state_.view_state() & view_states) &&
01903 (inst.instance_state_.instance_state() & instance_states)) {
01904 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
01905 item = item->next_data_sample_) {
01906 if (item->sample_state_ & sample_states
01907 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01908 && !item->coherent_change_
01909 #endif
01910 ) {
01911 return true;
01912 }
01913 }
01914 }
01915 }
01916
01917 return false;
01918 }
01919
01920 DDS::DataReaderListener_ptr
01921 DataReaderImpl::listener_for(DDS::StatusKind kind)
01922 {
01923
01924
01925
01926 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
01927 return subscriber_servant_->listener_for(kind);
01928
01929 } else {
01930 return DDS::DataReaderListener::_duplicate(listener_.in());
01931 }
01932 }
01933
01934 void DataReaderImpl::sample_info(DDS::SampleInfo & sample_info,
01935 const ReceivedDataElement *ptr)
01936 {
01937
01938 sample_info.sample_rank = 0;
01939
01940
01941
01942
01943
01944
01945
01946 sample_info.generation_rank =
01947 (sample_info.disposed_generation_count +
01948 sample_info.no_writers_generation_count) -
01949 sample_info.generation_rank;
01950
01951
01952
01953
01954
01955
01956
01957 sample_info.absolute_generation_rank =
01958 (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
01959 static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
01960 sample_info.absolute_generation_rank;
01961
01962 sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
01963 }
01964
01965 CORBA::Long DataReaderImpl::total_samples() const
01966 {
01967
01968 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
01969
01970 CORBA::Long count(0);
01971
01972 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01973 iter != instances_.end();
01974 ++iter) {
01975 SubscriptionInstance *ptr = iter->second;
01976
01977 count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_);
01978 }
01979
01980 return count;
01981 }
01982
01983 int
01984 DataReaderImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value& tv,
01985 const void * )
01986 {
01987 check_liveliness_i(false, tv);
01988 return 0;
01989 }
01990
01991 void
01992 DataReaderImpl::LivelinessTimer::check_liveliness_i(bool cancel,
01993 const ACE_Time_Value& now)
01994 {
01995
01996 long local_timer_id = liveliness_timer_id_;
01997 bool timer_was_reset = false;
01998
01999 if (local_timer_id != -1 && cancel) {
02000 if (DCPS_debug_level >= 5) {
02001 GuidConverter converter(data_reader_->get_subscription_id());
02002 ACE_DEBUG((LM_DEBUG,
02003 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
02004 ACE_TEXT(" canceling timer for reader %C.\n"),
02005 OPENDDS_STRING(converter).c_str()));
02006 }
02007
02008
02009
02010 if (this->reactor()->cancel_timer(local_timer_id) == -1) {
02011
02012
02013
02014 ACE_DEBUG((LM_DEBUG,
02015 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
02016 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
02017 }
02018
02019 timer_was_reset = true;
02020 }
02021
02022
02023 ACE_Time_Value smallest(ACE_Time_Value::max_time);
02024 int alive_writers = 0;
02025
02026
02027
02028
02029
02030
02031
02032
02033
02034
02035 if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
02036 liveliness_timer_id_ = -1;
02037 }
02038
02039 ACE_Time_Value next_absolute;
02040
02041
02042 {
02043 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02044 read_guard,
02045 data_reader_->writers_lock_);
02046
02047 for (WriterMapType::iterator iter = data_reader_->writers_.begin();
02048 iter != data_reader_->writers_.end();
02049 ++iter) {
02050
02051
02052 next_absolute = iter->second->check_activity(now);
02053
02054 if (next_absolute != ACE_Time_Value::max_time) {
02055 alive_writers++;
02056
02057 if (next_absolute < smallest) {
02058 smallest = next_absolute;
02059 }
02060 }
02061 }
02062 }
02063
02064 if (!alive_writers) {
02065
02066
02067 liveliness_timer_id_ = -1;
02068 }
02069
02070 if (DCPS_debug_level >= 5) {
02071 GuidConverter converter(data_reader_->get_subscription_id());
02072 ACE_DEBUG((LM_DEBUG,
02073 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
02074 ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
02075 OPENDDS_STRING(converter).c_str(),
02076 alive_writers,
02077 !cancel));
02078 }
02079
02080
02081 if (alive_writers) {
02082 ACE_Time_Value relative;
02083
02084
02085 if (now < smallest)
02086 relative = smallest - now;
02087
02088 else
02089 relative = ACE_Time_Value(0,1);
02090
02091 liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative);
02092
02093 if (liveliness_timer_id_ == -1) {
02094 ACE_ERROR((LM_ERROR,
02095 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
02096 ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer")));
02097 }
02098 }
02099 }
02100
02101 void
02102 DataReaderImpl::release_instance(DDS::InstanceHandle_t handle)
02103 {
02104 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02105 if (this->is_exclusive_ownership_) {
02106 this->owner_manager_->remove_writers (handle);
02107 }
02108 #endif
02109
02110 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
02111 SubscriptionInstance* instance = this->get_handle_instance(handle);
02112
02113 if (instance == 0) {
02114 ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
02115 "could not find the instance by handle 0x%x\n", handle));
02116 return;
02117 }
02118
02119 this->purge_data(instance);
02120
02121 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02122 this->instances_.erase(handle);
02123 }
02124 this->release_instance_i(handle);
02125 if (this->monitor_) {
02126 this->monitor_->report();
02127 }
02128 }
02129
02130
02131 OpenDDS::DCPS::WriterStats::WriterStats(
02132 int amount,
02133 DataCollector<double>::OnFull type) : stats_(amount, type)
02134 {
02135 }
02136
02137 void OpenDDS::DCPS::WriterStats::add_stat(const ACE_Time_Value& delay)
02138 {
02139 double datum = static_cast<double>(delay.sec());
02140 datum += delay.usec() / 1000000.0;
02141 this->stats_.add(datum);
02142 }
02143
02144 OpenDDS::DCPS::LatencyStatistics OpenDDS::DCPS::WriterStats::get_stats() const
02145 {
02146 LatencyStatistics value;
02147
02148 value.publication = GUID_UNKNOWN;
02149 value.n = this->stats_.n();
02150 value.maximum = this->stats_.maximum();
02151 value.minimum = this->stats_.minimum();
02152 value.mean = this->stats_.mean();
02153 value.variance = this->stats_.var();
02154
02155 return value;
02156 }
02157
02158 void OpenDDS::DCPS::WriterStats::reset_stats()
02159 {
02160 this->stats_.reset();
02161 }
02162
02163 #ifndef OPENDDS_SAFETY_PROFILE
02164 std::ostream& OpenDDS::DCPS::WriterStats::raw_data(std::ostream& str) const
02165 {
02166 str << std::dec << this->stats_.size()
02167 << " samples out of " << this->stats_.n() << std::endl;
02168 return str << this->stats_;
02169 }
02170 #endif //OPENDDS_SAFETY_PROFILE
02171
02172 void
02173 DataReaderImpl::writer_removed(WriterInfo& info)
02174 {
02175 if (DCPS_debug_level >= 5) {
02176 GuidConverter reader_converter(subscription_id_);
02177 GuidConverter writer_converter(info.writer_id_);
02178 ACE_DEBUG((LM_DEBUG,
02179 ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
02180 ACE_TEXT("reader %C from writer %C.\n"),
02181 OPENDDS_STRING(reader_converter).c_str(),
02182 OPENDDS_STRING(writer_converter).c_str()));
02183 }
02184
02185 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02186 if (this->is_exclusive_ownership_) {
02187 this->owner_manager_->remove_writer (info.writer_id_);
02188 info.clear_owner_evaluated ();
02189 }
02190 #endif
02191
02192 bool liveliness_changed = false;
02193
02194 if (info.state_ == WriterInfo::ALIVE) {
02195 -- liveliness_changed_status_.alive_count;
02196 -- liveliness_changed_status_.alive_count_change;
02197 liveliness_changed = true;
02198 }
02199
02200 if (info.state_ == WriterInfo::DEAD) {
02201 -- liveliness_changed_status_.not_alive_count;
02202 -- liveliness_changed_status_.not_alive_count_change;
02203 liveliness_changed = true;
02204 }
02205
02206 liveliness_changed_status_.last_publication_handle = info.handle_;
02207 instances_liveliness_update(info, ACE_OS::gettimeofday());
02208
02209 if (liveliness_changed) {
02210 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02211 this->notify_liveliness_change();
02212 }
02213 }
02214
02215 void
02216 DataReaderImpl::writer_became_alive(WriterInfo& info,
02217 const ACE_Time_Value& )
02218 {
02219 if (DCPS_debug_level >= 5) {
02220 GuidConverter reader_converter(subscription_id_);
02221 GuidConverter writer_converter(info.writer_id_);
02222 ACE_DEBUG((LM_DEBUG,
02223 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
02224 ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02225 OPENDDS_STRING(reader_converter).c_str(),
02226 OPENDDS_STRING(writer_converter).c_str(),
02227 info.get_state_str().c_str()));
02228 }
02229
02230
02231
02232
02233
02234 bool liveliness_changed = false;
02235
02236 if (info.state_ != WriterInfo::ALIVE) {
02237 liveliness_changed_status_.alive_count++;
02238 liveliness_changed_status_.alive_count_change++;
02239 liveliness_changed = true;
02240 }
02241
02242 if (info.state_ == WriterInfo::DEAD) {
02243 liveliness_changed_status_.not_alive_count--;
02244 liveliness_changed_status_.not_alive_count_change--;
02245 liveliness_changed = true;
02246 }
02247
02248 liveliness_changed_status_.last_publication_handle = info.handle_;
02249
02250 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02251
02252 if (liveliness_changed_status_.alive_count < 0) {
02253 ACE_ERROR((LM_ERROR,
02254 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02255 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02256 liveliness_changed_status_.alive_count));
02257 return;
02258 }
02259
02260 if (liveliness_changed_status_.not_alive_count < 0) {
02261 ACE_ERROR((LM_ERROR,
02262 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02263 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"),
02264 liveliness_changed_status_.not_alive_count));
02265 return;
02266 }
02267
02268
02269
02270 info.state_ = WriterInfo::ALIVE;
02271
02272 if (this->monitor_) {
02273 this->monitor_->report();
02274 }
02275
02276
02277 if (liveliness_changed) {
02278
02279
02280
02281 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02282 this->notify_liveliness_change();
02283 }
02284
02285
02286 liveliness_timer_->check_liveliness();
02287 }
02288
02289 void
02290 DataReaderImpl::writer_became_dead(WriterInfo & info,
02291 const ACE_Time_Value& when)
02292 {
02293 if (DCPS_debug_level >= 5) {
02294 GuidConverter reader_converter(subscription_id_);
02295 GuidConverter writer_converter(info.writer_id_);
02296 ACE_DEBUG((LM_DEBUG,
02297 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
02298 ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02299
02300 OPENDDS_STRING(reader_converter).c_str(),
02301 OPENDDS_STRING(writer_converter).c_str(),
02302 info.get_state_str().c_str()));
02303 }
02304
02305 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02306 if (this->is_exclusive_ownership_) {
02307 this->owner_manager_->remove_writer (info.writer_id_);
02308 info.clear_owner_evaluated ();
02309 }
02310 #endif
02311
02312
02313 bool liveliness_changed = false;
02314
02315 if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) {
02316 liveliness_changed_status_.not_alive_count++;
02317 liveliness_changed_status_.not_alive_count_change++;
02318 liveliness_changed = true;
02319 }
02320
02321 if (info.state_ == WriterInfo::ALIVE) {
02322 liveliness_changed_status_.alive_count--;
02323 liveliness_changed_status_.alive_count_change--;
02324 liveliness_changed_status_.not_alive_count++;
02325 liveliness_changed_status_.not_alive_count_change++;
02326 liveliness_changed = true;
02327 }
02328
02329 liveliness_changed_status_.last_publication_handle = info.handle_;
02330
02331
02332 info.state_ = WriterInfo::DEAD;
02333 info.seen_data_ = false;
02334
02335 if (this->monitor_) {
02336 this->monitor_->report();
02337 }
02338
02339 if (liveliness_changed_status_.alive_count < 0) {
02340 ACE_ERROR((LM_ERROR,
02341 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02342 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02343 liveliness_changed_status_.alive_count));
02344 return;
02345 }
02346
02347 if (liveliness_changed_status_.not_alive_count < 0) {
02348 ACE_ERROR((LM_ERROR,
02349 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02350 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"),
02351 liveliness_changed_status_.not_alive_count));
02352 return;
02353 }
02354
02355 instances_liveliness_update(info, when);
02356
02357
02358 if (liveliness_changed) {
02359 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02360 this->notify_liveliness_change();
02361 }
02362 }
02363
02364 void
02365 DataReaderImpl::instances_liveliness_update(WriterInfo& info,
02366 const ACE_Time_Value& when)
02367 {
02368 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02369 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02370 next = iter; iter != instances_.end(); iter = next) {
02371 ++next;
02372 iter->second->instance_state_.writer_became_dead(
02373 info.writer_id_, liveliness_changed_status_.alive_count, when);
02374 }
02375 }
02376
02377 void
02378 DataReaderImpl::set_sample_lost_status(
02379 const DDS::SampleLostStatus& status)
02380 {
02381
02382 sample_lost_status_ = status;
02383 }
02384
02385 void
02386 DataReaderImpl::set_sample_rejected_status(
02387 const DDS::SampleRejectedStatus& status)
02388 {
02389
02390 sample_rejected_status_ = status;
02391 }
02392
02393 void DataReaderImpl::dispose_unregister(const ReceivedDataSample&,
02394 SubscriptionInstance*&)
02395 {
02396 if (DCPS_debug_level > 0) {
02397 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
02398 }
02399 }
02400
02401 void DataReaderImpl::process_latency(const ReceivedDataSample& sample)
02402 {
02403 StatsMapType::iterator location
02404 = this->statistics_.find(sample.header_.publication_id_);
02405
02406 if (location != this->statistics_.end()) {
02407
02408 ACE_Time_Value latency = ACE_OS::gettimeofday();
02409
02410
02411 DDS::Duration_t then = {
02412 sample.header_.source_timestamp_sec_,
02413 sample.header_.source_timestamp_nanosec_
02414 };
02415
02416
02417 latency -= duration_to_time_value(then);
02418
02419 if (this->statistics_enabled()) {
02420 location->second.add_stat(latency);
02421 }
02422
02423 if (DCPS_debug_level > 9) {
02424 ACE_DEBUG((LM_DEBUG,
02425 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02426 ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"),
02427 latency.sec(),
02428 latency.msec()));
02429 }
02430
02431
02432 if (time_value_to_duration(latency)
02433 > this->qos_.latency_budget.duration) {
02434 this->notify_latency(sample.header_.publication_id_);
02435 }
02436
02437 } else if (DCPS_debug_level > 0) {
02438
02439
02440
02441
02442
02443
02444
02445
02446 GuidConverter reader_converter(subscription_id_);
02447 GuidConverter writer_converter(sample.header_.publication_id_);
02448 ACE_DEBUG((LM_DEBUG,
02449 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02450 ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
02451 OPENDDS_STRING(reader_converter).c_str(),
02452 OPENDDS_STRING(writer_converter).c_str()));
02453 }
02454 }
02455
02456 void DataReaderImpl::notify_latency(PublicationId writer)
02457 {
02458
02459
02460 DataReaderListener_var listener
02461 = DataReaderListener::_narrow(this->listener_.in());
02462
02463 if (!CORBA::is_nil(listener.in())) {
02464 WriterIdSeq writerIds;
02465 writerIds.length(1);
02466 writerIds[ 0] = writer;
02467
02468 DDS::InstanceHandleSeq handles;
02469 this->lookup_instance_handles(writerIds, handles);
02470
02471 if (handles.length() >= 1) {
02472 this->budget_exceeded_status_.last_instance_handle = handles[ 0];
02473
02474 } else {
02475 this->budget_exceeded_status_.last_instance_handle = -1;
02476 }
02477
02478 ++this->budget_exceeded_status_.total_count;
02479 ++this->budget_exceeded_status_.total_count_change;
02480
02481 listener->on_budget_exceeded(
02482 this->dr_local_objref_.in(),
02483 this->budget_exceeded_status_);
02484
02485 this->budget_exceeded_status_.total_count_change = 0;
02486 }
02487 }
02488
02489 #ifndef OPENDDS_SAFETY_PROFILE
02490 void
02491 DataReaderImpl::get_latency_stats(
02492 OpenDDS::DCPS::LatencyStatisticsSeq & stats)
02493 {
02494 stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
02495 int index = 0;
02496
02497 for (StatsMapType::const_iterator current = this->statistics_.begin();
02498 current != this->statistics_.end();
02499 ++current, ++index) {
02500 stats[ index] = current->second.get_stats();
02501 stats[ index].publication = current->first;
02502 }
02503 }
02504 #endif
02505
02506 void
02507 DataReaderImpl::reset_latency_stats()
02508 {
02509 for (StatsMapType::iterator current = this->statistics_.begin();
02510 current != this->statistics_.end();
02511 ++current) {
02512 current->second.reset_stats();
02513 }
02514 }
02515
02516 CORBA::Boolean
02517 DataReaderImpl::statistics_enabled()
02518 {
02519 return this->statistics_enabled_;
02520 }
02521
02522 void
02523 DataReaderImpl::statistics_enabled(
02524 CORBA::Boolean statistics_enabled)
02525 {
02526 this->statistics_enabled_ = statistics_enabled;
02527 }
02528
02529 void
02530 DataReaderImpl::prepare_to_delete()
02531 {
02532 this->set_deleted(true);
02533 this->stop_associating();
02534 this->send_final_acks();
02535 }
02536
02537 SubscriptionInstance*
02538 DataReaderImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02539 {
02540 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, 0);
02541
02542 SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
02543 if (iter == instances_.end()) {
02544 ACE_DEBUG((LM_WARNING,
02545 ACE_TEXT("(%P|%t) WARNING: ")
02546 ACE_TEXT("DataReaderImpl::get_handle_instance: ")
02547 ACE_TEXT("lookup for 0x%x failed\n"),
02548 handle));
02549 return 0;
02550 }
02551
02552 return iter->second;
02553 }
02554
02555 DDS::InstanceHandle_t
02556 DataReaderImpl::get_next_handle(const DDS::BuiltinTopicKey_t& key)
02557 {
02558 if (is_bit()) {
02559 Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_);
02560 CORBA::String_var topic = get_topic_name();
02561 RepoId id = disc->bit_key_to_repo_id(participant_servant_, topic, key);
02562 return participant_servant_->id_to_handle(id);
02563
02564 } else {
02565 return participant_servant_->id_to_handle(GUID_UNKNOWN);
02566 }
02567 }
02568
02569 void
02570 DataReaderImpl::notify_subscription_disconnected(const WriterIdSeq& pubids)
02571 {
02572 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
02573
02574
02575
02576 DataReaderListener_var the_listener
02577 = DataReaderListener::_narrow(this->listener_.in());
02578
02579 if (!CORBA::is_nil(the_listener.in())) {
02580 SubscriptionLostStatus status;
02581
02582
02583
02584 this->lookup_instance_handles(pubids, status.publication_handles);
02585 the_listener->on_subscription_disconnected(this->dr_local_objref_.in(),
02586 status);
02587 }
02588 }
02589
02590 void
02591 DataReaderImpl::notify_subscription_reconnected(const WriterIdSeq& pubids)
02592 {
02593 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
02594
02595 if (!this->is_bit_) {
02596
02597
02598 DataReaderListener_var the_listener
02599 = DataReaderListener::_narrow(this->listener_.in());
02600
02601 if (!CORBA::is_nil(the_listener.in())) {
02602 SubscriptionLostStatus status;
02603
02604
02605
02606 if (this->lookup_instance_handles(pubids, status.publication_handles) == false) {
02607 ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::notify_subscription_reconnected: "
02608 "lookup_instance_handles failed.\n"));
02609 }
02610
02611 the_listener->on_subscription_reconnected(this->dr_local_objref_.in(),
02612 status);
02613 }
02614 }
02615 }
02616
02617 void
02618 DataReaderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq& handles)
02619 {
02620 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02621
02622 if (!this->is_bit_) {
02623
02624
02625 DataReaderListener_var the_listener
02626 = DataReaderListener::_narrow(this->listener_.in());
02627
02628 if (!CORBA::is_nil(the_listener.in())) {
02629 SubscriptionLostStatus status;
02630
02631 CORBA::ULong len = handles.length();
02632 status.publication_handles.length(len);
02633
02634 for (CORBA::ULong i = 0; i < len; ++ i) {
02635 status.publication_handles[i] = handles[i];
02636 }
02637
02638 the_listener->on_subscription_lost(this->dr_local_objref_.in(),
02639 status);
02640 }
02641 }
02642 }
02643
02644 void
02645 DataReaderImpl::notify_subscription_lost(const WriterIdSeq& pubids)
02646 {
02647 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02648
02649
02650
02651 DataReaderListener_var the_listener
02652 = DataReaderListener::_narrow(this->listener_.in());
02653
02654 if (!CORBA::is_nil(the_listener.in())) {
02655 SubscriptionLostStatus status;
02656
02657
02658
02659 this->lookup_instance_handles(pubids, status.publication_handles);
02660 the_listener->on_subscription_lost(this->dr_local_objref_.in(),
02661 status);
02662 }
02663 }
02664
02665 void
02666 DataReaderImpl::notify_connection_deleted(const RepoId& peerId)
02667 {
02668 DBG_ENTRY_LVL("DataReaderImpl","notify_connection_deleted",6);
02669 on_notification_of_connection_deletion(peerId);
02670
02671
02672 DataReaderListener_var the_listener = DataReaderListener::_narrow(this->listener_.in());
02673
02674 if (!CORBA::is_nil(the_listener.in()))
02675 the_listener->on_connection_deleted(this->dr_local_objref_.in());
02676 }
02677
02678 bool
02679 DataReaderImpl::lookup_instance_handles(const WriterIdSeq& ids,
02680 DDS::InstanceHandleSeq & hdls)
02681 {
02682 if (DCPS_debug_level > 9) {
02683 CORBA::ULong const size = ids.length();
02684 const char* separator = "";
02685 OPENDDS_STRING guids;
02686
02687 for (unsigned long i = 0; i < size; ++i) {
02688 guids += separator;
02689 guids += OPENDDS_STRING(GuidConverter(ids[i]));
02690 separator = ", ";
02691 }
02692
02693 ACE_DEBUG((LM_DEBUG,
02694 ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
02695 ACE_TEXT("searching for handles for writer Ids: %C.\n"),
02696 guids.c_str()));
02697 }
02698
02699 CORBA::ULong const num_wrts = ids.length();
02700 hdls.length(num_wrts);
02701
02702 for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02703 hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02704 }
02705
02706 return true;
02707 }
02708
02709 bool
02710 DataReaderImpl::filter_sample(const DataSampleHeader& header)
02711 {
02712 ACE_Time_Value now(ACE_OS::gettimeofday());
02713
02714
02715 if (!always_get_history_ && header.historic_sample_
02716 && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
02717 if (DCPS_debug_level >= 8) {
02718 ACE_DEBUG((LM_DEBUG,
02719 ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
02720 ACE_TEXT("Discarded historic data.\n")));
02721 }
02722
02723 return true;
02724 }
02725
02726
02727
02728 if (header.lifespan_duration_) {
02729
02730
02731 DDS::Time_t const tmp = {
02732 header.source_timestamp_sec_ + header.lifespan_duration_sec_,
02733 header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
02734 };
02735
02736
02737
02738 ACE_Time_Value const expiration_time(
02739 OpenDDS::DCPS::time_to_time_value(tmp));
02740
02741 if (now >= expiration_time) {
02742 if (DCPS_debug_level >= 8) {
02743 ACE_Time_Value const diff(now - expiration_time);
02744 ACE_DEBUG((LM_DEBUG,
02745 ACE_TEXT("OpenDDS (%P|%t) Received data ")
02746 ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
02747 diff.sec(),
02748 diff.usec()));
02749 }
02750
02751 return true;
02752 }
02753 }
02754
02755 return false;
02756 }
02757
02758 bool
02759 DataReaderImpl::filter_instance(SubscriptionInstance* instance,
02760 const PublicationId& pubid)
02761 {
02762 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02763 if (this->is_exclusive_ownership_) {
02764
02765 WriterMapType::iterator iter = writers_.find(pubid);
02766
02767 if (iter == writers_.end()) {
02768 if (DCPS_debug_level > 4) {
02769
02770
02771
02772 GuidConverter reader_converter(subscription_id_);
02773 GuidConverter writer_converter(pubid);
02774 ACE_DEBUG((LM_DEBUG,
02775 ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02776 ACE_TEXT("reader %C is not associated with writer %C.\n"),
02777 OPENDDS_STRING(reader_converter).c_str(),
02778 OPENDDS_STRING(writer_converter).c_str()));
02779 }
02780 return true;
02781 }
02782
02783
02784
02785
02786 if ( instance->instance_state_.get_owner () == GUID_UNKNOWN
02787 || ! iter->second->is_owner_evaluated (instance->instance_handle_)) {
02788 bool is_owner = this->owner_manager_->select_owner (
02789 instance->instance_handle_,
02790 iter->second->writer_id_,
02791 iter->second->writer_qos_.ownership_strength.value,
02792 &instance->instance_state_);
02793 iter->second->set_owner_evaluated (instance->instance_handle_, true);
02794
02795 if (! is_owner) {
02796 if (DCPS_debug_level >= 1) {
02797 GuidConverter reader_converter(subscription_id_);
02798 GuidConverter writer_converter(pubid);
02799 GuidConverter owner_converter (instance->instance_state_.get_owner ());
02800 ACE_DEBUG((LM_DEBUG,
02801 ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02802 ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
02803 OPENDDS_STRING(reader_converter).c_str(),
02804 OPENDDS_STRING(writer_converter).c_str(),
02805 OPENDDS_STRING(owner_converter).c_str()));
02806 }
02807 return true;
02808 }
02809 }
02810 else if (! (instance->instance_state_.get_owner () == pubid)) {
02811 if (DCPS_debug_level >= 1) {
02812 GuidConverter reader_converter(subscription_id_);
02813 GuidConverter writer_converter(pubid);
02814 GuidConverter owner_converter (instance->instance_state_.get_owner ());
02815 ACE_DEBUG((LM_DEBUG,
02816 ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02817 ACE_TEXT("reader %C writer %C is not owner %C\n"),
02818 OPENDDS_STRING(reader_converter).c_str(),
02819 OPENDDS_STRING(writer_converter).c_str(),
02820 OPENDDS_STRING(owner_converter).c_str()));
02821 }
02822 return true;
02823 }
02824 }
02825 #else
02826 ACE_UNUSED_ARG(pubid);
02827 #endif
02828
02829 ACE_Time_Value now(ACE_OS::gettimeofday());
02830
02831
02832
02833 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02834
02835 if (this->qos_.time_based_filter.minimum_separation > zero) {
02836 DDS::Duration_t separation =
02837 time_value_to_duration(now - instance->last_accepted_);
02838
02839 if (separation < this->qos_.time_based_filter.minimum_separation) {
02840 return true;
02841 }
02842 }
02843
02844 instance->last_accepted_ = now;
02845
02846 return false;
02847 }
02848
02849 bool DataReaderImpl::is_bit() const
02850 {
02851 return this->is_bit_;
02852 }
02853
02854 int
02855 DataReaderImpl::num_zero_copies()
02856 {
02857 int loans = 0;
02858 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02859 guard,
02860 this->sample_lock_,
02861 1 );
02862 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,1);
02863
02864 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
02865 iter != instances_.end();
02866 ++iter) {
02867 SubscriptionInstance *ptr = iter->second;
02868
02869 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
02870 item != 0; item = item->next_data_sample_) {
02871 loans += item->zero_copy_cnt_.value();
02872 }
02873 }
02874
02875 return loans;
02876 }
02877
02878 void DataReaderImpl::notify_liveliness_change()
02879 {
02880
02881
02882
02883 DDS::DataReaderListener_var listener
02884 = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
02885
02886 if (!CORBA::is_nil(listener.in())) {
02887 listener->on_liveliness_changed(dr_local_objref_.in(),
02888 liveliness_changed_status_);
02889
02890 liveliness_changed_status_.alive_count_change = 0;
02891 liveliness_changed_status_.not_alive_count_change = 0;
02892 }
02893 notify_status_condition();
02894
02895 if (DCPS_debug_level > 9) {
02896 OPENDDS_STRING output_str;
02897 output_str += "subscription ";
02898 output_str += OPENDDS_STRING(GuidConverter(subscription_id_));
02899 output_str += ", listener at: 0x";
02900 output_str += to_dds_string(this->listener_.in ());
02901
02902 for (WriterMapType::iterator current = this->writers_.begin();
02903 current != this->writers_.end();
02904 ++current) {
02905 RepoId id = current->first;
02906 output_str += "\n\tNOTIFY: writer[ ";
02907 output_str += OPENDDS_STRING(GuidConverter(id));
02908 output_str += "] == ";
02909 output_str += current->second->get_state_str();
02910 }
02911
02912 output_str + "\n";
02913 ACE_DEBUG((LM_DEBUG,
02914 ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
02915 ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
02916 ACE_TEXT("\tNOTIFY: %C\n"),
02917 listener.in (),
02918 listener_mask_,
02919 output_str.c_str()));
02920 }
02921 }
02922
02923 void DataReaderImpl::post_read_or_take()
02924 {
02925 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02926 get_subscriber_servant()->set_status_changed_flag(
02927 DDS::DATA_ON_READERS_STATUS, false);
02928 }
02929
02930 void DataReaderImpl::reschedule_deadline()
02931 {
02932 if (this->watchdog_) {
02933 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02934 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02935 iter != this->instances_.end();
02936 ++iter) {
02937 if (iter->second->deadline_timer_id_ != -1) {
02938 if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
02939 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"),
02940 ACE_TEXT("reset_timer_interval")));
02941 }
02942 }
02943 }
02944 }
02945 }
02946
02947 ACE_Reactor_Timer_Interface*
02948 DataReaderImpl::get_reactor()
02949 {
02950 return this->reactor_;
02951 }
02952
02953 OpenDDS::DCPS::RepoId
02954 DataReaderImpl::get_topic_id()
02955 {
02956 return this->topic_servant_->get_id();
02957 }
02958
02959 OpenDDS::DCPS::RepoId
02960 DataReaderImpl::get_dp_id()
02961 {
02962 return this->participant_servant_->get_id();
02963 }
02964
02965 void
02966 DataReaderImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02967 {
02968 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02969 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02970
02971 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02972 end = instances_.end(); iter != end; ++iter) {
02973 instance_handles.push_back(iter->first);
02974 }
02975 }
02976
02977 void
02978 DataReaderImpl::get_writer_states(WriterStatePairVec& writer_states)
02979 {
02980 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02981 read_guard,
02982 this->writers_lock_);
02983 for (WriterMapType::iterator iter = writers_.begin();
02984 iter != writers_.end();
02985 ++iter) {
02986 writer_states.push_back(WriterStatePair(iter->first,
02987 iter->second->get_state()));
02988 }
02989 }
02990
02991 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02992 void
02993 DataReaderImpl::update_ownership_strength (const PublicationId& pub_id,
02994 const CORBA::Long& ownership_strength)
02995 {
02996 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02997 read_guard,
02998 this->writers_lock_);
02999 for (WriterMapType::iterator iter = writers_.begin();
03000 iter != writers_.end();
03001 ++iter) {
03002 if (iter->second->writer_id_ == pub_id) {
03003 if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) {
03004 if (DCPS_debug_level >= 1) {
03005 GuidConverter reader_converter(this->subscription_id_);
03006 GuidConverter writer_converter(pub_id);
03007 ACE_DEBUG((LM_DEBUG,
03008 ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
03009 ACE_TEXT("local %C update remote %C strength from %d to %d \n"),
03010 OPENDDS_STRING(reader_converter).c_str(),
03011 OPENDDS_STRING(writer_converter).c_str(),
03012 iter->second->writer_qos_.ownership_strength, ownership_strength));
03013 }
03014 iter->second->writer_qos_.ownership_strength.value = ownership_strength;
03015 iter->second->clear_owner_evaluated ();
03016 }
03017 break;
03018 }
03019 }
03020 }
03021 #endif
03022
03023 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03024 bool DataReaderImpl::verify_coherent_changes_completion (WriterInfo* writer)
03025 {
03026 if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS
03027 || ! this->subqos_.presentation.coherent_access) {
03028 this->accept_coherent (writer->writer_id_, writer->publisher_id_);
03029 this->coherent_changes_completed (this);
03030 return true;
03031 }
03032
03033
03034 Coherent_State state = writer->coherent_change_received();
03035 if (writer->group_coherent_) {
03036 if (state != NOT_COMPLETED_YET) {
03037
03038 this->subscriber_servant_->coherent_change_received (
03039 writer->publisher_id_, this, state);
03040 }
03041 }
03042 else {
03043 if (state == COMPLETED) {
03044 this->accept_coherent (writer->writer_id_, writer->publisher_id_);
03045 }
03046 else if (state == REJECTED) {
03047 this->reject_coherent (writer->writer_id_, writer->publisher_id_);
03048 }
03049 else {
03050 return false;
03051 }
03052
03053
03054 writer->reset_coherent_info ();
03055 }
03056
03057 return state == COMPLETED;
03058 }
03059
03060
03061 void DataReaderImpl::accept_coherent (PublicationId& writer_id,
03062 RepoId& publisher_id)
03063 {
03064 if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
03065 GuidConverter reader (this->subscription_id_);
03066 GuidConverter writer (writer_id);
03067 GuidConverter publisher (publisher_id);
03068 ACE_DEBUG((LM_DEBUG,
03069 ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
03070 ACE_TEXT(" reader %C writer %C publisher %C \n"),
03071 OPENDDS_STRING(reader).c_str(),
03072 OPENDDS_STRING(writer).c_str(),
03073 OPENDDS_STRING(publisher).c_str()));
03074 }
03075
03076 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03077 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03078
03079 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
03080 iter != this->instances_.end(); ++iter) {
03081 iter->second->rcvd_strategy_->accept_coherent(
03082 writer_id, publisher_id);
03083 }
03084 }
03085
03086
03087 void DataReaderImpl::reject_coherent (PublicationId& writer_id,
03088 RepoId& publisher_id)
03089 {
03090 if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
03091 GuidConverter reader (this->subscription_id_);
03092 GuidConverter writer (writer_id);
03093 GuidConverter publisher (publisher_id);
03094 ACE_DEBUG((LM_DEBUG,
03095 ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
03096 ACE_TEXT(" reader %C writer %C publisher %C \n"),
03097 OPENDDS_STRING(reader).c_str(),
03098 OPENDDS_STRING(writer).c_str(),
03099 OPENDDS_STRING(publisher).c_str()));
03100 }
03101
03102 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03103 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03104
03105 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
03106 iter != this->instances_.end(); ++iter) {
03107 iter->second->rcvd_strategy_->reject_coherent(
03108 writer_id, publisher_id);
03109 }
03110 this->reset_coherent_info (writer_id, publisher_id);
03111 }
03112
03113
03114 void DataReaderImpl::reset_coherent_info (const PublicationId& writer_id,
03115 const RepoId& publisher_id)
03116 {
03117 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03118
03119 WriterMapType::iterator itEnd = this->writers_.end();
03120 for (WriterMapType::iterator it = this->writers_.begin();
03121 it != itEnd; ++it) {
03122 if (it->second->writer_id_ == writer_id
03123 && it->second->publisher_id_ == publisher_id) {
03124 it->second->reset_coherent_info();
03125 }
03126 }
03127 }
03128
03129
03130 void
03131 DataReaderImpl::coherent_change_received (RepoId publisher_id, Coherent_State& result)
03132 {
03133 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03134
03135 result = COMPLETED;
03136 for (WriterMapType::iterator iter = writers_.begin();
03137 iter != writers_.end();
03138 ++iter) {
03139
03140 if (iter->second->publisher_id_ == publisher_id) {
03141 Coherent_State state = iter->second->coherent_change_received();
03142 if (state == NOT_COMPLETED_YET) {
03143 result = state;
03144 break;
03145 }
03146 else if (state == REJECTED) {
03147 result = REJECTED;
03148 }
03149 }
03150 }
03151 }
03152
03153
03154 void
03155 DataReaderImpl::coherent_changes_completed (DataReaderImpl* reader)
03156 {
03157 this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
03158 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
03159
03160 ::DDS::SubscriberListener_var sub_listener =
03161 this->subscriber_servant_->listener_for(::DDS::DATA_ON_READERS_STATUS);
03162 if (!CORBA::is_nil(sub_listener.in()))
03163 {
03164 if (reader == this) {
03165
03166 ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03167 sub_listener->on_data_on_readers(this->subscriber_servant_);
03168 }
03169
03170 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03171 this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03172 }
03173 else
03174 {
03175 this->subscriber_servant_->notify_status_condition();
03176
03177 ::DDS::DataReaderListener_var listener =
03178 this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
03179
03180 if (!CORBA::is_nil(listener.in()))
03181 {
03182 if (reader == this) {
03183
03184 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03185 listener->on_data_available(dr_local_objref_.in ());
03186 }
03187 else {
03188 listener->on_data_available(dr_local_objref_.in ());
03189 }
03190
03191 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03192 this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03193 }
03194 else
03195 {
03196 this->notify_status_condition();
03197 }
03198 }
03199 }
03200
03201
03202 void DataReaderImpl::begin_access()
03203 {
03204 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03205 this->coherent_ = true;
03206 }
03207
03208
03209 void DataReaderImpl::end_access()
03210 {
03211 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03212 this->coherent_ = false;
03213 this->group_coherent_ordered_data_.reset();
03214 this->post_read_or_take();
03215 }
03216
03217
03218 void DataReaderImpl::get_ordered_data (GroupRakeData& data,
03219 DDS::SampleStateMask sample_states,
03220 DDS::ViewStateMask view_states,
03221 DDS::InstanceStateMask instance_states)
03222 {
03223 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03224 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03225
03226 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
03227 iter != instances_.end(); ++iter) {
03228 SubscriptionInstance *ptr = iter->second;
03229 if ((ptr->instance_state_.view_state() & view_states) &&
03230 (ptr->instance_state_.instance_state() & instance_states)) {
03231 size_t i(0);
03232 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
03233 item != 0; item = item->next_data_sample_) {
03234 if ((item->sample_state_ & sample_states) && !item->coherent_change_) {
03235 data.insert_sample(item, ptr, ++i);
03236 this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i);
03237 }
03238 }
03239 }
03240 }
03241 }
03242
03243 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
03244
03245 void
03246 DataReaderImpl::set_subscriber_qos(
03247 const DDS::SubscriberQos & qos)
03248 {
03249 this->subqos_ = qos;
03250 }
03251
03252 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
03253 void
03254 DataReaderImpl::enable_filtering(ContentFilteredTopicImpl* cft)
03255 {
03256 cft->update_reader_count(true);
03257 cft->add_reader(*this);
03258 content_filtered_topic_ = DDS::ContentFilteredTopic::_duplicate(cft);
03259 }
03260
03261 DDS::ContentFilteredTopic_ptr
03262 DataReaderImpl::get_cf_topic() const
03263 {
03264 return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_);
03265 }
03266 #endif
03267
03268 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
03269
03270 void
03271 DataReaderImpl::update_subscription_params(const DDS::StringSeq& params) const
03272 {
03273 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
03274 disco->update_subscription_params(participant_servant_->get_domain_id(),
03275 participant_servant_->get_id(),
03276 subscription_id_,
03277 params);
03278 }
03279 #endif
03280
03281 void
03282 DataReaderImpl::reset_ownership (::DDS::InstanceHandle_t instance)
03283 {
03284 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03285 for (WriterMapType::iterator iter = writers_.begin();
03286 iter != writers_.end();
03287 ++iter) {
03288 iter->second->set_owner_evaluated(instance, false);
03289 }
03290 }
03291
03292 void
03293 DataReaderImpl::resume_sample_processing(const PublicationId& pub_id)
03294 {
03295 OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
03296 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03297 WriterMapType::iterator where = writers_.find(pub_id);
03298 if (writers_.end() != where) {
03299 WriterInfo& info = *where->second;
03300
03301 if (info.waiting_for_end_historic_samples_) {
03302 end_historic_sweeper_->cancel_timer(where->second);
03303 if (!info.historic_samples_.empty()) {
03304 info.last_historic_seq_ = info.historic_samples_.rbegin()->first;
03305 }
03306 to_deliver.swap(info.historic_samples_);
03307 write_guard.release();
03308 deliver_historic(to_deliver);
03309 }
03310 }
03311 }
03312
03313 bool DataReaderImpl::check_historic(const ReceivedDataSample& sample)
03314 {
03315 ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
03316 WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
03317 if (iter != writers_.end()) {
03318 const SequenceNumber& seq = sample.header_.sequence_;
03319 if (iter->second->waiting_for_end_historic_samples_) {
03320 iter->second->historic_samples_.insert(std::make_pair(seq, sample));
03321 return false;
03322 }
03323 if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
03324 && !sample.header_.historic_sample_
03325 && seq <= iter->second->last_historic_seq_) {
03326
03327 return false;
03328 }
03329 }
03330 return true;
03331 }
03332
03333 void DataReaderImpl::deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples)
03334 {
03335 typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
03336 const iter_t end = samples.end();
03337 for (iter_t iter = samples.begin(); iter != end; ++iter) {
03338 iter->second.header_.historic_sample_ = true;
03339 data_received(iter->second);
03340 }
03341 }
03342
03343 void
03344 DataReaderImpl::add_link(const DataLink_rch& link, const RepoId& peer)
03345 {
03346 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
03347
03348 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
03349
03350 WriterMapType::iterator it = writers_.find(peer);
03351 if (it != writers_.end()) {
03352
03353
03354 end_historic_sweeper_->schedule_timer(it->second);
03355 }
03356 }
03357 TransportClient::add_link(link, peer);
03358 TransportImpl_rch impl = link->impl();
03359 OPENDDS_STRING type = impl->transport_type();
03360
03361 if (type == "rtps_udp" || type == "multicast") {
03362 resume_sample_processing(peer);
03363 }
03364 }
03365
03366 void
03367 DataReaderImpl::register_for_writer(const RepoId& participant,
03368 const RepoId& readerid,
03369 const RepoId& writerid,
03370 const TransportLocatorSeq& locators,
03371 DiscoveryListener* listener)
03372 {
03373 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
03374 }
03375
03376 void
03377 DataReaderImpl::unregister_for_writer(const RepoId& participant,
03378 const RepoId& readerid,
03379 const RepoId& writerid)
03380 {
03381 TransportClient::unregister_for_writer(participant, readerid, writerid);
03382 }
03383
03384 EndHistoricSamplesMissedSweeper::EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
03385 ACE_thread_t owner,
03386 DataReaderImpl* reader)
03387 : ReactorInterceptor (reactor, owner)
03388 , reader_(reader)
03389 { }
03390
03391 EndHistoricSamplesMissedSweeper::~EndHistoricSamplesMissedSweeper()
03392 { }
03393
03394 void EndHistoricSamplesMissedSweeper::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03395 {
03396 info->waiting_for_end_historic_samples_ = true;
03397 ScheduleCommand c(this, info);
03398 execute_or_enqueue(c);
03399 }
03400
03401 void EndHistoricSamplesMissedSweeper::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
03402 {
03403 info->waiting_for_end_historic_samples_ = false;
03404 CancelCommand c(this, info);
03405 execute_or_enqueue(c);
03406 }
03407
03408 int EndHistoricSamplesMissedSweeper::handle_timeout(
03409 const ACE_Time_Value& ,
03410 const void* arg)
03411 {
03412 PublicationId pub_id = reinterpret_cast<const WriterInfo*>(arg)->writer_id_;
03413
03414 if (DCPS_debug_level >= 1) {
03415 GuidConverter sub_repo(reader_->get_repo_id());
03416 GuidConverter pub_repo(pub_id);
03417 ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
03418 OPENDDS_STRING(sub_repo).c_str(),
03419 OPENDDS_STRING(pub_repo).c_str()));
03420 }
03421
03422 reader_->resume_sample_processing(pub_id);
03423 return 0;
03424 }
03425
03426 void EndHistoricSamplesMissedSweeper::ScheduleCommand::execute()
03427 {
03428 static const ACE_Time_Value ten_seconds(10);
03429
03430
03431 const void* arg = reinterpret_cast<const void*>(info_.in());
03432 info_->_add_ref();
03433
03434 info_->historic_samples_timer_ = sweeper_->reactor()->schedule_timer(sweeper_,
03435 arg,
03436 ten_seconds);
03437 if (DCPS_debug_level) {
03438 ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", info_->historic_samples_timer_));
03439 }
03440 }
03441
03442 void EndHistoricSamplesMissedSweeper::CancelCommand::execute()
03443 {
03444 if (info_->historic_samples_timer_ != WriterInfo::NO_TIMER) {
03445 sweeper_->reactor()->cancel_timer(info_->historic_samples_timer_);
03446 if (DCPS_debug_level) {
03447 ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", info_->historic_samples_timer_));
03448 }
03449 info_->historic_samples_timer_ = WriterInfo::NO_TIMER;
03450 info_->_remove_ref();
03451 }
03452 }
03453
03454 }
03455 }