00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "tao/ORB_Core.h"
00010 #include "SubscriptionInstance.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Service_Participant.h"
00014 #include "Qos_Helper.h"
00015 #include "FeatureDisabledQosCheck.h"
00016 #include "GuidConverter.h"
00017 #include "TopicImpl.h"
00018 #include "Serializer.h"
00019 #include "SubscriberImpl.h"
00020 #include "Transient_Kludge.h"
00021 #include "Util.h"
00022 #include "RequestedDeadlineWatchdog.h"
00023 #include "QueryConditionImpl.h"
00024 #include "ReadConditionImpl.h"
00025 #include "MonitorFactory.h"
00026 #include "dds/DCPS/transport/framework/EntryExit.h"
00027 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00028 #include "dds/DdsDcpsCoreC.h"
00029 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00030 #include "dds/DCPS/SafetyProfileStreams.h"
00031 #if !defined (DDS_HAS_MINIMUM_BIT)
00032 #include "BuiltInTopicUtils.h"
00033 #include "dds/DdsDcpsCoreTypeSupportC.h"
00034 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "RecorderImpl.h"
00036 #include "PoolAllocator.h"
00037
00038 #include "ace/Reactor.h"
00039 #include "ace/Auto_Ptr.h"
00040
00041 #include <stdexcept>
00042
00043
00044 namespace OpenDDS {
00045 namespace DCPS {
00046
00047 RecorderImpl::RecorderImpl()
00048 : qos_(TheServiceParticipant->initial_DataReaderQos()),
00049 participant_servant_(0),
00050 topic_servant_(0),
00051 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00052 is_exclusive_ownership_ (false),
00053 #endif
00054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00055 owner_manager_ (0),
00056 #endif
00057 subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00058 topic_desc_(0),
00059 listener_mask_(DEFAULT_STATUS_MASK),
00060 domain_id_(0),
00061 remove_association_sweeper_(
00062 new RemoveAssociationSweeper<RecorderImpl>(TheServiceParticipant->reactor(),
00063 TheServiceParticipant->reactor_owner(),
00064 this)),
00065 is_bit_(false)
00066 {
00067
00068 requested_incompatible_qos_status_.total_count = 0;
00069 requested_incompatible_qos_status_.total_count_change = 0;
00070 requested_incompatible_qos_status_.last_policy_id = 0;
00071 requested_incompatible_qos_status_.policies.length(0);
00072
00073 subscription_match_status_.total_count = 0;
00074 subscription_match_status_.total_count_change = 0;
00075 subscription_match_status_.current_count = 0;
00076 subscription_match_status_.current_count_change = 0;
00077 subscription_match_status_.last_publication_handle =
00078 DDS::HANDLE_NIL;
00079
00080 }
00081
00082
00083
00084 RecorderImpl::~RecorderImpl()
00085 {
00086 DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
00087 {
00088 ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00089 read_guard,
00090 this->writers_lock_);
00091
00092 WriterMapType::iterator writer;
00093 for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00094 remove_association_sweeper_->cancel_timer(writer->second);
00095 }
00096 }
00097
00098 remove_association_sweeper_->wait();
00099 remove_association_sweeper_->destroy();
00100 }
00101
00102
00103 DDS::ReturnCode_t
00104 RecorderImpl::cleanup()
00105 {
00106
00107 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00108 if (!disco->remove_subscription(this->domain_id_,
00109 participant_servant_->get_id(),
00110 this->subscription_id_)) {
00111 ACE_ERROR_RETURN((LM_ERROR,
00112 ACE_TEXT("(%P|%t) ERROR: ")
00113 ACE_TEXT("RecorderImpl::cleanup: ")
00114 ACE_TEXT(" could not remove subscription from discovery.\n")),
00115 DDS::RETCODE_ERROR);
00116 }
00117
00118
00119
00120
00121 this->remove_all_associations();
00122
00123 if (topic_servant_) {
00124 topic_servant_->remove_entity_ref();
00125 topic_servant_->_remove_ref();
00126 }
00127 {
00128 ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex,
00129 read_guard,
00130 this->writers_lock_,
00131 0);
00132
00133 WriterMapType::iterator writer;
00134 for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00135 remove_association_sweeper_->cancel_timer(writer->second);
00136 }
00137 }
00138
00139 remove_association_sweeper_->wait();
00140 return DDS::RETCODE_OK;
00141 }
00142
00143 void RecorderImpl::init(
00144 TopicDescriptionImpl* a_topic_desc,
00145 const DDS::DataReaderQos & qos,
00146 RecorderListener_rch a_listener,
00147 const DDS::StatusMask & mask,
00148 DomainParticipantImpl* participant,
00149 DDS::SubscriberQos subqos)
00150 {
00151
00152 if (DCPS_debug_level >= 1) {
00153
00154 ACE_DEBUG((LM_DEBUG,
00155 ACE_TEXT("(%P|%t) RecorderImpl::init \n")));
00156 }
00157
00158
00159 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00160 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00161 topic_servant_ = a_topic;
00162 topic_servant_->_add_ref();
00163
00164 topic_servant_->add_entity_ref();
00165 }
00166
00167 CORBA::String_var topic_name = a_topic_desc->get_name();
00168
00169 #if !defined (DDS_HAS_MINIMUM_BIT)
00170 is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00171 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00172 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00173 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00174 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00175
00176 qos_ = qos;
00177
00178 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00179 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00180 #endif
00181
00182 listener_ = a_listener;
00183 listener_mask_ = mask;
00184
00185
00186
00187 participant_servant_ = participant;
00188
00189 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00190 if (is_exclusive_ownership_) {
00191 owner_manager_ = participant_servant_->ownership_manager ();
00192 }
00193 #endif
00194
00195 domain_id_ = participant_servant_->get_domain_id();
00196 subqos_ = subqos;
00197 }
00198
00199 bool RecorderImpl::check_transport_qos(const TransportInst& ti)
00200 {
00201 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00202 return ti.is_reliable();
00203 }
00204 return true;
00205 }
00206
00207 const RepoId& RecorderImpl::get_repo_id() const
00208 {
00209 return this->subscription_id_;
00210 }
00211
00212 CORBA::Long RecorderImpl::get_priority_value(const AssociationData& data) const
00213 {
00214 return data.publication_transport_priority_;
00215 }
00216
00217
00218 void RecorderImpl::data_received(const ReceivedDataSample& sample)
00219 {
00220 DBG_ENTRY_LVL("RecorderImpl","data_received",6);
00221
00222
00223
00224 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00225
00226 if (DCPS_debug_level > 9) {
00227 GuidConverter converter(subscription_id_);
00228 ACE_DEBUG((LM_DEBUG,
00229 ACE_TEXT("(%P|%t) RecorderImpl::data_received: ")
00230 ACE_TEXT("%C received sample: %C.\n"),
00231 OPENDDS_STRING(converter).c_str(),
00232 to_string(sample.header_).c_str()));
00233 }
00234
00235
00236 if (sample.header_.message_id_ != SAMPLE_DATA)
00237 return;
00238
00239 RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_),
00240 sample.header_.source_timestamp_sec_,
00241 sample.header_.source_timestamp_nanosec_,
00242 sample.header_.publication_id_,
00243 sample.header_.byte_order_,
00244 sample.sample_);
00245
00246 if (listener_.in()) {
00247 listener_->on_sample_data_received(this, rawSample);
00248 }
00249
00250 }
00251
00252 void RecorderImpl::notify_subscription_disconnected(const WriterIdSeq&)
00253 {
00254 }
00255
00256 void RecorderImpl::notify_subscription_reconnected(const WriterIdSeq&)
00257 {
00258
00259 }
00260
00261 void
00262 RecorderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq&)
00263 {
00264 }
00265
00266 void RecorderImpl::notify_subscription_lost(const WriterIdSeq&)
00267 {
00268
00269 }
00270
00271 void RecorderImpl::notify_connection_deleted(const RepoId&)
00272 {
00273
00274 }
00275
00276
00277
00278 void
00279 RecorderImpl::add_association(const RepoId& yourId,
00280 const WriterAssociation& writer,
00281 bool active)
00282 {
00283 ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n"));
00284
00285
00286
00287 if (DCPS_debug_level >= 1) {
00288 GuidConverter reader_converter(yourId);
00289 GuidConverter writer_converter(writer.writerId);
00290 ACE_DEBUG((LM_DEBUG,
00291 ACE_TEXT("(%P|%t) RecorderImpl::add_association - ")
00292 ACE_TEXT("bit %d local %C remote %C\n"),
00293 is_bit_,
00294 OPENDDS_STRING(reader_converter).c_str(),
00295 OPENDDS_STRING(writer_converter).c_str()));
00296 }
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316 if (GUID_UNKNOWN == subscription_id_) {
00317
00318 subscription_id_ = yourId;
00319 }
00320
00321
00322
00323
00324 {
00325 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00326
00327
00328
00329
00330
00331
00332 {
00333 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00334
00335 const PublicationId& writer_id = writer.writerId;
00336 RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos);
00337
00338 this->writers_.insert(
00339
00340 WriterMapType::value_type(
00341 writer_id,
00342 info));
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 }
00372
00373
00374
00375
00376
00377
00378
00379 AssociationData data;
00380 data.remote_id_ = writer.writerId;
00381 data.remote_data_ = writer.writerTransInfo;
00382 data.publication_transport_priority_ =
00383 writer.writerQos.transport_priority.value;
00384 data.remote_reliable_ =
00385 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00386 data.remote_durable_ =
00387 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00388
00389 if (!this->associate(data, active)) {
00390 if (DCPS_debug_level) {
00391 ACE_DEBUG((LM_ERROR,
00392 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00393 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00394 }
00395 return;
00396 }
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439 }
00440
00441
00442
00443
00444
00445
00446
00447
00448 if (!is_bit_) {
00449
00450 DDS::InstanceHandle_t handle =
00451 this->participant_servant_->id_to_handle(writer.writerId);
00452
00453
00454
00455
00456
00457 {
00458 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00459
00460
00461 this->id_to_handle_map_.insert(
00462 RepoIdToHandleMap::value_type(writer.writerId, handle));
00463
00464 if (DCPS_debug_level > 4) {
00465 GuidConverter converter(writer.writerId);
00466 ACE_DEBUG((LM_DEBUG,
00467 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00468 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00469 OPENDDS_STRING(converter).c_str(),
00470 handle));
00471 }
00472
00473
00474
00475
00476 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00477 this->subscription_match_status_.current_count_change
00478 = matchedPublications - this->subscription_match_status_.current_count;
00479 this->subscription_match_status_.current_count = matchedPublications;
00480
00481 ++this->subscription_match_status_.total_count;
00482 ++this->subscription_match_status_.total_count_change;
00483
00484 this->subscription_match_status_.last_publication_handle = handle;
00485
00486
00487
00488
00489 if (listener_.in()) {
00490 listener_->on_recorder_matched(
00491 this,
00492 this->subscription_match_status_);
00493
00494
00495
00496
00497
00498 this->subscription_match_status_.total_count_change = 0;
00499 this->subscription_match_status_.current_count_change = 0;
00500 }
00501
00502
00503 }
00504
00505 {
00506 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00507
00508 this->writers_[writer.writerId]->handle_ = handle;
00509 }
00510 }
00511
00512 if (!active) {
00513 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00514 disco->association_complete(this->domain_id_,
00515 this->participant_servant_->get_id(),
00516 this->subscription_id_, writer.writerId);
00517 }
00518
00519
00520
00521
00522 }
00523
00524 void
00525 RecorderImpl::association_complete(const RepoId& )
00526 {
00527
00528
00529 }
00530
00531 void
00532 RecorderImpl::remove_associations(const WriterIdSeq& writers,
00533 bool notify_lost)
00534 {
00535 DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
00536 if (writers.length() == 0) {
00537 return;
00538 }
00539
00540 if (DCPS_debug_level >= 1) {
00541 GuidConverter reader_converter(subscription_id_);
00542 GuidConverter writer_converter(writers[0]);
00543 ACE_DEBUG((LM_DEBUG,
00544 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
00545 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00546 is_bit_,
00547 OPENDDS_STRING(reader_converter).c_str(),
00548 OPENDDS_STRING(writer_converter).c_str(),
00549 writers.length()));
00550 }
00551 if (!this->entity_deleted_.value()) {
00552
00553 this->stop_associating(writers.get_buffer(), writers.length());
00554
00555
00556
00557 WriterIdSeq non_active_writers;
00558 {
00559 CORBA::ULong wr_len = writers.length();
00560 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00561
00562 for (CORBA::ULong i = 0; i < wr_len; i++) {
00563 PublicationId writer_id = writers[i];
00564
00565 WriterMapType::iterator it = this->writers_.find(writer_id);
00566 if (it != this->writers_.end() &&
00567 it->second->active(TheServiceParticipant->pending_timeout())) {
00568 remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00569 } else {
00570 push_back(non_active_writers, writer_id);
00571 }
00572 }
00573 }
00574 remove_associations_i(non_active_writers, notify_lost);
00575 } else {
00576 remove_associations_i(writers, notify_lost);
00577 }
00578 }
00579
00580 void
00581 RecorderImpl::remove_or_reschedule(const PublicationId& pub_id)
00582 {
00583 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00584 WriterMapType::iterator where = writers_.find(pub_id);
00585 if (writers_.end() != where) {
00586 WriterInfo& info = *where->second;
00587 WriterIdSeq writers;
00588 push_back(writers, pub_id);
00589 bool notify = info.notify_lost_;
00590 if (info.removal_deadline_ < ACE_OS::gettimeofday()) {
00591 write_guard.release();
00592 remove_associations_i(writers, notify);
00593 } else {
00594 write_guard.release();
00595 remove_associations(writers, notify);
00596 }
00597 }
00598 }
00599
00600 void
00601 RecorderImpl::remove_associations_i(const WriterIdSeq& writers,
00602 bool notify_lost)
00603 {
00604 DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
00605
00606 if (writers.length() == 0) {
00607 return;
00608 }
00609
00610 if (DCPS_debug_level >= 1) {
00611 GuidConverter reader_converter(subscription_id_);
00612 GuidConverter writer_converter(writers[0]);
00613 ACE_DEBUG((LM_DEBUG,
00614 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00615 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00616 is_bit_,
00617 OPENDDS_STRING(reader_converter).c_str(),
00618 OPENDDS_STRING(writer_converter).c_str(),
00619 writers.length()));
00620 }
00621 DDS::InstanceHandleSeq handles;
00622
00623 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00624
00625
00626
00627
00628 WriterIdSeq updated_writers;
00629
00630 CORBA::ULong wr_len;
00631
00632
00633
00634
00635
00636 {
00637 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00638
00639 wr_len = writers.length();
00640
00641 for (CORBA::ULong i = 0; i < wr_len; i++) {
00642 PublicationId writer_id = writers[i];
00643
00644 WriterMapType::iterator it = this->writers_.find(writer_id);
00645
00646 if (it != this->writers_.end()) {
00647 it->second->removed();
00648 remove_association_sweeper_->cancel_timer(it->second);
00649 }
00650
00651 if (this->writers_.erase(writer_id) == 0) {
00652 if (DCPS_debug_level >= 1) {
00653 GuidConverter converter(writer_id);
00654 ACE_DEBUG((LM_DEBUG,
00655 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00656 ACE_TEXT("the writer local %C was already removed.\n"),
00657 OPENDDS_STRING(converter).c_str()));
00658 }
00659
00660 } else {
00661 push_back(updated_writers, writer_id);
00662 }
00663 }
00664 }
00665
00666 wr_len = updated_writers.length();
00667
00668
00669 if (wr_len == 0) {
00670 return;
00671 }
00672
00673 if (!is_bit_) {
00674
00675
00676 if (this->lookup_instance_handles(updated_writers, handles) == false) {
00677 if (DCPS_debug_level > 4) {
00678 ACE_DEBUG((LM_DEBUG,
00679 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00680 ACE_TEXT("lookup_instance_handles failed.\n")));
00681 }
00682 }
00683 for (CORBA::ULong i = 0; i < wr_len; ++i) {
00684 id_to_handle_map_.erase(updated_writers[i]);
00685 }
00686 }
00687 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00688 this->disassociate(updated_writers[i]);
00689 }
00690
00691
00692 if (!this->is_bit_) {
00693
00694 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00695 this->subscription_match_status_.current_count_change
00696 = matchedPublications - this->subscription_match_status_.current_count;
00697
00698
00699 if (this->subscription_match_status_.current_count_change != 0) {
00700 this->subscription_match_status_.current_count = matchedPublications;
00701
00702
00703
00704 this->subscription_match_status_.last_publication_handle
00705 = handles[ wr_len - 1];
00706
00707
00708
00709
00710
00711
00712 if (listener_.in()) {
00713 listener_->on_recorder_matched(
00714 this,
00715 this->subscription_match_status_);
00716
00717
00718 this->subscription_match_status_.total_count_change = 0;
00719 this->subscription_match_status_.current_count_change = 0;
00720 }
00721
00722
00723 }
00724 }
00725
00726
00727
00728
00729 if (notify_lost) {
00730 this->notify_subscription_lost(handles);
00731 }
00732
00733
00734
00735
00736 }
00737
00738 void
00739 RecorderImpl::remove_all_associations()
00740 {
00741 DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
00742
00743 OpenDDS::DCPS::WriterIdSeq writers;
00744 int size;
00745
00746 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00747
00748 {
00749 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00750
00751 size = static_cast<int>(writers_.size());
00752 writers.length(size);
00753
00754 WriterMapType::iterator curr_writer = writers_.begin();
00755 WriterMapType::iterator end_writer = writers_.end();
00756
00757 int i = 0;
00758
00759 while (curr_writer != end_writer) {
00760 writers[i++] = curr_writer->first;
00761 ++curr_writer;
00762 }
00763 }
00764
00765 try {
00766 CORBA::Boolean dont_notify_lost = 0;
00767
00768 if (0 < size) {
00769 remove_associations(writers, dont_notify_lost);
00770 }
00771
00772 } catch (const CORBA::Exception&) {
00773 }
00774 }
00775
00776 void
00777 RecorderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00778 {
00779
00780
00781 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00782 guard,
00783 this->publication_handle_lock_);
00784
00785 if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00786
00787 return;
00788 }
00789
00790
00791
00792
00793
00794 requested_incompatible_qos_status_.total_count = status.total_count;
00795 requested_incompatible_qos_status_.total_count_change +=
00796 status.count_since_last_send;
00797 requested_incompatible_qos_status_.last_policy_id =
00798 status.last_policy_id;
00799 requested_incompatible_qos_status_.policies = status.policies;
00800
00801
00802
00803
00804
00805
00806
00807
00808
00809
00810
00811
00812
00813
00814 }
00815
00816 void
00817 RecorderImpl::inconsistent_topic()
00818 {
00819 topic_servant_->inconsistent_topic();
00820 }
00821
00822 void
00823 RecorderImpl::signal_liveliness(const RepoId& remote_participant)
00824 {
00825 RepoId prefix = remote_participant;
00826 prefix.entityId = EntityId_t();
00827
00828 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00829
00830 typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement;
00831 typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
00832 WriterSet writers;
00833
00834 {
00835 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00836 for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00837 limit = writers_.end();
00838 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00839 ++pos) {
00840 writers.push_back(std::make_pair(pos->first, pos->second));
00841 }
00842 }
00843
00844 ACE_Time_Value when = ACE_OS::gettimeofday();
00845 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00846 pos != limit;
00847 ++pos) {
00848 pos->second->received_activity(when);
00849 }
00850 }
00851
00852 DDS::ReturnCode_t RecorderImpl::set_qos(
00853 const DDS::SubscriberQos & subscriber_qos,
00854 const DDS::DataReaderQos & qos)
00855 {
00856
00857 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED);
00858
00859 if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
00860 if (subqos_ != subscriber_qos) {
00861
00862 if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_ == true) {
00863 return DDS::RETCODE_IMMUTABLE_POLICY;
00864
00865 } else {
00866 subqos_ = subscriber_qos;
00867 }
00868 }
00869 } else {
00870 return DDS::RETCODE_INCONSISTENT_POLICY;
00871 }
00872
00873 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00874 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00875 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00876
00877 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00878 if (qos_ == qos)
00879 return DDS::RETCODE_OK;
00880
00881 if (!Qos_Helper::changeable(qos_, qos) && this->is_enabled()) {
00882 return DDS::RETCODE_IMMUTABLE_POLICY;
00883
00884 } else {
00885 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00886 const bool status =
00887 disco->update_subscription_qos(
00888 this->participant_servant_->get_domain_id(),
00889 this->participant_servant_->get_id(),
00890 this->subscription_id_,
00891 qos,
00892 subscriber_qos);
00893 if (!status) {
00894 ACE_ERROR_RETURN((LM_ERROR,
00895 ACE_TEXT("(%P|%t) RecorderImpl::set_qos, ")
00896 ACE_TEXT("qos not updated. \n")),
00897 DDS::RETCODE_ERROR);
00898 }
00899 }
00900
00901 qos_ = qos;
00902 subqos_ = subscriber_qos;
00903
00904 return DDS::RETCODE_OK;
00905
00906 } else {
00907 return DDS::RETCODE_INCONSISTENT_POLICY;
00908 }
00909 }
00910
00911 DDS::ReturnCode_t
00912 RecorderImpl::get_qos(
00913 DDS::SubscriberQos & subscriber_qos,
00914 DDS::DataReaderQos & qos)
00915 {
00916 qos = qos_;
00917 subscriber_qos = subqos_;
00918 return DDS::RETCODE_OK;
00919 }
00920
00921 DDS::ReturnCode_t
00922 RecorderImpl::set_listener(const RecorderListener_rch& a_listener,
00923 DDS::StatusMask mask)
00924 {
00925 listener_mask_ = mask;
00926
00927 listener_ = a_listener;
00928 return DDS::RETCODE_OK;
00929 }
00930
00931 RecorderListener_rch
00932 RecorderImpl::get_listener()
00933 {
00934 return listener_;
00935 }
00936
00937 bool
00938 RecorderImpl::lookup_instance_handles(const WriterIdSeq& ids,
00939 DDS::InstanceHandleSeq & hdls)
00940 {
00941 if (DCPS_debug_level > 9) {
00942 CORBA::ULong const size = ids.length();
00943 OPENDDS_STRING separator = "";
00944 OPENDDS_STRING buffer;
00945
00946 for (unsigned long i = 0; i < size; ++i) {
00947 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
00948 separator = ", ";
00949 }
00950
00951 ACE_DEBUG((LM_DEBUG,
00952 ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
00953 ACE_TEXT("searching for handles for writer Ids: %C.\n"),
00954 buffer.c_str()));
00955 }
00956
00957 CORBA::ULong const num_wrts = ids.length();
00958 hdls.length(num_wrts);
00959
00960 for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00961 hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
00962 }
00963
00964 return true;
00965 }
00966
00967 DDS::ReturnCode_t
00968 RecorderImpl::enable()
00969 {
00970 if (DCPS_debug_level >= 1) {
00971
00972 ACE_DEBUG((LM_DEBUG,
00973 ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
00974 }
00975
00976
00977
00978
00979
00980
00981 if (this->is_enabled()) {
00982 return DDS::RETCODE_OK;
00983 }
00984
00985 this->set_enabled();
00986
00987
00988 if (topic_servant_) {
00989
00990 ACE_DEBUG((LM_DEBUG,
00991 ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n")));
00992
00993 try {
00994 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
00995 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00996 } catch (const Transport::Exception&) {
00997 ACE_ERROR((LM_ERROR,
00998 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
00999 ACE_TEXT("Transport Exception.\n")));
01000 return DDS::RETCODE_ERROR;
01001
01002 }
01003
01004 const TransportLocatorSeq& trans_conf_info = this->connection_info();
01005
01006 CORBA::String_var filterClassName = "";
01007 CORBA::String_var filterExpression = "";
01008 DDS::StringSeq exprParams;
01009
01010
01011 Discovery_rch disco =
01012 TheServiceParticipant->get_discovery(this->domain_id_);
01013
01014 ACE_DEBUG((LM_DEBUG,
01015 ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n")));
01016
01017 this->subscription_id_ =
01018 disco->add_subscription(this->domain_id_,
01019 this->participant_servant_->get_id(),
01020 this->topic_servant_->get_id(),
01021 this,
01022 this->qos_,
01023 trans_conf_info,
01024 this->subqos_,
01025 filterClassName,
01026 filterExpression,
01027 exprParams);
01028
01029 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01030 ACE_ERROR((LM_ERROR,
01031 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
01032 ACE_TEXT("add_subscription returned invalid id.\n")));
01033 return DDS::RETCODE_ERROR;
01034 }
01035 }
01036
01037 if (topic_servant_) {
01038 const CORBA::String_var name = topic_servant_->get_name();
01039 DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
01040
01041
01042 return return_value;
01043 } else {
01044 return DDS::RETCODE_OK;
01045 }
01046 }
01047
01048 DDS::InstanceHandle_t
01049 RecorderImpl::get_instance_handle()
01050 {
01051 return this->participant_servant_->id_to_handle(subscription_id_);
01052 }
01053
01054 void
01055 RecorderImpl::register_for_writer(const RepoId& participant,
01056 const RepoId& readerid,
01057 const RepoId& writerid,
01058 const TransportLocatorSeq& locators,
01059 DiscoveryListener* listener)
01060 {
01061 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
01062 }
01063
01064 void
01065 RecorderImpl::unregister_for_writer(const RepoId& participant,
01066 const RepoId& readerid,
01067 const RepoId& writerid)
01068 {
01069 TransportClient::unregister_for_writer(participant, readerid, writerid);
01070 }
01071
01072 #if !defined (DDS_HAS_MINIMUM_BIT)
01073 DDS::ReturnCode_t
01074 RecorderImpl::repoid_to_bit_key(const DCPS::RepoId& id,
01075 DDS::BuiltinTopicKey_t& key)
01076 {
01077 DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id);
01078
01079 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01080 guard,
01081 this->publication_handle_lock_,
01082 DDS::RETCODE_ERROR);
01083
01084 BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader,
01085 DDS::PublicationBuiltinTopicDataDataReader_var,
01086 DDS::PublicationBuiltinTopicDataSeq > hh;
01087
01088 DDS::PublicationBuiltinTopicDataSeq data;
01089
01090 DDS::ReturnCode_t ret
01091 = hh.instance_handle_to_bit_data(participant_servant_,
01092 BUILT_IN_PUBLICATION_TOPIC,
01093 publication_handle,
01094 data);
01095
01096 if (ret == DDS::RETCODE_OK) {
01097 key = data[0].key;
01098 }
01099
01100 return ret;
01101 }
01102 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01103
01104 }
01105 }