00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "SendStateDataSampleList.h"
00018 #include "DataSampleElement.h"
00019 #include "Serializer.h"
00020 #include "Transient_Kludge.h"
00021 #include "DataDurabilityCache.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "MonitorFactory.h"
00024 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00025 #include "CoherentChangeControl.h"
00026 #endif
00027 #include "AssociationData.h"
00028
00029 #if !defined (DDS_HAS_MINIMUM_BIT)
00030 #include "BuiltInTopicUtils.h"
00031 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00032
00033 #include "Util.h"
00034
00035 #include "dds/DCPS/transport/framework/EntryExit.h"
00036 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00037 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00038 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00039
00040 #include "tao/ORB_Core.h"
00041 #include "ace/Reactor.h"
00042 #include "ace/Auto_Ptr.h"
00043
00044 #include <stdexcept>
00045
00046 #include "ReplayerImpl.h"
00047
00048 namespace OpenDDS {
00049 namespace DCPS {
00050
00051
00052 ReplayerImpl::ReplayerImpl()
00053 : data_dropped_count_(0),
00054 data_delivered_count_(0),
00055 n_chunks_(TheServiceParticipant->n_chunks()),
00056 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00057 qos_(TheServiceParticipant->initial_DataWriterQos()),
00058 participant_servant_(0),
00059 topic_id_(GUID_UNKNOWN),
00060 topic_servant_(0),
00061 listener_mask_(DEFAULT_STATUS_MASK),
00062 domain_id_(0),
00063 publisher_servant_(0),
00064 publication_id_(GUID_UNKNOWN),
00065 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00066
00067
00068
00069 is_bit_(false),
00070 empty_condition_(lock_),
00071 pending_write_count_(0)
00072 {
00073
00074
00075
00076
00077
00078
00079
00080 offered_incompatible_qos_status_.total_count = 0;
00081 offered_incompatible_qos_status_.total_count_change = 0;
00082 offered_incompatible_qos_status_.last_policy_id = 0;
00083 offered_incompatible_qos_status_.policies.length(0);
00084
00085 publication_match_status_.total_count = 0;
00086 publication_match_status_.total_count_change = 0;
00087 publication_match_status_.current_count = 0;
00088 publication_match_status_.current_count_change = 0;
00089 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00090
00091 }
00092
00093
00094
00095 ReplayerImpl::~ReplayerImpl()
00096 {
00097 DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
00098 }
00099
00100
00101 DDS::ReturnCode_t
00102 ReplayerImpl::cleanup()
00103 {
00104
00105
00106
00107
00108
00109
00110 {
00111 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
00112
00113
00114
00115 while (this->pending_write_count_)
00116 this->empty_condition_.wait();
00117
00118
00119
00120
00121 this->remove_all_associations();
00122
00123
00124 topic_objref_ = DDS::Topic::_nil();
00125 topic_servant_->remove_entity_ref();
00126 topic_servant_->_remove_ref();
00127 topic_servant_ = 0;
00128
00129 }
00130
00131
00132
00133
00134 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00135 if (!disco->remove_publication(
00136 this->domain_id_,
00137 this->participant_servant_->get_id(),
00138 this->publication_id_)) {
00139 ACE_ERROR_RETURN((LM_ERROR,
00140 ACE_TEXT("(%P|%t) ERROR: ")
00141 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00142 ACE_TEXT("publication not removed from discovery.\n")),
00143 DDS::RETCODE_ERROR);
00144 }
00145 return DDS::RETCODE_OK;
00146 }
00147
00148 void
00149 ReplayerImpl::init(
00150 DDS::Topic_ptr topic,
00151 TopicImpl * topic_servant,
00152 const DDS::DataWriterQos & qos,
00153 ReplayerListener_rch a_listener,
00154 const DDS::StatusMask & mask,
00155 OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00156 const DDS::PublisherQos& publisher_qos)
00157 {
00158 DBG_ENTRY_LVL("ReplayerImpl","init",6);
00159 topic_objref_ = DDS::Topic::_duplicate(topic);
00160 topic_servant_ = topic_servant;
00161 topic_servant_->_add_ref();
00162 topic_servant_->add_entity_ref();
00163 topic_name_ = topic_servant_->get_name();
00164 topic_id_ = topic_servant_->get_id();
00165 type_name_ = topic_servant_->get_type_name();
00166
00167 #if !defined (DDS_HAS_MINIMUM_BIT)
00168 is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00169 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00170 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00171 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00172 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00173
00174 qos_ = qos;
00175
00176
00177 listener_ = a_listener;
00178 listener_mask_ = mask;
00179
00180
00181
00182 participant_servant_ = participant_servant;
00183 domain_id_ = participant_servant_->get_domain_id();
00184
00185 publisher_qos_ = publisher_qos;
00186 }
00187
00188
00189 DDS::ReturnCode_t ReplayerImpl::set_qos (const ::DDS::PublisherQos & publisher_qos,
00190 const DDS::DataWriterQos & qos)
00191 {
00192
00193 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
00194
00195 if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
00196 if (publisher_qos_ == publisher_qos)
00197 return DDS::RETCODE_OK;
00198
00199
00200 if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) {
00201 return DDS::RETCODE_IMMUTABLE_POLICY;
00202
00203 } else {
00204 publisher_qos_ = publisher_qos;
00205 }
00206 } else {
00207 return DDS::RETCODE_INCONSISTENT_POLICY;
00208 }
00209
00210 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00211 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00212 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00213 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00214 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00215
00216 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00217 if (qos_ == qos)
00218 return DDS::RETCODE_OK;
00219
00220 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00221 return DDS::RETCODE_IMMUTABLE_POLICY;
00222
00223 } else {
00224 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00225
00226
00227 DDS::PublisherQos publisherQos = this->publisher_qos_;
00228 const bool status
00229 = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00230 this->participant_servant_->get_id(),
00231 this->publication_id_,
00232 qos,
00233 publisherQos);
00234
00235 if (!status) {
00236 ACE_ERROR_RETURN((LM_ERROR,
00237 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00238 ACE_TEXT("qos not updated. \n")),
00239 DDS::RETCODE_ERROR);
00240 }
00241 }
00242
00243 if (!(qos_ == qos)) {
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270 qos_ = qos;
00271 }
00272
00273 return DDS::RETCODE_OK;
00274
00275 } else {
00276 return DDS::RETCODE_INCONSISTENT_POLICY;
00277 }
00278 }
00279
00280 DDS::ReturnCode_t ReplayerImpl::get_qos (DDS::PublisherQos & publisher_qos,
00281 DDS::DataWriterQos & qos)
00282 {
00283 qos = qos_;
00284 publisher_qos = publisher_qos_;
00285 return DDS::RETCODE_OK;
00286 }
00287
00288
00289 DDS::ReturnCode_t ReplayerImpl::set_listener (const ReplayerListener_rch & a_listener,
00290 DDS::StatusMask mask)
00291 {
00292 listener_ = a_listener;
00293 listener_mask_ = mask;
00294 return DDS::RETCODE_OK;
00295 }
00296
00297 ReplayerListener_rch ReplayerImpl::get_listener ()
00298 {
00299 return listener_;
00300 }
00301
00302 DDS::ReturnCode_t
00303 ReplayerImpl::enable()
00304 {
00305
00306
00307
00308
00309
00310
00311 if (this->is_enabled()) {
00312 return DDS::RETCODE_OK;
00313 }
00314
00315
00316
00317
00318
00319 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
00320
00321 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
00322 n_chunks_ = qos_.resource_limits.max_samples;
00323 }
00324
00325
00326 ACE_auto_ptr_reset(mb_allocator_,
00327 new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
00328 ACE_auto_ptr_reset(db_allocator_,
00329 new DataBlockAllocator(n_chunks_+1));
00330 ACE_auto_ptr_reset(header_allocator_,
00331 new DataSampleHeaderAllocator(n_chunks_+1));
00332
00333 ACE_auto_ptr_reset(sample_list_element_allocator_,
00334 new DataSampleElementAllocator(2 * n_chunks_));
00335
00336 ACE_auto_ptr_reset(transport_send_element_allocator_,
00337 new TransportSendElementAllocator(2 * n_chunks_,
00338 sizeof(TransportSendElement)));
00339 ACE_auto_ptr_reset(transport_customized_element_allocator_,
00340 new TransportCustomizedElementAllocator(2 * n_chunks_,
00341 sizeof(TransportCustomizedElement)));
00342
00343 if (DCPS_debug_level >= 2) {
00344 ACE_DEBUG((LM_DEBUG,
00345 "(%P|%t) ReplayerImpl::enable-mb"
00346 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00347 mb_allocator_.get(),
00348 n_chunks_));
00349
00350 ACE_DEBUG((LM_DEBUG,
00351 "(%P|%t) ReplayerImpl::enable-db"
00352 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00353 db_allocator_.get(),
00354 n_chunks_));
00355
00356 ACE_DEBUG((LM_DEBUG,
00357 "(%P|%t) ReplayerImpl::enable-header"
00358 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00359 header_allocator_.get(),
00360 n_chunks_));
00361 }
00362
00363 this->set_enabled();
00364
00365 try {
00366 this->enable_transport(reliable,
00367 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00368
00369 } catch (const Transport::Exception&) {
00370 ACE_ERROR((LM_ERROR,
00371 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00372 ACE_TEXT("Transport Exception.\n")));
00373 return DDS::RETCODE_ERROR;
00374
00375 }
00376
00377 const TransportLocatorSeq& trans_conf_info = connection_info();
00378
00379
00380 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00381 this->publication_id_ =
00382 disco->add_publication(this->domain_id_,
00383 this->participant_servant_->get_id(),
00384 this->topic_servant_->get_id(),
00385 this,
00386 this->qos_,
00387 trans_conf_info,
00388 this->publisher_qos_);
00389
00390 if (this->publication_id_ == GUID_UNKNOWN) {
00391 ACE_ERROR((LM_ERROR,
00392 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00393 ACE_TEXT("add_publication returned invalid id. \n")));
00394 return DDS::RETCODE_ERROR;
00395 }
00396
00397 return DDS::RETCODE_OK;
00398 }
00399
00400
00401
00402 void
00403 ReplayerImpl::add_association(const RepoId& yourId,
00404 const ReaderAssociation& reader,
00405 bool active)
00406 {
00407 DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
00408
00409 if (DCPS_debug_level >= 1) {
00410 GuidConverter writer_converter(yourId);
00411 GuidConverter reader_converter(reader.readerId);
00412 ACE_DEBUG((LM_DEBUG,
00413 ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
00414 ACE_TEXT("bit %d local %C remote %C\n"),
00415 is_bit_,
00416 OPENDDS_STRING(writer_converter).c_str(),
00417 OPENDDS_STRING(reader_converter).c_str()));
00418 }
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429 if (GUID_UNKNOWN == publication_id_) {
00430 publication_id_ = yourId;
00431 }
00432
00433 {
00434 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00435 reader_info_.insert(std::make_pair(reader.readerId,
00436 ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00437 reader.exprParams, participant_servant_,
00438 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00439 }
00440
00441 if (DCPS_debug_level > 4) {
00442 GuidConverter converter(publication_id_);
00443 ACE_DEBUG((LM_DEBUG,
00444 ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
00445 ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00446 OPENDDS_STRING(converter).c_str(),
00447 qos_.transport_priority.value));
00448 }
00449
00450 AssociationData data;
00451 data.remote_id_ = reader.readerId;
00452 data.remote_data_ = reader.readerTransInfo;
00453 data.remote_reliable_ =
00454 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00455 data.remote_durable_ =
00456 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00457
00458 if (!this->associate(data, active)) {
00459
00460 if (DCPS_debug_level) {
00461 ACE_DEBUG((LM_ERROR,
00462 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00463 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00464 }
00465 return;
00466 }
00467
00468 if (active) {
00469 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00470
00471
00472 if (assoc_complete_readers_.count(reader.readerId)) {
00473 assoc_complete_readers_.erase(reader.readerId);
00474 association_complete_i(reader.readerId);
00475
00476
00477
00478 } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) {
00479 GuidConverter converter(reader.readerId);
00480 ACE_ERROR((LM_ERROR,
00481 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ")
00482 ACE_TEXT("failed to mark %C as pending.\n"),
00483 OPENDDS_STRING(converter).c_str()));
00484
00485 } else {
00486 if (DCPS_debug_level > 0) {
00487 GuidConverter converter(reader.readerId);
00488 ACE_DEBUG((LM_DEBUG,
00489 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00490 ACE_TEXT("marked %C as pending.\n"),
00491 OPENDDS_STRING(converter).c_str()));
00492 }
00493 }
00494 } else {
00495
00496
00497 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00498 disco->association_complete(this->domain_id_,
00499 this->participant_servant_->get_id(),
00500 this->publication_id_, reader.readerId);
00501 }
00502 }
00503
00504
00505 ReplayerImpl::ReaderInfo::ReaderInfo(const char* filter,
00506 const DDS::StringSeq& params,
00507 DomainParticipantImpl* participant,
00508 bool durable)
00509 : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00510 , durable_(durable)
00511 {
00512 ACE_UNUSED_ARG(filter);
00513 ACE_UNUSED_ARG(params);
00514 ACE_UNUSED_ARG(participant);
00515 }
00516
00517
00518 ReplayerImpl::ReaderInfo::~ReaderInfo()
00519 {
00520 }
00521
00522
00523 void
00524 ReplayerImpl::association_complete(const RepoId& remote_id)
00525 {
00526 DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6);
00527
00528 if (DCPS_debug_level >= 1) {
00529 GuidConverter writer_converter(this->publication_id_);
00530 GuidConverter reader_converter(remote_id);
00531 ACE_DEBUG((LM_DEBUG,
00532 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ")
00533 ACE_TEXT("bit %d local %C remote %C\n"),
00534 is_bit_,
00535 OPENDDS_STRING(writer_converter).c_str(),
00536 OPENDDS_STRING(reader_converter).c_str()));
00537 }
00538
00539 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00540 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00541
00542
00543 assoc_complete_readers_.insert(remote_id);
00544 } else {
00545 association_complete_i(remote_id);
00546 }
00547 }
00548
00549 void
00550 ReplayerImpl::association_complete_i(const RepoId& remote_id)
00551 {
00552 DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
00553
00554 {
00555 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00556 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00557 GuidConverter converter(remote_id);
00558 ACE_ERROR((LM_ERROR,
00559 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00560 ACE_TEXT("insert %C from pending failed.\n"),
00561 OPENDDS_STRING(converter).c_str()));
00562 }
00563
00564
00565
00566
00567 }
00568
00569 if (!is_bit_) {
00570
00571 DDS::InstanceHandle_t handle =
00572 this->participant_servant_->id_to_handle(remote_id);
00573
00574 {
00575
00576 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00577
00578
00579 ++publication_match_status_.total_count;
00580 ++publication_match_status_.total_count_change;
00581 ++publication_match_status_.current_count;
00582 ++publication_match_status_.current_count_change;
00583
00584 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00585 GuidConverter converter(remote_id);
00586 ACE_DEBUG((LM_WARNING,
00587 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00588 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00589 OPENDDS_STRING(converter).c_str(),
00590 handle));
00591 return;
00592
00593 } else if (DCPS_debug_level > 4) {
00594 GuidConverter converter(remote_id);
00595 ACE_DEBUG((LM_DEBUG,
00596 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
00597 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00598 OPENDDS_STRING(converter).c_str(),
00599 handle));
00600 }
00601
00602 publication_match_status_.last_subscription_handle = handle;
00603
00604 }
00605
00606
00607 if (listener_.in()) {
00608 listener_->on_replayer_matched(this,
00609 publication_match_status_);
00610
00611
00612
00613 publication_match_status_.total_count_change = 0;
00614 publication_match_status_.current_count_change = 0;
00615 }
00616
00617 }
00618
00619 }
00620
00621 void
00622 ReplayerImpl::remove_associations(const ReaderIdSeq & readers,
00623 CORBA::Boolean notify_lost)
00624 {
00625 if (DCPS_debug_level >= 1) {
00626 GuidConverter writer_converter(publication_id_);
00627 GuidConverter reader_converter(readers[0]);
00628 ACE_DEBUG((LM_DEBUG,
00629 ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
00630 ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00631 is_bit_,
00632 OPENDDS_STRING(writer_converter).c_str(),
00633 OPENDDS_STRING(reader_converter).c_str(),
00634 readers.length()));
00635 }
00636
00637 this->stop_associating(readers.get_buffer(), readers.length());
00638
00639 ReaderIdSeq fully_associated_readers;
00640 CORBA::ULong fully_associated_len = 0;
00641 ReaderIdSeq rds;
00642 CORBA::ULong rds_len = 0;
00643 DDS::InstanceHandleSeq handles;
00644
00645 {
00646
00647
00648 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00649
00650
00651
00652
00653
00654
00655 CORBA::ULong len = readers.length();
00656
00657 for (CORBA::ULong i = 0; i < len; ++i) {
00658
00659
00660
00661
00662 if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00663 ++fully_associated_len;
00664 fully_associated_readers.length(fully_associated_len);
00665 fully_associated_readers [fully_associated_len - 1] = readers[i];
00666
00667
00668
00669
00670 RepoIdToSequenceMap::iterator where
00671 = this->idToSequence_.find(readers[i]);
00672
00673 if (where != this->idToSequence_.end()) {
00674 this->idToSequence_.erase(where);
00675
00676
00677
00678
00679 }
00680
00681 ++rds_len;
00682 rds.length(rds_len);
00683 rds [rds_len - 1] = readers[i];
00684
00685 } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00686 ++rds_len;
00687 rds.length(rds_len);
00688 rds [rds_len - 1] = readers[i];
00689
00690 GuidConverter converter(readers[i]);
00691 ACE_DEBUG((LM_WARNING,
00692 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ")
00693 ACE_TEXT("removing reader %C before association_complete() call.\n"),
00694 OPENDDS_STRING(converter).c_str()));
00695 }
00696 reader_info_.erase(readers[i]);
00697
00698
00699 }
00700
00701 if (fully_associated_len > 0 && !is_bit_) {
00702
00703
00704 if (this->lookup_instance_handles(fully_associated_readers, handles) == false) {
00705 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReplayerImpl::remove_associations: "
00706 "lookup_instance_handles failed, notify %d \n", notify_lost));
00707 return;
00708 }
00709
00710 for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00711 id_to_handle_map_.erase(fully_associated_readers[i]);
00712 }
00713 }
00714
00715
00716
00717
00718
00719 if (!this->is_bit_) {
00720
00721
00722 int matchedSubscriptions =
00723 static_cast<int>(this->id_to_handle_map_.size());
00724 this->publication_match_status_.current_count_change =
00725 matchedSubscriptions - this->publication_match_status_.current_count;
00726
00727
00728 if (this->publication_match_status_.current_count_change != 0) {
00729 this->publication_match_status_.current_count = matchedSubscriptions;
00730
00731
00732
00733
00734
00735 this->publication_match_status_.last_subscription_handle =
00736 handles[rds_len - 1];
00737
00738
00739 if (listener_.in()) {
00740 listener_->on_replayer_matched(
00741 this,
00742 this->publication_match_status_);
00743
00744
00745 this->publication_match_status_.total_count_change = 0;
00746 this->publication_match_status_.current_count_change = 0;
00747 }
00748
00749 }
00750 }
00751 }
00752
00753 for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00754 this->disassociate(rds[i]);
00755 }
00756
00757
00758
00759
00760 if (notify_lost && handles.length() > 0) {
00761 this->notify_publication_lost(handles);
00762 }
00763 }
00764
00765 void ReplayerImpl::remove_all_associations()
00766 {
00767 this->stop_associating();
00768
00769 OpenDDS::DCPS::ReaderIdSeq readers;
00770 CORBA::ULong size;
00771 CORBA::ULong num_pending_readers;
00772 {
00773 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00774
00775 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00776 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00777 readers.length(size);
00778
00779 RepoIdSet::iterator itEnd = readers_.end();
00780 int i = 0;
00781
00782 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00783 readers[i++] = *it;
00784 }
00785
00786 itEnd = pending_readers_.end();
00787 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00788 readers[i++] = *it;
00789 }
00790
00791 if (num_pending_readers > 0) {
00792 ACE_DEBUG((LM_WARNING,
00793 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ")
00794 ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00795 num_pending_readers));
00796 }
00797 }
00798
00799 try {
00800 if (0 < size) {
00801 CORBA::Boolean dont_notify_lost = false;
00802 this->remove_associations(readers, dont_notify_lost);
00803 }
00804
00805 } catch (const CORBA::Exception&) {
00806 }
00807 }
00808
00809 void
00810 ReplayerImpl::register_for_reader(const RepoId& participant,
00811 const RepoId& writerid,
00812 const RepoId& readerid,
00813 const TransportLocatorSeq& locators,
00814 DiscoveryListener* listener)
00815 {
00816 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00817 }
00818
00819 void
00820 ReplayerImpl::unregister_for_reader(const RepoId& participant,
00821 const RepoId& writerid,
00822 const RepoId& readerid)
00823 {
00824 TransportClient::unregister_for_reader(participant, writerid, readerid);
00825 }
00826
00827 void
00828 ReplayerImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00829 {
00830
00831
00832 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00833
00834
00835 offered_incompatible_qos_status_.total_count = status.total_count;
00836 offered_incompatible_qos_status_.total_count_change +=
00837 status.count_since_last_send;
00838 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00839 offered_incompatible_qos_status_.policies = status.policies;
00840
00841 }
00842
00843 void
00844 ReplayerImpl::update_subscription_params(const RepoId& readerId,
00845 const DDS::StringSeq& params)
00846 {
00847 ACE_UNUSED_ARG(readerId);
00848 ACE_UNUSED_ARG(params);
00849 }
00850
00851 void
00852 ReplayerImpl::inconsistent_topic()
00853 {
00854 topic_servant_->inconsistent_topic();
00855 }
00856
00857 bool
00858 ReplayerImpl::check_transport_qos(const TransportInst&)
00859 {
00860
00861
00862 return true;
00863 }
00864
00865 const RepoId&
00866 ReplayerImpl::get_repo_id() const
00867 {
00868 return this->publication_id_;
00869 }
00870
00871 CORBA::Long
00872 ReplayerImpl::get_priority_value(const AssociationData&) const
00873 {
00874 return this->qos_.transport_priority.value;
00875 }
00876
00877 void
00878 ReplayerImpl::data_delivered(const DataSampleElement* sample)
00879 {
00880 DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
00881 if (!(sample->get_pub_id() == this->publication_id_)) {
00882 GuidConverter sample_converter(sample->get_pub_id());
00883 GuidConverter writer_converter(publication_id_);
00884 ACE_ERROR((LM_ERROR,
00885 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
00886 ACE_TEXT(" The publication id %C from delivered element ")
00887 ACE_TEXT("does not match the datawriter's id %C\n"),
00888 OPENDDS_STRING(sample_converter).c_str(),
00889 OPENDDS_STRING(writer_converter).c_str()));
00890 return;
00891 }
00892 DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00893
00894 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00895 ++data_delivered_count_;
00896
00897 {
00898 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00899 if ((--pending_write_count_) == 0) {
00900 empty_condition_.broadcast();
00901 }
00902 }
00903 }
00904
00905 void
00906 ReplayerImpl::control_delivered(ACE_Message_Block* sample)
00907 {
00908 ACE_UNUSED_ARG(sample);
00909 }
00910
00911 void
00912 ReplayerImpl::data_dropped(const DataSampleElement* sample,
00913 bool dropped_by_transport)
00914 {
00915 DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
00916
00917 ACE_UNUSED_ARG(dropped_by_transport);
00918 DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00919 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00920 ++data_dropped_count_;
00921 {
00922 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00923 if ((--pending_write_count_) == 0) {
00924 empty_condition_.broadcast();
00925 }
00926 }
00927 }
00928
00929 void
00930 ReplayerImpl::control_dropped(ACE_Message_Block* sample,
00931 bool )
00932 {
00933 ACE_UNUSED_ARG(sample);
00934 }
00935
00936 void
00937 ReplayerImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
00938 {
00939 ACE_UNUSED_ARG(subids);
00940 }
00941
00942 void
00943 ReplayerImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
00944 {
00945 ACE_UNUSED_ARG(subids);
00946 }
00947
00948 void
00949 ReplayerImpl::notify_publication_lost(const ReaderIdSeq& subids)
00950 {
00951 ACE_UNUSED_ARG(subids);
00952 }
00953
00954 void
00955 ReplayerImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
00956 {
00957 ACE_UNUSED_ARG(handles);
00958 }
00959
00960 void
00961 ReplayerImpl::notify_connection_deleted(const RepoId&)
00962 {
00963 }
00964
00965 void
00966 ReplayerImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
00967 {
00968 qos_data.pub_qos = this->publisher_qos_;
00969 qos_data.dw_qos = this->qos_;
00970 qos_data.topic_name = this->topic_name_.in();
00971 }
00972
00973 DDS::ReturnCode_t
00974 ReplayerImpl::write (const RawDataSample* samples,
00975 int num_samples,
00976 DDS::InstanceHandle_t* reader_ih_ptr)
00977 {
00978 DBG_ENTRY_LVL("ReplayerImpl","write",6);
00979
00980 OpenDDS::DCPS::RepoId repo_id;
00981 if (reader_ih_ptr) {
00982 repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
00983 if (repo_id == GUID_UNKNOWN) {
00984 ACE_ERROR_RETURN((LM_ERROR,
00985 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
00986 ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
00987 DDS::RETCODE_ERROR);
00988 }
00989 }
00990
00991 SendStateDataSampleList list;
00992
00993 for (int i = 0; i < num_samples; ++i) {
00994 DataSampleElement* element = 0;
00995
00996 ACE_NEW_MALLOC_RETURN(
00997 element,
00998 static_cast<DataSampleElement*>(
00999 sample_list_element_allocator_->malloc(
01000 sizeof(DataSampleElement))),
01001 DataSampleElement(publication_id_,
01002 this,
01003 0,
01004 transport_send_element_allocator_.get(),
01005 transport_customized_element_allocator_.get()),
01006 DDS::RETCODE_ERROR);
01007
01008 element->get_header().byte_order_ = samples[i].sample_byte_order_;
01009 element->get_header().publication_id_ = this->publication_id_;
01010 list.enqueue_tail(element);
01011 DataSample* temp;
01012 DDS::ReturnCode_t ret = create_sample_data_message(samples[i].sample_->duplicate(),
01013 element->get_header(),
01014 temp,
01015 samples[i].source_timestamp_,
01016 false);
01017 element->set_sample(temp);
01018 if (reader_ih_ptr) {
01019 element->set_num_subs(1);
01020 element->set_sub_id(0, repo_id);
01021 }
01022
01023 if (ret != DDS::RETCODE_OK) {
01024
01025 while (list.dequeue(element)) {
01026 ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
01027 }
01028
01029 return ret;
01030 }
01031 }
01032
01033 {
01034 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
01035 ++pending_write_count_;
01036 }
01037
01038 this->send(list);
01039
01040 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01041 end = reader_info_.end(); iter != end; ++iter) {
01042 iter->second.expected_sequence_ = sequence_number_;
01043 }
01044
01045 return DDS::RETCODE_OK;
01046 }
01047
01048 DDS::ReturnCode_t
01049 ReplayerImpl::write(const RawDataSample& sample)
01050 {
01051 return this->write(&sample, 1, 0);
01052 }
01053
01054 DDS::ReturnCode_t
01055 ReplayerImpl::create_sample_data_message(DataSample* data,
01056 DataSampleHeader& header_data,
01057 ACE_Message_Block*& message,
01058 const DDS::Time_t& source_timestamp,
01059 bool content_filter)
01060 {
01061 header_data.message_id_ = SAMPLE_DATA;
01062 header_data.coherent_change_ = content_filter;
01063
01064 header_data.content_filter_ = 0;
01065 header_data.cdr_encapsulation_ = this->cdr_encapsulation();
01066 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01067 header_data.sequence_repair_ = need_sequence_repair();
01068 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01069 this->sequence_number_ = SequenceNumber();
01070 } else {
01071 ++this->sequence_number_;
01072 }
01073 header_data.sequence_ = this->sequence_number_;
01074 header_data.source_timestamp_sec_ = source_timestamp.sec;
01075 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01076
01077 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
01078 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01079 header_data.lifespan_duration_ = true;
01080 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
01081 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
01082 }
01083
01084
01085
01086 size_t max_marshaled_size = header_data.max_marshaled_size();
01087
01088 ACE_NEW_MALLOC_RETURN(message,
01089 static_cast<ACE_Message_Block*>(
01090 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01091 ACE_Message_Block(max_marshaled_size,
01092 ACE_Message_Block::MB_DATA,
01093 data,
01094 0,
01095 header_allocator_.get(),
01096 0,
01097 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01098 ACE_Time_Value::zero,
01099 ACE_Time_Value::max_time,
01100 db_allocator_.get(),
01101 mb_allocator_.get()),
01102 DDS::RETCODE_ERROR);
01103
01104 *message << header_data;
01105 return DDS::RETCODE_OK;
01106 }
01107
01108 bool
01109 ReplayerImpl::lookup_instance_handles(const ReaderIdSeq& ids,
01110 DDS::InstanceHandleSeq & hdls)
01111 {
01112 if (DCPS_debug_level > 9) {
01113 CORBA::ULong const size = ids.length();
01114 OPENDDS_STRING separator;
01115 OPENDDS_STRING buffer;
01116
01117 for (unsigned long i = 0; i < size; ++i) {
01118 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
01119 separator = ", ";
01120 }
01121
01122 ACE_DEBUG((LM_DEBUG,
01123 ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
01124 ACE_TEXT("searching for handles for reader Ids: %C.\n"),
01125 buffer.c_str()));
01126 }
01127
01128 CORBA::ULong const num_rds = ids.length();
01129 hdls.length(num_rds);
01130
01131 for (CORBA::ULong i = 0; i < num_rds; ++i) {
01132 hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
01133 }
01134
01135 return true;
01136 }
01137
01138 bool
01139 ReplayerImpl::need_sequence_repair() const
01140 {
01141 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
01142 end = reader_info_.end(); it != end; ++it) {
01143 if (it->second.expected_sequence_ != sequence_number_) {
01144 return true;
01145 }
01146 }
01147 return false;
01148 }
01149
01150 DDS::InstanceHandle_t
01151 ReplayerImpl::get_instance_handle()
01152 {
01153 return this->participant_servant_->id_to_handle(publication_id_);
01154 }
01155
01156
01157
01158
01159 DDS::ReturnCode_t
01160 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
01161 const RawDataSample& sample )
01162 {
01163 return write(&sample, 1, &subscription);
01164 }
01165
01166 DDS::ReturnCode_t
01167 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
01168 const RawDataSampleList& samples )
01169 {
01170 if (samples.size())
01171 return write(&samples[0], static_cast<int>(samples.size()), &subscription);
01172 return DDS::RETCODE_ERROR;
01173 }
01174
01175 }
01176 }