00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ReplayerImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "SendStateDataSampleList.h"
00018 #include "DataSampleElement.h"
00019 #include "Serializer.h"
00020 #include "Transient_Kludge.h"
00021 #include "DataDurabilityCache.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "MonitorFactory.h"
00024 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00025 #include "CoherentChangeControl.h"
00026 #endif
00027 #include "AssociationData.h"
00028
00029 #if !defined (DDS_HAS_MINIMUM_BIT)
00030 #include "BuiltInTopicUtils.h"
00031 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00032
00033 #include "Util.h"
00034
00035 #include "dds/DCPS/transport/framework/EntryExit.h"
00036 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00037 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00038 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00039
00040 #include "ace/Reactor.h"
00041 #include "ace/Auto_Ptr.h"
00042
00043 #include <stdexcept>
00044
00045 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00046
00047 namespace OpenDDS {
00048 namespace DCPS {
00049
00050
00051 ReplayerImpl::ReplayerImpl()
00052 : data_dropped_count_(0),
00053 data_delivered_count_(0),
00054 n_chunks_(TheServiceParticipant->n_chunks()),
00055 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00056 qos_(TheServiceParticipant->initial_DataWriterQos()),
00057 participant_servant_(0),
00058 topic_id_(GUID_UNKNOWN),
00059 topic_servant_(0),
00060 listener_mask_(DEFAULT_STATUS_MASK),
00061 domain_id_(0),
00062 publisher_servant_(0),
00063 publication_id_(GUID_UNKNOWN),
00064 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00065
00066
00067
00068 is_bit_(false),
00069 empty_condition_(lock_),
00070 pending_write_count_(0)
00071 {
00072
00073
00074
00075
00076
00077
00078
00079 offered_incompatible_qos_status_.total_count = 0;
00080 offered_incompatible_qos_status_.total_count_change = 0;
00081 offered_incompatible_qos_status_.last_policy_id = 0;
00082 offered_incompatible_qos_status_.policies.length(0);
00083
00084 publication_match_status_.total_count = 0;
00085 publication_match_status_.total_count_change = 0;
00086 publication_match_status_.current_count = 0;
00087 publication_match_status_.current_count_change = 0;
00088 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00089
00090 }
00091
00092
00093
00094 ReplayerImpl::~ReplayerImpl()
00095 {
00096 DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
00097 }
00098
00099
00100 DDS::ReturnCode_t
00101 ReplayerImpl::cleanup()
00102 {
00103
00104
00105
00106
00107
00108
00109 {
00110 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
00111
00112
00113
00114 while (this->pending_write_count_)
00115 this->empty_condition_.wait();
00116
00117
00118
00119
00120 this->remove_all_associations();
00121
00122
00123 topic_objref_ = DDS::Topic::_nil();
00124 topic_servant_ = 0;
00125
00126 }
00127
00128
00129
00130
00131 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00132 if (!disco->remove_publication(
00133 this->domain_id_,
00134 this->participant_servant_->get_id(),
00135 this->publication_id_)) {
00136 ACE_ERROR_RETURN((LM_ERROR,
00137 ACE_TEXT("(%P|%t) ERROR: ")
00138 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00139 ACE_TEXT("publication not removed from discovery.\n")),
00140 DDS::RETCODE_ERROR);
00141 }
00142 return DDS::RETCODE_OK;
00143 }
00144
00145 void
00146 ReplayerImpl::init(
00147 DDS::Topic_ptr topic,
00148 TopicImpl * topic_servant,
00149 const DDS::DataWriterQos & qos,
00150 ReplayerListener_rch a_listener,
00151 const DDS::StatusMask & mask,
00152 OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00153 const DDS::PublisherQos& publisher_qos)
00154 {
00155 DBG_ENTRY_LVL("ReplayerImpl","init",6);
00156 topic_objref_ = DDS::Topic::_duplicate(topic);
00157 topic_servant_ = topic_servant;
00158 topic_name_ = topic_servant_->get_name();
00159 topic_id_ = topic_servant_->get_id();
00160 type_name_ = topic_servant_->get_type_name();
00161
00162 #if !defined (DDS_HAS_MINIMUM_BIT)
00163 is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
00164 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00165
00166 qos_ = qos;
00167
00168
00169 listener_ = a_listener;
00170 listener_mask_ = mask;
00171
00172
00173
00174 participant_servant_ = participant_servant;
00175 domain_id_ = participant_servant_->get_domain_id();
00176
00177 publisher_qos_ = publisher_qos;
00178 }
00179
00180
00181 DDS::ReturnCode_t ReplayerImpl::set_qos (const DDS::PublisherQos & publisher_qos,
00182 const DDS::DataWriterQos & qos)
00183 {
00184
00185 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
00186
00187 if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
00188 if (publisher_qos_ == publisher_qos)
00189 return DDS::RETCODE_OK;
00190
00191
00192 if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) {
00193 return DDS::RETCODE_IMMUTABLE_POLICY;
00194
00195 } else {
00196 publisher_qos_ = publisher_qos;
00197 }
00198 } else {
00199 return DDS::RETCODE_INCONSISTENT_POLICY;
00200 }
00201
00202 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00203 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00204 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00205 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00206 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00207
00208 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00209 if (qos_ == qos)
00210 return DDS::RETCODE_OK;
00211
00212 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00213 return DDS::RETCODE_IMMUTABLE_POLICY;
00214
00215 } else {
00216 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00217
00218
00219 DDS::PublisherQos publisherQos = this->publisher_qos_;
00220 const bool status
00221 = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00222 this->participant_servant_->get_id(),
00223 this->publication_id_,
00224 qos,
00225 publisherQos);
00226
00227 if (!status) {
00228 ACE_ERROR_RETURN((LM_ERROR,
00229 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00230 ACE_TEXT("qos not updated. \n")),
00231 DDS::RETCODE_ERROR);
00232 }
00233 }
00234
00235 if (!(qos_ == qos)) {
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262 qos_ = qos;
00263 }
00264
00265 return DDS::RETCODE_OK;
00266
00267 } else {
00268 return DDS::RETCODE_INCONSISTENT_POLICY;
00269 }
00270 }
00271
00272 DDS::ReturnCode_t ReplayerImpl::get_qos (DDS::PublisherQos & publisher_qos,
00273 DDS::DataWriterQos & qos)
00274 {
00275 qos = qos_;
00276 publisher_qos = publisher_qos_;
00277 return DDS::RETCODE_OK;
00278 }
00279
00280
00281 DDS::ReturnCode_t ReplayerImpl::set_listener (const ReplayerListener_rch & a_listener,
00282 DDS::StatusMask mask)
00283 {
00284 listener_ = a_listener;
00285 listener_mask_ = mask;
00286 return DDS::RETCODE_OK;
00287 }
00288
00289 ReplayerListener_rch ReplayerImpl::get_listener ()
00290 {
00291 return listener_;
00292 }
00293
00294 DDS::ReturnCode_t
00295 ReplayerImpl::enable()
00296 {
00297
00298
00299
00300
00301
00302
00303 if (this->is_enabled()) {
00304 return DDS::RETCODE_OK;
00305 }
00306
00307
00308
00309
00310
00311 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
00312
00313 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
00314 n_chunks_ = qos_.resource_limits.max_samples;
00315 }
00316
00317
00318 mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
00319 db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
00320 header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
00321
00322 sample_list_element_allocator_.reset(new DataSampleElementAllocator(2 * n_chunks_));
00323
00324
00325 if (DCPS_debug_level >= 2) {
00326 ACE_DEBUG((LM_DEBUG,
00327 "(%P|%t) ReplayerImpl::enable-mb"
00328 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00329 mb_allocator_.get(),
00330 n_chunks_));
00331
00332 ACE_DEBUG((LM_DEBUG,
00333 "(%P|%t) ReplayerImpl::enable-db"
00334 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00335 db_allocator_.get(),
00336 n_chunks_));
00337
00338 ACE_DEBUG((LM_DEBUG,
00339 "(%P|%t) ReplayerImpl::enable-header"
00340 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00341 header_allocator_.get(),
00342 n_chunks_));
00343 }
00344
00345 this->set_enabled();
00346
00347 try {
00348 this->enable_transport(reliable,
00349 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00350
00351 } catch (const Transport::Exception&) {
00352 ACE_ERROR((LM_ERROR,
00353 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00354 ACE_TEXT("Transport Exception.\n")));
00355 return DDS::RETCODE_ERROR;
00356
00357 }
00358
00359 const TransportLocatorSeq& trans_conf_info = connection_info();
00360
00361
00362 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00363 this->publication_id_ =
00364 disco->add_publication(this->domain_id_,
00365 this->participant_servant_->get_id(),
00366 this->topic_servant_->get_id(),
00367 this,
00368 this->qos_,
00369 trans_conf_info,
00370 this->publisher_qos_);
00371
00372 if (this->publication_id_ == GUID_UNKNOWN) {
00373 ACE_ERROR((LM_ERROR,
00374 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00375 ACE_TEXT("add_publication returned invalid id. \n")));
00376 return DDS::RETCODE_ERROR;
00377 }
00378
00379 return DDS::RETCODE_OK;
00380 }
00381
00382
00383
00384 void
00385 ReplayerImpl::add_association(const RepoId& yourId,
00386 const ReaderAssociation& reader,
00387 bool active)
00388 {
00389 DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
00390
00391 if (DCPS_debug_level >= 1) {
00392 GuidConverter writer_converter(yourId);
00393 GuidConverter reader_converter(reader.readerId);
00394 ACE_DEBUG((LM_DEBUG,
00395 ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
00396 ACE_TEXT("bit %d local %C remote %C\n"),
00397 is_bit_,
00398 OPENDDS_STRING(writer_converter).c_str(),
00399 OPENDDS_STRING(reader_converter).c_str()));
00400 }
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411 if (GUID_UNKNOWN == publication_id_) {
00412 publication_id_ = yourId;
00413 }
00414
00415 {
00416 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00417 reader_info_.insert(std::make_pair(reader.readerId,
00418 ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00419 reader.exprParams, participant_servant_,
00420 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00421 }
00422
00423 if (DCPS_debug_level > 4) {
00424 GuidConverter converter(publication_id_);
00425 ACE_DEBUG((LM_DEBUG,
00426 ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
00427 ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00428 OPENDDS_STRING(converter).c_str(),
00429 qos_.transport_priority.value));
00430 }
00431
00432 AssociationData data;
00433 data.remote_id_ = reader.readerId;
00434 data.remote_data_ = reader.readerTransInfo;
00435 data.remote_reliable_ =
00436 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00437 data.remote_durable_ =
00438 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00439
00440 if (!this->associate(data, active)) {
00441
00442 if (DCPS_debug_level) {
00443 ACE_ERROR((LM_ERROR,
00444 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00445 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00446 }
00447 return;
00448 }
00449
00450 if (active) {
00451 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00452
00453
00454 if (assoc_complete_readers_.count(reader.readerId)) {
00455 assoc_complete_readers_.erase(reader.readerId);
00456 association_complete_i(reader.readerId);
00457
00458
00459
00460 } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) {
00461 GuidConverter converter(reader.readerId);
00462 ACE_ERROR((LM_ERROR,
00463 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ")
00464 ACE_TEXT("failed to mark %C as pending.\n"),
00465 OPENDDS_STRING(converter).c_str()));
00466
00467 } else {
00468 if (DCPS_debug_level > 0) {
00469 GuidConverter converter(reader.readerId);
00470 ACE_DEBUG((LM_DEBUG,
00471 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00472 ACE_TEXT("marked %C as pending.\n"),
00473 OPENDDS_STRING(converter).c_str()));
00474 }
00475 }
00476 } else {
00477
00478
00479 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00480 disco->association_complete(this->domain_id_,
00481 this->participant_servant_->get_id(),
00482 this->publication_id_, reader.readerId);
00483 }
00484 }
00485
00486
00487 ReplayerImpl::ReaderInfo::ReaderInfo(const char* filter,
00488 const DDS::StringSeq& params,
00489 DomainParticipantImpl* participant,
00490 bool durable)
00491 : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00492 , durable_(durable)
00493 {
00494 ACE_UNUSED_ARG(filter);
00495 ACE_UNUSED_ARG(params);
00496 ACE_UNUSED_ARG(participant);
00497 }
00498
00499
00500 ReplayerImpl::ReaderInfo::~ReaderInfo()
00501 {
00502 }
00503
00504
00505 void
00506 ReplayerImpl::association_complete(const RepoId& remote_id)
00507 {
00508 DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6);
00509
00510 if (DCPS_debug_level >= 1) {
00511 GuidConverter writer_converter(this->publication_id_);
00512 GuidConverter reader_converter(remote_id);
00513 ACE_DEBUG((LM_DEBUG,
00514 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ")
00515 ACE_TEXT("bit %d local %C remote %C\n"),
00516 is_bit_,
00517 OPENDDS_STRING(writer_converter).c_str(),
00518 OPENDDS_STRING(reader_converter).c_str()));
00519 }
00520
00521 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00522 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00523
00524
00525 assoc_complete_readers_.insert(remote_id);
00526 } else {
00527 association_complete_i(remote_id);
00528 }
00529 }
00530
00531 void
00532 ReplayerImpl::association_complete_i(const RepoId& remote_id)
00533 {
00534 DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
00535
00536 {
00537 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00538 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00539 GuidConverter converter(remote_id);
00540 ACE_ERROR((LM_ERROR,
00541 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00542 ACE_TEXT("insert %C from pending failed.\n"),
00543 OPENDDS_STRING(converter).c_str()));
00544 }
00545
00546
00547
00548
00549 }
00550
00551 if (!is_bit_) {
00552
00553 DDS::InstanceHandle_t handle =
00554 this->participant_servant_->id_to_handle(remote_id);
00555
00556 {
00557
00558 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00559
00560
00561 ++publication_match_status_.total_count;
00562 ++publication_match_status_.total_count_change;
00563 ++publication_match_status_.current_count;
00564 ++publication_match_status_.current_count_change;
00565
00566 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00567 GuidConverter converter(remote_id);
00568 ACE_DEBUG((LM_WARNING,
00569 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00570 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00571 OPENDDS_STRING(converter).c_str(),
00572 handle));
00573 return;
00574
00575 } else if (DCPS_debug_level > 4) {
00576 GuidConverter converter(remote_id);
00577 ACE_DEBUG((LM_DEBUG,
00578 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
00579 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00580 OPENDDS_STRING(converter).c_str(),
00581 handle));
00582 }
00583
00584 publication_match_status_.last_subscription_handle = handle;
00585
00586 }
00587
00588
00589 if (listener_.in()) {
00590 listener_->on_replayer_matched(this,
00591 publication_match_status_);
00592
00593
00594
00595 publication_match_status_.total_count_change = 0;
00596 publication_match_status_.current_count_change = 0;
00597 }
00598
00599 }
00600
00601 }
00602
00603 void
00604 ReplayerImpl::remove_associations(const ReaderIdSeq & readers,
00605 CORBA::Boolean notify_lost)
00606 {
00607 if (DCPS_debug_level >= 1) {
00608 GuidConverter writer_converter(publication_id_);
00609 GuidConverter reader_converter(readers[0]);
00610 ACE_DEBUG((LM_DEBUG,
00611 ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
00612 ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00613 is_bit_,
00614 OPENDDS_STRING(writer_converter).c_str(),
00615 OPENDDS_STRING(reader_converter).c_str(),
00616 readers.length()));
00617 }
00618
00619 this->stop_associating(readers.get_buffer(), readers.length());
00620
00621 ReaderIdSeq fully_associated_readers;
00622 CORBA::ULong fully_associated_len = 0;
00623 ReaderIdSeq rds;
00624 CORBA::ULong rds_len = 0;
00625 DDS::InstanceHandleSeq handles;
00626
00627 {
00628
00629
00630 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00631
00632
00633
00634
00635
00636
00637 CORBA::ULong len = readers.length();
00638
00639 for (CORBA::ULong i = 0; i < len; ++i) {
00640
00641
00642
00643
00644 if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00645 ++fully_associated_len;
00646 fully_associated_readers.length(fully_associated_len);
00647 fully_associated_readers [fully_associated_len - 1] = readers[i];
00648
00649
00650
00651
00652 RepoIdToSequenceMap::iterator where
00653 = this->idToSequence_.find(readers[i]);
00654
00655 if (where != this->idToSequence_.end()) {
00656 this->idToSequence_.erase(where);
00657
00658
00659
00660
00661 }
00662
00663 ++rds_len;
00664 rds.length(rds_len);
00665 rds [rds_len - 1] = readers[i];
00666
00667 } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00668 ++rds_len;
00669 rds.length(rds_len);
00670 rds [rds_len - 1] = readers[i];
00671
00672 GuidConverter converter(readers[i]);
00673 ACE_DEBUG((LM_WARNING,
00674 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ")
00675 ACE_TEXT("removing reader %C before association_complete() call.\n"),
00676 OPENDDS_STRING(converter).c_str()));
00677 }
00678 reader_info_.erase(readers[i]);
00679
00680
00681 }
00682
00683 if (fully_associated_len > 0 && !is_bit_) {
00684
00685 this->lookup_instance_handles(fully_associated_readers, handles);
00686
00687 for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00688 id_to_handle_map_.erase(fully_associated_readers[i]);
00689 }
00690 }
00691
00692
00693
00694
00695
00696 if (!this->is_bit_) {
00697
00698
00699 int matchedSubscriptions =
00700 static_cast<int>(this->id_to_handle_map_.size());
00701 this->publication_match_status_.current_count_change =
00702 matchedSubscriptions - this->publication_match_status_.current_count;
00703
00704
00705 if (this->publication_match_status_.current_count_change != 0) {
00706 this->publication_match_status_.current_count = matchedSubscriptions;
00707
00708
00709
00710
00711
00712 this->publication_match_status_.last_subscription_handle =
00713 handles[rds_len - 1];
00714
00715
00716 if (listener_.in()) {
00717 listener_->on_replayer_matched(
00718 this,
00719 this->publication_match_status_);
00720
00721
00722 this->publication_match_status_.total_count_change = 0;
00723 this->publication_match_status_.current_count_change = 0;
00724 }
00725
00726 }
00727 }
00728 }
00729
00730 for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00731 this->disassociate(rds[i]);
00732 }
00733
00734
00735
00736
00737 if (notify_lost && handles.length() > 0) {
00738 this->notify_publication_lost(handles);
00739 }
00740 }
00741
00742 void ReplayerImpl::remove_all_associations()
00743 {
00744 this->stop_associating();
00745
00746 OpenDDS::DCPS::ReaderIdSeq readers;
00747 CORBA::ULong size;
00748 CORBA::ULong num_pending_readers;
00749 {
00750 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00751
00752 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00753 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00754 readers.length(size);
00755
00756 RepoIdSet::iterator itEnd = readers_.end();
00757 int i = 0;
00758
00759 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00760 readers[i++] = *it;
00761 }
00762
00763 itEnd = pending_readers_.end();
00764 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00765 readers[i++] = *it;
00766 }
00767
00768 if (num_pending_readers > 0) {
00769 ACE_DEBUG((LM_WARNING,
00770 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ")
00771 ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00772 num_pending_readers));
00773 }
00774 }
00775
00776 try {
00777 if (0 < size) {
00778 CORBA::Boolean dont_notify_lost = false;
00779 this->remove_associations(readers, dont_notify_lost);
00780 }
00781
00782 } catch (const CORBA::Exception&) {
00783 }
00784 }
00785
00786 void
00787 ReplayerImpl::register_for_reader(const RepoId& participant,
00788 const RepoId& writerid,
00789 const RepoId& readerid,
00790 const TransportLocatorSeq& locators,
00791 DiscoveryListener* listener)
00792 {
00793 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00794 }
00795
00796 void
00797 ReplayerImpl::unregister_for_reader(const RepoId& participant,
00798 const RepoId& writerid,
00799 const RepoId& readerid)
00800 {
00801 TransportClient::unregister_for_reader(participant, writerid, readerid);
00802 }
00803
00804 void
00805 ReplayerImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00806 {
00807
00808
00809 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00810
00811
00812 offered_incompatible_qos_status_.total_count = status.total_count;
00813 offered_incompatible_qos_status_.total_count_change +=
00814 status.count_since_last_send;
00815 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00816 offered_incompatible_qos_status_.policies = status.policies;
00817
00818 }
00819
00820 void
00821 ReplayerImpl::update_subscription_params(const RepoId& readerId,
00822 const DDS::StringSeq& params)
00823 {
00824 ACE_UNUSED_ARG(readerId);
00825 ACE_UNUSED_ARG(params);
00826 }
00827
00828 void
00829 ReplayerImpl::inconsistent_topic()
00830 {
00831 topic_servant_->inconsistent_topic();
00832 }
00833
00834 bool
00835 ReplayerImpl::check_transport_qos(const TransportInst&)
00836 {
00837
00838
00839 return true;
00840 }
00841
00842 const RepoId&
00843 ReplayerImpl::get_repo_id() const
00844 {
00845 return this->publication_id_;
00846 }
00847
00848 CORBA::Long
00849 ReplayerImpl::get_priority_value(const AssociationData&) const
00850 {
00851 return this->qos_.transport_priority.value;
00852 }
00853
00854 void
00855 ReplayerImpl::data_delivered(const DataSampleElement* sample)
00856 {
00857 DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
00858 if (!(sample->get_pub_id() == this->publication_id_)) {
00859 GuidConverter sample_converter(sample->get_pub_id());
00860 GuidConverter writer_converter(publication_id_);
00861 ACE_ERROR((LM_ERROR,
00862 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
00863 ACE_TEXT(" The publication id %C from delivered element ")
00864 ACE_TEXT("does not match the datawriter's id %C\n"),
00865 OPENDDS_STRING(sample_converter).c_str(),
00866 OPENDDS_STRING(writer_converter).c_str()));
00867 return;
00868 }
00869 DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00870
00871 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00872 ++data_delivered_count_;
00873
00874 {
00875 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00876 if ((--pending_write_count_) == 0) {
00877 empty_condition_.broadcast();
00878 }
00879 }
00880 }
00881
00882 void
00883 ReplayerImpl::control_delivered(const Message_Block_Ptr& sample)
00884 {
00885 ACE_UNUSED_ARG(sample);
00886 }
00887
00888 void
00889 ReplayerImpl::data_dropped(const DataSampleElement* sample,
00890 bool dropped_by_transport)
00891 {
00892 DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
00893
00894 ACE_UNUSED_ARG(dropped_by_transport);
00895 DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00896 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00897 ++data_dropped_count_;
00898 {
00899 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00900 if ((--pending_write_count_) == 0) {
00901 empty_condition_.broadcast();
00902 }
00903 }
00904 }
00905
00906 void
00907 ReplayerImpl::control_dropped(const Message_Block_Ptr& sample,
00908 bool )
00909 {
00910 ACE_UNUSED_ARG(sample);
00911 }
00912
00913 void
00914 ReplayerImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
00915 {
00916 ACE_UNUSED_ARG(subids);
00917 }
00918
00919 void
00920 ReplayerImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
00921 {
00922 ACE_UNUSED_ARG(subids);
00923 }
00924
00925 void
00926 ReplayerImpl::notify_publication_lost(const ReaderIdSeq& subids)
00927 {
00928 ACE_UNUSED_ARG(subids);
00929 }
00930
00931 void
00932 ReplayerImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
00933 {
00934 ACE_UNUSED_ARG(handles);
00935 }
00936
00937
00938 void
00939 ReplayerImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
00940 {
00941 qos_data.pub_qos = this->publisher_qos_;
00942 qos_data.dw_qos = this->qos_;
00943 qos_data.topic_name = this->topic_name_.in();
00944 }
00945
00946 DDS::ReturnCode_t
00947 ReplayerImpl::write (const RawDataSample* samples,
00948 int num_samples,
00949 DDS::InstanceHandle_t* reader_ih_ptr)
00950 {
00951 DBG_ENTRY_LVL("ReplayerImpl","write",6);
00952
00953 OpenDDS::DCPS::RepoId repo_id;
00954 if (reader_ih_ptr) {
00955 repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
00956 if (repo_id == GUID_UNKNOWN) {
00957 ACE_ERROR_RETURN((LM_ERROR,
00958 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
00959 ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
00960 DDS::RETCODE_ERROR);
00961 }
00962 }
00963
00964 SendStateDataSampleList list;
00965
00966 for (int i = 0; i < num_samples; ++i) {
00967 DataSampleElement* element = 0;
00968
00969 ACE_NEW_MALLOC_RETURN(
00970 element,
00971 static_cast<DataSampleElement*>(
00972 sample_list_element_allocator_->malloc(
00973 sizeof(DataSampleElement))),
00974 DataSampleElement(publication_id_,
00975 this,
00976 PublicationInstance_rch()),
00977 DDS::RETCODE_ERROR);
00978
00979 element->get_header().byte_order_ = samples[i].sample_byte_order_;
00980 element->get_header().publication_id_ = this->publication_id_;
00981 list.enqueue_tail(element);
00982 Message_Block_Ptr temp;
00983 Message_Block_Ptr sample(samples[i].sample_->duplicate());
00984 DDS::ReturnCode_t ret = create_sample_data_message(move(sample),
00985 element->get_header(),
00986 temp,
00987 samples[i].source_timestamp_,
00988 false);
00989 element->set_sample(move(temp));
00990 if (reader_ih_ptr) {
00991 element->set_num_subs(1);
00992 element->set_sub_id(0, repo_id);
00993 }
00994
00995 if (ret != DDS::RETCODE_OK) {
00996
00997 while (list.dequeue(element)) {
00998 ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
00999 }
01000
01001 return ret;
01002 }
01003 }
01004
01005 {
01006 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
01007 ++pending_write_count_;
01008 }
01009
01010 this->send(list);
01011
01012 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01013 end = reader_info_.end(); iter != end; ++iter) {
01014 iter->second.expected_sequence_ = sequence_number_;
01015 }
01016
01017 return DDS::RETCODE_OK;
01018 }
01019
01020 DDS::ReturnCode_t
01021 ReplayerImpl::write(const RawDataSample& sample)
01022 {
01023 return this->write(&sample, 1, 0);
01024 }
01025
01026 DDS::ReturnCode_t
01027 ReplayerImpl::create_sample_data_message(Message_Block_Ptr data,
01028 DataSampleHeader& header_data,
01029 Message_Block_Ptr& message,
01030 const DDS::Time_t& source_timestamp,
01031 bool content_filter)
01032 {
01033 header_data.message_id_ = SAMPLE_DATA;
01034 header_data.coherent_change_ = content_filter;
01035
01036 header_data.content_filter_ = 0;
01037 header_data.cdr_encapsulation_ = this->cdr_encapsulation();
01038 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01039 header_data.sequence_repair_ = need_sequence_repair();
01040 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01041 this->sequence_number_ = SequenceNumber();
01042 } else {
01043 ++this->sequence_number_;
01044 }
01045 header_data.sequence_ = this->sequence_number_;
01046 header_data.source_timestamp_sec_ = source_timestamp.sec;
01047 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01048
01049 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
01050 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01051 header_data.lifespan_duration_ = true;
01052 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
01053 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
01054 }
01055
01056
01057
01058 size_t max_marshaled_size = header_data.max_marshaled_size();
01059 ACE_Message_Block* tmp;
01060 ACE_NEW_MALLOC_RETURN(tmp,
01061 static_cast<ACE_Message_Block*>(
01062 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01063 ACE_Message_Block(max_marshaled_size,
01064 ACE_Message_Block::MB_DATA,
01065 data.release(),
01066 0,
01067 header_allocator_.get(),
01068 0,
01069 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01070 ACE_Time_Value::zero,
01071 ACE_Time_Value::max_time,
01072 db_allocator_.get(),
01073 mb_allocator_.get()),
01074 DDS::RETCODE_ERROR);
01075 message.reset(tmp);
01076 *message << header_data;
01077 return DDS::RETCODE_OK;
01078 }
01079
01080 void
01081 ReplayerImpl::lookup_instance_handles(const ReaderIdSeq& ids,
01082 DDS::InstanceHandleSeq & hdls)
01083 {
01084 CORBA::ULong const num_rds = ids.length();
01085
01086 if (DCPS_debug_level > 9) {
01087 OPENDDS_STRING separator;
01088 OPENDDS_STRING buffer;
01089
01090 for (CORBA::ULong i = 0; i < num_rds; ++i) {
01091 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
01092 separator = ", ";
01093 }
01094
01095 ACE_DEBUG((LM_DEBUG,
01096 ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ")
01097 ACE_TEXT("searching for handles for reader Ids: %C.\n"),
01098 buffer.c_str()));
01099 }
01100
01101 hdls.length(num_rds);
01102
01103 for (CORBA::ULong i = 0; i < num_rds; ++i) {
01104 hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
01105 }
01106 }
01107
01108 bool
01109 ReplayerImpl::need_sequence_repair() const
01110 {
01111 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
01112 end = reader_info_.end(); it != end; ++it) {
01113 if (it->second.expected_sequence_ != sequence_number_) {
01114 return true;
01115 }
01116 }
01117 return false;
01118 }
01119
01120 DDS::InstanceHandle_t
01121 ReplayerImpl::get_instance_handle()
01122 {
01123 return this->participant_servant_->id_to_handle(publication_id_);
01124 }
01125
01126 DDS::ReturnCode_t
01127 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
01128 const RawDataSample& sample )
01129 {
01130 return write(&sample, 1, &subscription);
01131 }
01132
01133 DDS::ReturnCode_t
01134 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
01135 const RawDataSampleList& samples )
01136 {
01137 if (samples.size())
01138 return write(&samples[0], static_cast<int>(samples.size()), &subscription);
01139 return DDS::RETCODE_ERROR;
01140 }
01141
01142 }
01143 }
01144
01145 OPENDDS_END_VERSIONED_NAMESPACE_DECL