00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "tao/ORB_Core.h"
00010 #include "SubscriptionInstance.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Service_Participant.h"
00014 #include "Qos_Helper.h"
00015 #include "FeatureDisabledQosCheck.h"
00016 #include "GuidConverter.h"
00017 #include "TopicImpl.h"
00018 #include "Serializer.h"
00019 #include "SubscriberImpl.h"
00020 #include "Transient_Kludge.h"
00021 #include "Util.h"
00022 #include "RequestedDeadlineWatchdog.h"
00023 #include "QueryConditionImpl.h"
00024 #include "ReadConditionImpl.h"
00025 #include "MonitorFactory.h"
00026 #include "dds/DCPS/transport/framework/EntryExit.h"
00027 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00028 #include "dds/DdsDcpsCoreC.h"
00029 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00030 #include "dds/DCPS/SafetyProfileStreams.h"
00031 #if !defined (DDS_HAS_MINIMUM_BIT)
00032 #include "BuiltInTopicUtils.h"
00033 #include "dds/DdsDcpsCoreTypeSupportC.h"
00034 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "RecorderImpl.h"
00036 #include "PoolAllocator.h"
00037
00038 #include "ace/Reactor.h"
00039 #include "ace/Auto_Ptr.h"
00040 #include "ace/Condition_Recursive_Thread_Mutex.h"
00041
00042 #include <stdexcept>
00043
00044 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00045
00046 namespace OpenDDS {
00047 namespace DCPS {
00048
00049 RecorderImpl::RecorderImpl()
00050 : qos_(TheServiceParticipant->initial_DataReaderQos()),
00051 participant_servant_(0),
00052 topic_servant_(0),
00053 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00054 is_exclusive_ownership_ (false),
00055 #endif
00056 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00057 owner_manager_ (0),
00058 #endif
00059 subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00060 topic_desc_(0),
00061 listener_mask_(DEFAULT_STATUS_MASK),
00062 domain_id_(0),
00063 remove_association_sweeper_(
00064 make_rch<RemoveAssociationSweeper<RecorderImpl> >(TheServiceParticipant->reactor(),
00065 TheServiceParticipant->reactor_owner(),
00066 this)),
00067 is_bit_(false)
00068 {
00069
00070 requested_incompatible_qos_status_.total_count = 0;
00071 requested_incompatible_qos_status_.total_count_change = 0;
00072 requested_incompatible_qos_status_.last_policy_id = 0;
00073 requested_incompatible_qos_status_.policies.length(0);
00074
00075 subscription_match_status_.total_count = 0;
00076 subscription_match_status_.total_count_change = 0;
00077 subscription_match_status_.current_count = 0;
00078 subscription_match_status_.current_count_change = 0;
00079 subscription_match_status_.last_publication_handle =
00080 DDS::HANDLE_NIL;
00081
00082 }
00083
00084
00085
00086 RecorderImpl::~RecorderImpl()
00087 {
00088 DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
00089 {
00090 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00091 read_guard,
00092 this->writers_lock_);
00093
00094 WriterMapType::iterator writer;
00095 for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00096 remove_association_sweeper_->cancel_timer(writer->second);
00097 }
00098 }
00099
00100 remove_association_sweeper_->wait();
00101 }
00102
00103
00104 DDS::ReturnCode_t
00105 RecorderImpl::cleanup()
00106 {
00107
00108 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00109 if (!disco->remove_subscription(this->domain_id_,
00110 participant_servant_->get_id(),
00111 this->subscription_id_)) {
00112 ACE_ERROR_RETURN((LM_ERROR,
00113 ACE_TEXT("(%P|%t) ERROR: ")
00114 ACE_TEXT("RecorderImpl::cleanup: ")
00115 ACE_TEXT(" could not remove subscription from discovery.\n")),
00116 DDS::RETCODE_ERROR);
00117 }
00118
00119
00120
00121
00122 this->remove_all_associations();
00123
00124 {
00125 ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex,
00126 read_guard,
00127 this->writers_lock_,
00128 0);
00129
00130 WriterMapType::iterator writer;
00131 for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00132 remove_association_sweeper_->cancel_timer(writer->second);
00133 }
00134 }
00135
00136 remove_association_sweeper_->wait();
00137 return DDS::RETCODE_OK;
00138 }
00139
00140 void RecorderImpl::init(
00141 TopicDescriptionImpl* a_topic_desc,
00142 const DDS::DataReaderQos & qos,
00143 RecorderListener_rch a_listener,
00144 const DDS::StatusMask & mask,
00145 DomainParticipantImpl* participant,
00146 DDS::SubscriberQos subqos)
00147 {
00148
00149 if (DCPS_debug_level >= 1) {
00150
00151 ACE_DEBUG((LM_DEBUG,
00152 ACE_TEXT("(%P|%t) RecorderImpl::init \n")));
00153 }
00154
00155
00156 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00157 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00158 topic_servant_ = a_topic;
00159 }
00160
00161 CORBA::String_var topic_name = a_topic_desc->get_name();
00162
00163 #if !defined (DDS_HAS_MINIMUM_BIT)
00164 is_bit_ = topicIsBIT(topic_name.in(), a_topic_desc->get_type_name());
00165 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00166
00167 qos_ = qos;
00168
00169 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00170 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00171 #endif
00172
00173 listener_ = a_listener;
00174 listener_mask_ = mask;
00175
00176
00177
00178 participant_servant_ = participant;
00179
00180 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00181 if (is_exclusive_ownership_) {
00182 owner_manager_ = participant_servant_->ownership_manager ();
00183 }
00184 #endif
00185
00186 domain_id_ = participant_servant_->get_domain_id();
00187 subqos_ = subqos;
00188 }
00189
00190 bool RecorderImpl::check_transport_qos(const TransportInst& ti)
00191 {
00192 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00193 return ti.is_reliable();
00194 }
00195 return true;
00196 }
00197
00198 const RepoId& RecorderImpl::get_repo_id() const
00199 {
00200 return this->subscription_id_;
00201 }
00202
00203 CORBA::Long RecorderImpl::get_priority_value(const AssociationData& data) const
00204 {
00205 return data.publication_transport_priority_;
00206 }
00207
00208
00209 void RecorderImpl::data_received(const ReceivedDataSample& sample)
00210 {
00211 DBG_ENTRY_LVL("RecorderImpl","data_received",6);
00212
00213
00214
00215 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00216
00217 if (DCPS_debug_level > 9) {
00218 GuidConverter converter(subscription_id_);
00219 ACE_DEBUG((LM_DEBUG,
00220 ACE_TEXT("(%P|%t) RecorderImpl::data_received: ")
00221 ACE_TEXT("%C received sample: %C.\n"),
00222 OPENDDS_STRING(converter).c_str(),
00223 to_string(sample.header_).c_str()));
00224 }
00225
00226
00227 if (sample.header_.message_id_ != SAMPLE_DATA)
00228 return;
00229
00230 RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_),
00231 sample.header_.source_timestamp_sec_,
00232 sample.header_.source_timestamp_nanosec_,
00233 sample.header_.publication_id_,
00234 sample.header_.byte_order_,
00235 sample.sample_.get());
00236
00237 if (listener_.in()) {
00238 listener_->on_sample_data_received(this, rawSample);
00239 }
00240
00241 }
00242
00243 void RecorderImpl::notify_subscription_disconnected(const WriterIdSeq&)
00244 {
00245 }
00246
00247 void RecorderImpl::notify_subscription_reconnected(const WriterIdSeq&)
00248 {
00249
00250 }
00251
00252 void
00253 RecorderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq&)
00254 {
00255 }
00256
00257 void RecorderImpl::notify_subscription_lost(const WriterIdSeq&)
00258 {
00259
00260 }
00261
00262
00263 void
00264 RecorderImpl::add_association(const RepoId& yourId,
00265 const WriterAssociation& writer,
00266 bool active)
00267 {
00268 ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n"));
00269
00270
00271
00272 if (DCPS_debug_level >= 1) {
00273 GuidConverter reader_converter(yourId);
00274 GuidConverter writer_converter(writer.writerId);
00275 ACE_DEBUG((LM_DEBUG,
00276 ACE_TEXT("(%P|%t) RecorderImpl::add_association - ")
00277 ACE_TEXT("bit %d local %C remote %C\n"),
00278 is_bit_,
00279 OPENDDS_STRING(reader_converter).c_str(),
00280 OPENDDS_STRING(writer_converter).c_str()));
00281 }
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301 if (GUID_UNKNOWN == subscription_id_) {
00302
00303 subscription_id_ = yourId;
00304 }
00305
00306
00307
00308
00309 {
00310 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00311
00312
00313
00314
00315
00316
00317 {
00318 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00319
00320 const PublicationId& writer_id = writer.writerId;
00321 RcHandle<WriterInfo> info ( make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos));
00322
00323 this->writers_.insert(
00324
00325 WriterMapType::value_type(
00326 writer_id,
00327 info));
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356 }
00357
00358
00359
00360
00361
00362
00363
00364 AssociationData data;
00365 data.remote_id_ = writer.writerId;
00366 data.remote_data_ = writer.writerTransInfo;
00367 data.publication_transport_priority_ =
00368 writer.writerQos.transport_priority.value;
00369 data.remote_reliable_ =
00370 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00371 data.remote_durable_ =
00372 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00373
00374 if (!this->associate(data, active)) {
00375 if (DCPS_debug_level) {
00376 ACE_ERROR((LM_ERROR,
00377 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00378 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00379 }
00380 return;
00381 }
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 }
00425
00426
00427
00428
00429
00430
00431
00432
00433 if (!is_bit_) {
00434
00435 DDS::InstanceHandle_t handle =
00436 this->participant_servant_->id_to_handle(writer.writerId);
00437
00438
00439
00440
00441
00442 {
00443 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00444
00445
00446 this->id_to_handle_map_.insert(
00447 RepoIdToHandleMap::value_type(writer.writerId, handle));
00448
00449 if (DCPS_debug_level > 4) {
00450 GuidConverter converter(writer.writerId);
00451 ACE_DEBUG((LM_DEBUG,
00452 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00453 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00454 OPENDDS_STRING(converter).c_str(),
00455 handle));
00456 }
00457
00458
00459
00460
00461 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00462 this->subscription_match_status_.current_count_change
00463 = matchedPublications - this->subscription_match_status_.current_count;
00464 this->subscription_match_status_.current_count = matchedPublications;
00465
00466 ++this->subscription_match_status_.total_count;
00467 ++this->subscription_match_status_.total_count_change;
00468
00469 this->subscription_match_status_.last_publication_handle = handle;
00470
00471
00472
00473
00474 if (listener_.in()) {
00475 listener_->on_recorder_matched(
00476 this,
00477 this->subscription_match_status_);
00478
00479
00480
00481
00482
00483 this->subscription_match_status_.total_count_change = 0;
00484 this->subscription_match_status_.current_count_change = 0;
00485 }
00486
00487
00488 }
00489
00490 {
00491 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00492
00493 this->writers_[writer.writerId]->handle_ = handle;
00494 }
00495 }
00496
00497 if (!active) {
00498 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00499 disco->association_complete(this->domain_id_,
00500 this->participant_servant_->get_id(),
00501 this->subscription_id_, writer.writerId);
00502 }
00503
00504
00505
00506
00507 }
00508
00509 void
00510 RecorderImpl::association_complete(const RepoId& )
00511 {
00512
00513
00514 }
00515
00516 void
00517 RecorderImpl::remove_associations(const WriterIdSeq& writers,
00518 bool notify_lost)
00519 {
00520 DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
00521 if (writers.length() == 0) {
00522 return;
00523 }
00524
00525 if (DCPS_debug_level >= 1) {
00526 GuidConverter reader_converter(subscription_id_);
00527 GuidConverter writer_converter(writers[0]);
00528 ACE_DEBUG((LM_DEBUG,
00529 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
00530 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00531 is_bit_,
00532 OPENDDS_STRING(reader_converter).c_str(),
00533 OPENDDS_STRING(writer_converter).c_str(),
00534 writers.length()));
00535 }
00536 if (!this->entity_deleted_.value()) {
00537
00538 this->stop_associating(writers.get_buffer(), writers.length());
00539
00540
00541
00542 WriterIdSeq non_active_writers;
00543 {
00544 CORBA::ULong wr_len = writers.length();
00545 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00546
00547 for (CORBA::ULong i = 0; i < wr_len; i++) {
00548 PublicationId writer_id = writers[i];
00549
00550 WriterMapType::iterator it = this->writers_.find(writer_id);
00551 if (it != this->writers_.end() &&
00552 it->second->active()) {
00553 remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00554 } else {
00555 push_back(non_active_writers, writer_id);
00556 }
00557 }
00558 }
00559 remove_associations_i(non_active_writers, notify_lost);
00560 } else {
00561 remove_associations_i(writers, notify_lost);
00562 }
00563 }
00564
00565 void
00566 RecorderImpl::remove_publication(const PublicationId& pub_id)
00567 {
00568 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00569 WriterMapType::iterator where = writers_.find(pub_id);
00570 if (writers_.end() != where) {
00571 WriterInfo& info = *where->second;
00572 WriterIdSeq writers;
00573 push_back(writers, pub_id);
00574 bool notify = info.notify_lost_;
00575 write_guard.release();
00576 remove_associations_i(writers, notify);
00577 }
00578 }
00579
00580 void
00581 RecorderImpl::remove_associations_i(const WriterIdSeq& writers,
00582 bool notify_lost)
00583 {
00584 DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
00585
00586 if (writers.length() == 0) {
00587 return;
00588 }
00589
00590 if (DCPS_debug_level >= 1) {
00591 GuidConverter reader_converter(subscription_id_);
00592 GuidConverter writer_converter(writers[0]);
00593 ACE_DEBUG((LM_DEBUG,
00594 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00595 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00596 is_bit_,
00597 OPENDDS_STRING(reader_converter).c_str(),
00598 OPENDDS_STRING(writer_converter).c_str(),
00599 writers.length()));
00600 }
00601 DDS::InstanceHandleSeq handles;
00602
00603 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00604
00605
00606
00607
00608 WriterIdSeq updated_writers;
00609
00610 CORBA::ULong wr_len;
00611
00612
00613
00614
00615
00616 {
00617 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00618
00619 wr_len = writers.length();
00620
00621 for (CORBA::ULong i = 0; i < wr_len; i++) {
00622 PublicationId writer_id = writers[i];
00623
00624 WriterMapType::iterator it = this->writers_.find(writer_id);
00625
00626 if (it != this->writers_.end()) {
00627 it->second->removed();
00628 remove_association_sweeper_->cancel_timer(it->second);
00629 }
00630
00631 if (this->writers_.erase(writer_id) == 0) {
00632 if (DCPS_debug_level >= 1) {
00633 GuidConverter converter(writer_id);
00634 ACE_DEBUG((LM_DEBUG,
00635 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00636 ACE_TEXT("the writer local %C was already removed.\n"),
00637 OPENDDS_STRING(converter).c_str()));
00638 }
00639
00640 } else {
00641 push_back(updated_writers, writer_id);
00642 }
00643 }
00644 }
00645
00646 wr_len = updated_writers.length();
00647
00648
00649 if (wr_len == 0) {
00650 return;
00651 }
00652
00653 if (!is_bit_) {
00654
00655
00656 this->lookup_instance_handles(updated_writers, handles);
00657
00658 for (CORBA::ULong i = 0; i < wr_len; ++i) {
00659 id_to_handle_map_.erase(updated_writers[i]);
00660 }
00661 }
00662 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00663 this->disassociate(updated_writers[i]);
00664 }
00665
00666
00667 if (!this->is_bit_) {
00668
00669 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00670 this->subscription_match_status_.current_count_change
00671 = matchedPublications - this->subscription_match_status_.current_count;
00672
00673
00674 if (this->subscription_match_status_.current_count_change != 0) {
00675 this->subscription_match_status_.current_count = matchedPublications;
00676
00677
00678
00679 this->subscription_match_status_.last_publication_handle
00680 = handles[ wr_len - 1];
00681
00682
00683
00684
00685
00686
00687 if (listener_.in()) {
00688 listener_->on_recorder_matched(
00689 this,
00690 this->subscription_match_status_);
00691
00692
00693 this->subscription_match_status_.total_count_change = 0;
00694 this->subscription_match_status_.current_count_change = 0;
00695 }
00696
00697
00698 }
00699 }
00700
00701
00702
00703
00704 if (notify_lost) {
00705 this->notify_subscription_lost(handles);
00706 }
00707
00708
00709
00710
00711 }
00712
00713 void
00714 RecorderImpl::remove_all_associations()
00715 {
00716 DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
00717
00718 OpenDDS::DCPS::WriterIdSeq writers;
00719 int size;
00720
00721 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00722
00723 {
00724 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00725
00726 size = static_cast<int>(writers_.size());
00727 writers.length(size);
00728
00729 WriterMapType::iterator curr_writer = writers_.begin();
00730 WriterMapType::iterator end_writer = writers_.end();
00731
00732 int i = 0;
00733
00734 while (curr_writer != end_writer) {
00735 writers[i++] = curr_writer->first;
00736 ++curr_writer;
00737 }
00738 }
00739
00740 try {
00741 CORBA::Boolean dont_notify_lost = 0;
00742
00743 if (0 < size) {
00744 remove_associations(writers, dont_notify_lost);
00745 }
00746
00747 } catch (const CORBA::Exception&) {
00748 }
00749 }
00750
00751 void
00752 RecorderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00753 {
00754
00755
00756 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00757 guard,
00758 this->publication_handle_lock_);
00759
00760 if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00761
00762 return;
00763 }
00764
00765
00766
00767
00768
00769 requested_incompatible_qos_status_.total_count = status.total_count;
00770 requested_incompatible_qos_status_.total_count_change +=
00771 status.count_since_last_send;
00772 requested_incompatible_qos_status_.last_policy_id =
00773 status.last_policy_id;
00774 requested_incompatible_qos_status_.policies = status.policies;
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789 }
00790
00791 void
00792 RecorderImpl::inconsistent_topic()
00793 {
00794 topic_servant_->inconsistent_topic();
00795 }
00796
00797 void
00798 RecorderImpl::signal_liveliness(const RepoId& remote_participant)
00799 {
00800 RepoId prefix = remote_participant;
00801 prefix.entityId = EntityId_t();
00802
00803 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00804
00805 typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement;
00806 typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
00807 WriterSet writers;
00808
00809 {
00810 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00811 for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00812 limit = writers_.end();
00813 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00814 ++pos) {
00815 writers.push_back(std::make_pair(pos->first, pos->second));
00816 }
00817 }
00818
00819 ACE_Time_Value when = ACE_OS::gettimeofday();
00820 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00821 pos != limit;
00822 ++pos) {
00823 pos->second->received_activity(when);
00824 }
00825 }
00826
00827 DDS::ReturnCode_t RecorderImpl::set_qos(
00828 const DDS::SubscriberQos & subscriber_qos,
00829 const DDS::DataReaderQos & qos)
00830 {
00831
00832 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED);
00833
00834 if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
00835 if (subqos_ != subscriber_qos) {
00836
00837 if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_ == true) {
00838 return DDS::RETCODE_IMMUTABLE_POLICY;
00839
00840 } else {
00841 subqos_ = subscriber_qos;
00842 }
00843 }
00844 } else {
00845 return DDS::RETCODE_INCONSISTENT_POLICY;
00846 }
00847
00848 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00849 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00850 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00851
00852 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00853 if (qos_ == qos)
00854 return DDS::RETCODE_OK;
00855
00856 if (!Qos_Helper::changeable(qos_, qos) && this->is_enabled()) {
00857 return DDS::RETCODE_IMMUTABLE_POLICY;
00858
00859 } else {
00860 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00861 const bool status =
00862 disco->update_subscription_qos(
00863 this->participant_servant_->get_domain_id(),
00864 this->participant_servant_->get_id(),
00865 this->subscription_id_,
00866 qos,
00867 subscriber_qos);
00868 if (!status) {
00869 ACE_ERROR_RETURN((LM_ERROR,
00870 ACE_TEXT("(%P|%t) RecorderImpl::set_qos, ")
00871 ACE_TEXT("qos not updated. \n")),
00872 DDS::RETCODE_ERROR);
00873 }
00874 }
00875
00876 qos_ = qos;
00877 subqos_ = subscriber_qos;
00878
00879 return DDS::RETCODE_OK;
00880
00881 } else {
00882 return DDS::RETCODE_INCONSISTENT_POLICY;
00883 }
00884 }
00885
00886 DDS::ReturnCode_t
00887 RecorderImpl::get_qos(
00888 DDS::SubscriberQos & subscriber_qos,
00889 DDS::DataReaderQos & qos)
00890 {
00891 qos = qos_;
00892 subscriber_qos = subqos_;
00893 return DDS::RETCODE_OK;
00894 }
00895
00896 DDS::ReturnCode_t
00897 RecorderImpl::set_listener(const RecorderListener_rch& a_listener,
00898 DDS::StatusMask mask)
00899 {
00900 listener_mask_ = mask;
00901
00902 listener_ = a_listener;
00903 return DDS::RETCODE_OK;
00904 }
00905
00906 RecorderListener_rch
00907 RecorderImpl::get_listener()
00908 {
00909 return listener_;
00910 }
00911
00912 void
00913 RecorderImpl::lookup_instance_handles(const WriterIdSeq& ids,
00914 DDS::InstanceHandleSeq & hdls)
00915 {
00916 CORBA::ULong const num_wrts = ids.length();
00917
00918 if (DCPS_debug_level > 9) {
00919 OPENDDS_STRING separator = "";
00920 OPENDDS_STRING buffer;
00921
00922 for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00923 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
00924 separator = ", ";
00925 }
00926
00927 ACE_DEBUG((LM_DEBUG,
00928 ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
00929 ACE_TEXT("searching for handles for writer Ids: %C.\n"),
00930 buffer.c_str()));
00931 }
00932
00933 hdls.length(num_wrts);
00934
00935 for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00936 hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
00937 }
00938 }
00939
00940 DDS::ReturnCode_t
00941 RecorderImpl::enable()
00942 {
00943 if (DCPS_debug_level >= 1) {
00944
00945 ACE_DEBUG((LM_DEBUG,
00946 ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
00947 }
00948
00949
00950
00951
00952
00953
00954 if (this->is_enabled()) {
00955 return DDS::RETCODE_OK;
00956 }
00957
00958 this->set_enabled();
00959
00960
00961 if (topic_servant_) {
00962
00963 ACE_DEBUG((LM_DEBUG,
00964 ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n")));
00965
00966 try {
00967 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
00968 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00969 } catch (const Transport::Exception&) {
00970 ACE_ERROR((LM_ERROR,
00971 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
00972 ACE_TEXT("Transport Exception.\n")));
00973 return DDS::RETCODE_ERROR;
00974
00975 }
00976
00977 const TransportLocatorSeq& trans_conf_info = this->connection_info();
00978
00979 CORBA::String_var filterClassName = "";
00980 CORBA::String_var filterExpression = "";
00981 DDS::StringSeq exprParams;
00982
00983
00984 Discovery_rch disco =
00985 TheServiceParticipant->get_discovery(this->domain_id_);
00986
00987 ACE_DEBUG((LM_DEBUG,
00988 ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n")));
00989
00990 this->subscription_id_ =
00991 disco->add_subscription(this->domain_id_,
00992 this->participant_servant_->get_id(),
00993 this->topic_servant_->get_id(),
00994 this,
00995 this->qos_,
00996 trans_conf_info,
00997 this->subqos_,
00998 filterClassName,
00999 filterExpression,
01000 exprParams);
01001
01002 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01003 ACE_ERROR((LM_ERROR,
01004 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
01005 ACE_TEXT("add_subscription returned invalid id.\n")));
01006 return DDS::RETCODE_ERROR;
01007 }
01008 }
01009
01010 if (topic_servant_) {
01011 const CORBA::String_var name = topic_servant_->get_name();
01012 DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
01013
01014
01015 return return_value;
01016 } else {
01017 return DDS::RETCODE_OK;
01018 }
01019 }
01020
01021 DDS::InstanceHandle_t
01022 RecorderImpl::get_instance_handle()
01023 {
01024 return this->participant_servant_->id_to_handle(subscription_id_);
01025 }
01026
01027 void
01028 RecorderImpl::register_for_writer(const RepoId& participant,
01029 const RepoId& readerid,
01030 const RepoId& writerid,
01031 const TransportLocatorSeq& locators,
01032 DiscoveryListener* listener)
01033 {
01034 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
01035 }
01036
01037 void
01038 RecorderImpl::unregister_for_writer(const RepoId& participant,
01039 const RepoId& readerid,
01040 const RepoId& writerid)
01041 {
01042 TransportClient::unregister_for_writer(participant, readerid, writerid);
01043 }
01044
01045 #if !defined (DDS_HAS_MINIMUM_BIT)
01046 DDS::ReturnCode_t
01047 RecorderImpl::repoid_to_bit_key(const DCPS::RepoId& id,
01048 DDS::BuiltinTopicKey_t& key)
01049 {
01050 DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id);
01051
01052 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01053 guard,
01054 this->publication_handle_lock_,
01055 DDS::RETCODE_ERROR);
01056
01057 DDS::PublicationBuiltinTopicDataSeq data;
01058
01059 DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
01060 participant_servant_,
01061 BUILT_IN_PUBLICATION_TOPIC,
01062 publication_handle,
01063 data);
01064
01065 if (ret == DDS::RETCODE_OK) {
01066 key = data[0].key;
01067 }
01068
01069 return ret;
01070 }
01071 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01072
01073 }
01074 }
01075
01076 OPENDDS_END_VERSIONED_NAMESPACE_DECL