00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DataWriterImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "Serializer.h"
00018 #include "Transient_Kludge.h"
00019 #include "DataDurabilityCache.h"
00020 #include "OfferedDeadlineWatchdog.h"
00021 #include "MonitorFactory.h"
00022 #include "TypeSupportImpl.h"
00023 #include "SendStateDataSampleList.h"
00024 #include "DataSampleElement.h"
00025
00026 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00027 #include "CoherentChangeControl.h"
00028 #endif
00029
00030 #include "AssociationData.h"
00031 #include "dds/DdsDcpsCoreC.h"
00032 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00033
00034 #if !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "BuiltInTopicUtils.h"
00036 #include "dds/DdsDcpsCoreTypeSupportC.h"
00037 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00038
00039 #include "Util.h"
00040 #include "dds/DCPS/transport/framework/EntryExit.h"
00041 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00042 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00043
00044 #include "ace/Reactor.h"
00045 #include "ace/Auto_Ptr.h"
00046
00047 #include <stdexcept>
00048
00049 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00050
00051 namespace OpenDDS {
00052 namespace DCPS {
00053
00054
00055
00056
00057
00058 DataWriterImpl::DataWriterImpl()
00059 : data_dropped_count_(0),
00060 data_delivered_count_(0),
00061 controlTracker("DataWriterImpl"),
00062 n_chunks_(TheServiceParticipant->n_chunks()),
00063 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00064 qos_(TheServiceParticipant->initial_DataWriterQos()),
00065 db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks())),
00066 topic_id_(GUID_UNKNOWN),
00067 topic_servant_(0),
00068 listener_mask_(DEFAULT_STATUS_MASK),
00069 domain_id_(0),
00070 publication_id_(GUID_UNKNOWN),
00071 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00072 coherent_(false),
00073 coherent_samples_(0),
00074 liveliness_lost_(false),
00075 reactor_(0),
00076 liveliness_check_interval_(ACE_Time_Value::max_time),
00077 last_liveliness_activity_time_(ACE_Time_Value::zero),
00078 last_deadline_missed_total_count_(0),
00079 watchdog_(),
00080 is_bit_(false),
00081 min_suspended_transaction_id_(0),
00082 max_suspended_transaction_id_(0),
00083 monitor_(0),
00084 periodic_monitor_(0),
00085 liveliness_asserted_(false),
00086 liveness_timer_(make_rch<LivenessTimer>(ref(*this)))
00087 {
00088 liveliness_lost_status_.total_count = 0;
00089 liveliness_lost_status_.total_count_change = 0;
00090
00091 offered_deadline_missed_status_.total_count = 0;
00092 offered_deadline_missed_status_.total_count_change = 0;
00093 offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00094
00095 offered_incompatible_qos_status_.total_count = 0;
00096 offered_incompatible_qos_status_.total_count_change = 0;
00097 offered_incompatible_qos_status_.last_policy_id = 0;
00098 offered_incompatible_qos_status_.policies.length(0);
00099
00100 publication_match_status_.total_count = 0;
00101 publication_match_status_.total_count_change = 0;
00102 publication_match_status_.current_count = 0;
00103 publication_match_status_.current_count_change = 0;
00104 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00105
00106 monitor_ =
00107 TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00108 periodic_monitor_ =
00109 TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00110 }
00111
00112
00113
00114 DataWriterImpl::~DataWriterImpl()
00115 {
00116 DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00117 }
00118
00119
00120 void
00121 DataWriterImpl::cleanup()
00122 {
00123
00124
00125
00126 set_listener(0, NO_STATUS_MASK);
00127 topic_servant_ = 0;
00128 }
00129
00130 void
00131 DataWriterImpl::init(
00132 TopicImpl * topic_servant,
00133 const DDS::DataWriterQos & qos,
00134 DDS::DataWriterListener_ptr a_listener,
00135 const DDS::StatusMask & mask,
00136 OpenDDS::DCPS::WeakRcHandle<OpenDDS::DCPS::DomainParticipantImpl> participant_servant,
00137 OpenDDS::DCPS::PublisherImpl * publisher_servant)
00138 {
00139 DBG_ENTRY_LVL("DataWriterImpl","init",6);
00140 topic_servant_ = topic_servant;
00141 topic_name_ = topic_servant_->get_name();
00142 topic_id_ = topic_servant_->get_id();
00143 type_name_ = topic_servant_->get_type_name();
00144
00145 #if !defined (DDS_HAS_MINIMUM_BIT)
00146 is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
00147 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00148
00149 qos_ = qos;
00150
00151
00152 listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00153 listener_mask_ = mask;
00154
00155
00156
00157 participant_servant_ = participant_servant;
00158
00159 RcHandle<DomainParticipantImpl> participant = participant_servant.lock();
00160 domain_id_ = participant->get_domain_id();
00161
00162
00163
00164 publisher_servant_ = *publisher_servant;
00165
00166 this->reactor_ = TheServiceParticipant->timer();
00167 }
00168
00169 DDS::InstanceHandle_t
00170 DataWriterImpl::get_instance_handle()
00171 {
00172 using namespace OpenDDS::DCPS;
00173 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00174 if (participant)
00175 return participant->id_to_handle(publication_id_);
00176 return 0;
00177 }
00178
00179 DDS::InstanceHandle_t
00180 DataWriterImpl::get_next_handle()
00181 {
00182 using namespace OpenDDS::DCPS;
00183 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00184 if (participant)
00185 return participant->id_to_handle(GUID_UNKNOWN);
00186 return 0;
00187 }
00188
00189 void
00190 DataWriterImpl::add_association(const RepoId& yourId,
00191 const ReaderAssociation& reader,
00192 bool active)
00193 {
00194 DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00195
00196 if (DCPS_debug_level) {
00197 GuidConverter writer_converter(yourId);
00198 GuidConverter reader_converter(reader.readerId);
00199 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00200 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00201 OPENDDS_STRING(writer_converter).c_str(),
00202 OPENDDS_STRING(reader_converter).c_str()));
00203 }
00204
00205 if (entity_deleted_.value()) {
00206 if (DCPS_debug_level)
00207 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00208 ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00209
00210 return;
00211 }
00212
00213 if (GUID_UNKNOWN == publication_id_) {
00214 publication_id_ = yourId;
00215 }
00216
00217 {
00218 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00219 reader_info_.insert(std::make_pair(reader.readerId,
00220 ReaderInfo(reader.filterClassName,
00221 TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00222 reader.exprParams, participant_servant_,
00223 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00224 }
00225
00226 if (DCPS_debug_level > 4) {
00227 GuidConverter converter(get_publication_id());
00228 ACE_DEBUG((LM_DEBUG,
00229 ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00230 ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00231 OPENDDS_STRING(converter).c_str(),
00232 qos_.transport_priority.value));
00233 }
00234
00235 AssociationData data;
00236 data.remote_id_ = reader.readerId;
00237 data.remote_data_ = reader.readerTransInfo;
00238 data.remote_reliable_ =
00239 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00240 data.remote_durable_ =
00241 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00242
00243 if (!associate(data, active)) {
00244
00245 if (DCPS_debug_level) {
00246 ACE_ERROR((LM_ERROR,
00247 ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00248 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00249 }
00250 }
00251 }
00252
00253 void
00254 DataWriterImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00255 {
00256 DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00257
00258 if (!(flags & ASSOC_OK)) {
00259 if (DCPS_debug_level) {
00260 const GuidConverter conv(remote_id);
00261 ACE_ERROR((LM_ERROR,
00262 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00263 ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00264 OPENDDS_STRING(conv).c_str()));
00265 }
00266
00267 return;
00268 }
00269 if (DCPS_debug_level) {
00270 const GuidConverter writer_conv(publication_id_);
00271 const GuidConverter conv(remote_id);
00272 ACE_DEBUG((LM_INFO,
00273 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00274 ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00275 OPENDDS_STRING(writer_conv).c_str(),
00276 OPENDDS_STRING(conv).c_str()));
00277 }
00278 if (flags & ASSOC_ACTIVE) {
00279
00280 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00281
00282
00283 if (assoc_complete_readers_.count(remote_id)) {
00284 if (DCPS_debug_level) {
00285 const GuidConverter writer_conv(publication_id_);
00286 const GuidConverter converter(remote_id);
00287 ACE_DEBUG((LM_DEBUG,
00288 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00289 ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00290 OPENDDS_STRING(writer_conv).c_str(),
00291 OPENDDS_STRING(converter).c_str()));
00292 }
00293 assoc_complete_readers_.erase(remote_id);
00294 association_complete_i(remote_id);
00295
00296
00297
00298
00299 } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00300 const GuidConverter converter(remote_id);
00301 ACE_ERROR((LM_ERROR,
00302 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00303 ACE_TEXT("failed to mark %C as pending.\n"),
00304 OPENDDS_STRING(converter).c_str()));
00305
00306 } else {
00307 if (DCPS_debug_level) {
00308 const GuidConverter converter(remote_id);
00309 ACE_DEBUG((LM_DEBUG,
00310 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00311 ACE_TEXT("marked %C as pending.\n"),
00312 OPENDDS_STRING(converter).c_str()));
00313 }
00314 }
00315
00316 } else {
00317
00318
00319 if (DCPS_debug_level) {
00320 const GuidConverter conv(publication_id_);
00321 ACE_ERROR((LM_ERROR,
00322 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00323 ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00324 OPENDDS_STRING(conv).c_str()));
00325 }
00326 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00327 disco->association_complete(domain_id_, dp_id_,
00328 publication_id_, remote_id);
00329 }
00330 }
00331
00332 DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
00333 const char* filter,
00334 const DDS::StringSeq& params,
00335 WeakRcHandle<DomainParticipantImpl> participant,
00336 bool durable)
00337 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00338 : participant_(participant)
00339 , filter_class_name_(filterClassName)
00340 , filter_(filter)
00341 , expression_params_(params)
00342 , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00343 , durable_(durable)
00344 {
00345 RcHandle<DomainParticipantImpl> part = participant_.lock();
00346 if (part && *filter) {
00347 eval_ = part->get_filter_eval(filter);
00348 }
00349 }
00350 #else
00351 : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00352 , durable_(durable)
00353 {
00354 ACE_UNUSED_ARG(filterClassName);
00355 ACE_UNUSED_ARG(filter);
00356 ACE_UNUSED_ARG(params);
00357 ACE_UNUSED_ARG(participant);
00358 }
00359 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00360
00361 DataWriterImpl::ReaderInfo::~ReaderInfo()
00362 {
00363 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00364 eval_ = RcHandle<FilterEvaluator>();
00365 RcHandle<DomainParticipantImpl> participant = participant_.lock();
00366 if (participant && !filter_.empty()) {
00367 participant->deref_filter_eval(filter_.c_str());
00368 }
00369
00370 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00371 }
00372
00373 void
00374 DataWriterImpl::association_complete(const RepoId& remote_id)
00375 {
00376 DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00377
00378 if (DCPS_debug_level >= 1) {
00379 GuidConverter writer_converter(this->publication_id_);
00380 GuidConverter reader_converter(remote_id);
00381 ACE_DEBUG((LM_DEBUG,
00382 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00383 ACE_TEXT("bit %d local %C remote %C\n"),
00384 is_bit_,
00385 OPENDDS_STRING(writer_converter).c_str(),
00386 OPENDDS_STRING(reader_converter).c_str()));
00387 }
00388
00389 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00390
00391 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00392 if (DCPS_debug_level) {
00393 GuidConverter writer_converter(this->publication_id_);
00394 GuidConverter reader_converter(remote_id);
00395 ACE_DEBUG((LM_DEBUG,
00396 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00397 ACE_TEXT("bit %d local %C did not find pending reader: %C")
00398 ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00399 is_bit_,
00400 OPENDDS_STRING(writer_converter).c_str(),
00401 OPENDDS_STRING(reader_converter).c_str()));
00402 }
00403
00404
00405 assoc_complete_readers_.insert(remote_id);
00406
00407 } else {
00408 association_complete_i(remote_id);
00409 }
00410 }
00411
00412 void
00413 DataWriterImpl::association_complete_i(const RepoId& remote_id)
00414 {
00415 DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00416
00417 if (DCPS_debug_level >= 1) {
00418 GuidConverter writer_converter(this->publication_id_);
00419 GuidConverter reader_converter(remote_id);
00420 ACE_DEBUG((LM_DEBUG,
00421 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00422 ACE_TEXT("bit %d local %C remote %C\n"),
00423 is_bit_,
00424 OPENDDS_STRING(writer_converter).c_str(),
00425 OPENDDS_STRING(reader_converter).c_str()));
00426 }
00427
00428 bool reader_durable = false;
00429 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00430 OPENDDS_STRING filterClassName;
00431 RcHandle<FilterEvaluator> eval;
00432 DDS::StringSeq expression_params;
00433 #endif
00434 {
00435 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00436
00437 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00438 GuidConverter converter(remote_id);
00439 ACE_ERROR((LM_ERROR,
00440 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00441 ACE_TEXT("insert %C from pending failed.\n"),
00442 OPENDDS_STRING(converter).c_str()));
00443 }
00444 }
00445 {
00446 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00447 RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00448
00449 if (it != reader_info_.end()) {
00450 reader_durable = it->second.durable_;
00451 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00452 filterClassName = it->second.filter_class_name_;
00453 eval = it->second.eval_;
00454 expression_params = it->second.expression_params_;
00455 #endif
00456 }
00457 }
00458
00459 if (this->monitor_) {
00460 this->monitor_->report();
00461 }
00462
00463 if (!is_bit_) {
00464
00465 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00466
00467 if (!participant)
00468 return;
00469
00470 DDS::InstanceHandle_t handle =
00471 participant->id_to_handle(remote_id);
00472
00473 {
00474
00475 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00476
00477 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00478 GuidConverter converter(remote_id);
00479 ACE_DEBUG((LM_WARNING,
00480 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
00481 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00482 OPENDDS_STRING(converter).c_str(),
00483 handle));
00484 return;
00485
00486 } else if (DCPS_debug_level > 4) {
00487 GuidConverter converter(remote_id);
00488 ACE_DEBUG((LM_DEBUG,
00489 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00490 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00491 OPENDDS_STRING(converter).c_str(),
00492 handle));
00493 }
00494
00495 ++publication_match_status_.total_count;
00496 ++publication_match_status_.total_count_change;
00497 ++publication_match_status_.current_count;
00498 ++publication_match_status_.current_count_change;
00499 publication_match_status_.last_subscription_handle = handle;
00500 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00501 }
00502
00503 DDS::DataWriterListener_var listener =
00504 listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00505
00506 if (!CORBA::is_nil(listener.in())) {
00507
00508 listener->on_publication_matched(this, publication_match_status_);
00509
00510
00511
00512 publication_match_status_.total_count_change = 0;
00513 publication_match_status_.current_count_change = 0;
00514 }
00515
00516 notify_status_condition();
00517 }
00518
00519
00520 if (reader_durable) {
00521
00522
00523 this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00524 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00525 , filterClassName, eval.in(), expression_params
00526 #endif
00527 );
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00538 guard,
00539 this->get_lock());
00540
00541 SendStateDataSampleList list = this->get_resend_data();
00542 {
00543 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00544
00545 SequenceNumber& seq =
00546 reader_info_.find(remote_id)->second.expected_sequence_;
00547
00548 for (SendStateDataSampleList::iterator list_el = list.begin();
00549 list_el != list.end(); ++list_el) {
00550 list_el->get_header().historic_sample_ = true;
00551
00552 if (list_el->get_header().sequence_ > seq) {
00553 seq = list_el->get_header().sequence_;
00554 }
00555 }
00556 }
00557
00558 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
00559 if (!publisher || publisher->is_suspended()) {
00560 this->available_data_list_.enqueue_tail(list);
00561
00562 } else {
00563 if (DCPS_debug_level >= 4) {
00564 ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00565 }
00566
00567 size_t size = 0, padding = 0;
00568 gen_find_size(remote_id, size, padding);
00569 Message_Block_Ptr data(
00570 new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00571 get_db_lock()));
00572 Serializer ser(data.get());
00573 ser << remote_id;
00574
00575 const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00576 DataSampleHeader header;
00577 Message_Block_Ptr end_historic_samples(
00578 create_control_message(END_HISTORIC_SAMPLES, header, move(data), timestamp));
00579
00580 this->controlTracker.message_sent();
00581 guard.release();
00582 SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
00583 if (ret == SEND_CONTROL_ERROR) {
00584 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00585 ACE_TEXT("DataWriterImpl::association_complete_i: ")
00586 ACE_TEXT("send_w_control failed.\n")));
00587 this->controlTracker.message_dropped();
00588 }
00589 }
00590 }
00591 }
00592
00593 void
00594 DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
00595 CORBA::Boolean notify_lost)
00596 {
00597 if (readers.length() == 0) {
00598 return;
00599 }
00600
00601 if (DCPS_debug_level >= 1) {
00602 GuidConverter writer_converter(publication_id_);
00603 GuidConverter reader_converter(readers[0]);
00604 ACE_DEBUG((LM_DEBUG,
00605 ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
00606 ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00607 is_bit_,
00608 OPENDDS_STRING(writer_converter).c_str(),
00609 OPENDDS_STRING(reader_converter).c_str(),
00610 readers.length()));
00611 }
00612
00613
00614 this->stop_associating(readers.get_buffer(), readers.length());
00615
00616 ReaderIdSeq fully_associated_readers;
00617 CORBA::ULong fully_associated_len = 0;
00618 ReaderIdSeq rds;
00619 CORBA::ULong rds_len = 0;
00620 DDS::InstanceHandleSeq handles;
00621
00622 ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
00623 {
00624
00625 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00626
00627
00628
00629
00630
00631 CORBA::ULong len = readers.length();
00632
00633 for (CORBA::ULong i = 0; i < len; ++i) {
00634
00635
00636
00637
00638 if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00639 ++ fully_associated_len;
00640 fully_associated_readers.length(fully_associated_len);
00641 fully_associated_readers [fully_associated_len - 1] = readers[i];
00642
00643 ++ rds_len;
00644 rds.length(rds_len);
00645 rds [rds_len - 1] = readers[i];
00646
00647 } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00648 ++ rds_len;
00649 rds.length(rds_len);
00650 rds [rds_len - 1] = readers[i];
00651
00652 GuidConverter converter(readers[i]);
00653 ACE_DEBUG((LM_WARNING,
00654 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_associations: ")
00655 ACE_TEXT("removing reader %C before association_complete() call.\n"),
00656 OPENDDS_STRING(converter).c_str()));
00657 }
00658
00659 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00660 reader_info_.erase(readers[i]);
00661
00662
00663 }
00664
00665 if (fully_associated_len > 0 && !is_bit_) {
00666
00667 this->lookup_instance_handles(fully_associated_readers, handles);
00668
00669 for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00670 id_to_handle_map_.erase(fully_associated_readers[i]);
00671 }
00672 }
00673
00674
00675
00676 if (!this->is_bit_) {
00677
00678
00679 int matchedSubscriptions =
00680 static_cast<int>(this->id_to_handle_map_.size());
00681 this->publication_match_status_.current_count_change =
00682 matchedSubscriptions - this->publication_match_status_.current_count;
00683
00684
00685 if (this->publication_match_status_.current_count_change != 0) {
00686 this->publication_match_status_.current_count = matchedSubscriptions;
00687
00688
00689
00690
00691 this->publication_match_status_.last_subscription_handle =
00692 handles[fully_associated_len - 1];
00693
00694 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00695
00696 DDS::DataWriterListener_var listener =
00697 this->listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00698
00699 if (!CORBA::is_nil(listener.in())) {
00700 listener->on_publication_matched(this, this->publication_match_status_);
00701
00702
00703 this->publication_match_status_.total_count_change = 0;
00704 this->publication_match_status_.current_count_change = 0;
00705 }
00706
00707 this->notify_status_condition();
00708 }
00709 }
00710 }
00711
00712 for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00713 this->disassociate(rds[i]);
00714 }
00715
00716
00717
00718
00719 if (notify_lost && handles.length() > 0) {
00720 this->notify_publication_lost(handles);
00721 }
00722 }
00723
00724 void DataWriterImpl::remove_all_associations()
00725 {
00726 DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00727
00728 this->stop_associating();
00729
00730 OpenDDS::DCPS::ReaderIdSeq readers;
00731 CORBA::ULong size;
00732 CORBA::ULong num_pending_readers;
00733 {
00734 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00735
00736 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00737 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00738 readers.length(size);
00739
00740 RepoIdSet::iterator itEnd = readers_.end();
00741 int i = 0;
00742
00743 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00744 readers[i ++] = *it;
00745 }
00746
00747 itEnd = pending_readers_.end();
00748
00749 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00750 readers[i ++] = *it;
00751 }
00752
00753 if (num_pending_readers > 0) {
00754 ACE_DEBUG((LM_WARNING,
00755 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00756 ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00757 num_pending_readers));
00758 }
00759 }
00760
00761 try {
00762 if (0 < size) {
00763 CORBA::Boolean dont_notify_lost = false;
00764
00765 this->remove_associations(readers, dont_notify_lost);
00766 }
00767
00768 } catch (const CORBA::Exception&) {
00769 ACE_DEBUG((LM_WARNING,
00770 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00771 ACE_TEXT("caught exception from remove_associations.\n")));
00772 }
00773 }
00774
00775 void
00776 DataWriterImpl::register_for_reader(const RepoId& participant,
00777 const RepoId& writerid,
00778 const RepoId& readerid,
00779 const TransportLocatorSeq& locators,
00780 DiscoveryListener* listener)
00781 {
00782 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00783 }
00784
00785 void
00786 DataWriterImpl::unregister_for_reader(const RepoId& participant,
00787 const RepoId& writerid,
00788 const RepoId& readerid)
00789 {
00790 TransportClient::unregister_for_reader(participant, writerid, readerid);
00791 }
00792
00793 void
00794 DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00795 {
00796 DDS::DataWriterListener_var listener =
00797 listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00798
00799 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00800
00801 #if 0
00802
00803 if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00804
00805 return;
00806 }
00807
00808 #endif
00809
00810 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00811
00812
00813 offered_incompatible_qos_status_.total_count = status.total_count;
00814 offered_incompatible_qos_status_.total_count_change +=
00815 status.count_since_last_send;
00816 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00817 offered_incompatible_qos_status_.policies = status.policies;
00818
00819 if (!CORBA::is_nil(listener.in())) {
00820 listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_);
00821
00822
00823
00824 offered_incompatible_qos_status_.total_count_change = 0;
00825 }
00826
00827 notify_status_condition();
00828 }
00829
00830 void
00831 DataWriterImpl::update_subscription_params(const RepoId& readerId,
00832 const DDS::StringSeq& params)
00833 {
00834 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00835 ACE_UNUSED_ARG(readerId);
00836 ACE_UNUSED_ARG(params);
00837 #else
00838 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00839 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00840 RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00841
00842 if (iter != reader_info_.end()) {
00843 iter->second.expression_params_ = params;
00844
00845 } else if (DCPS_debug_level > 4 &&
00846 TheServiceParticipant->publisher_content_filter()) {
00847 GuidConverter pubConv(this->publication_id_), subConv(readerId);
00848 ACE_DEBUG((LM_WARNING,
00849 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00850 ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00851 OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00852 }
00853
00854 #endif
00855 }
00856
00857 void
00858 DataWriterImpl::inconsistent_topic()
00859 {
00860 topic_servant_->inconsistent_topic();
00861 }
00862
00863 DDS::ReturnCode_t
00864 DataWriterImpl::set_qos(const DDS::DataWriterQos & qos)
00865 {
00866
00867 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00868 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00869 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00870 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00871 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00872
00873 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00874 if (qos_ == qos)
00875 return DDS::RETCODE_OK;
00876
00877 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00878 return DDS::RETCODE_IMMUTABLE_POLICY;
00879
00880 } else {
00881 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00882 DDS::PublisherQos publisherQos;
00883 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
00884
00885 bool status = false;
00886 if (publisher) {
00887 publisher->get_qos(publisherQos);
00888 status
00889 = disco->update_publication_qos(domain_id_,
00890 dp_id_,
00891 this->publication_id_,
00892 qos,
00893 publisherQos);
00894 }
00895 if (!status) {
00896 ACE_ERROR_RETURN((LM_ERROR,
00897 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00898 ACE_TEXT("qos not updated. \n")),
00899 DDS::RETCODE_ERROR);
00900 }
00901 }
00902
00903 if (!(qos_ == qos)) {
00904
00905 if (qos_.deadline.period.sec != qos.deadline.period.sec
00906 || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00907 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00908 && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00909 this->watchdog_= make_rch<OfferedDeadlineWatchdog>(
00910 ref(this->lock_),
00911 qos.deadline,
00912 ref(*this),
00913 ref(this->offered_deadline_missed_status_),
00914 ref(this->last_deadline_missed_total_count_));
00915
00916 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00917 && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00918 this->watchdog_->cancel_all();
00919 this->watchdog_.reset();
00920
00921 } else {
00922 this->watchdog_->reset_interval(
00923 duration_to_time_value(qos.deadline.period));
00924 }
00925 }
00926
00927 qos_ = qos;
00928 }
00929
00930 return DDS::RETCODE_OK;
00931
00932 } else {
00933 return DDS::RETCODE_INCONSISTENT_POLICY;
00934 }
00935 }
00936
00937 DDS::ReturnCode_t
00938 DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
00939 {
00940 qos = qos_;
00941 return DDS::RETCODE_OK;
00942 }
00943
00944 DDS::ReturnCode_t
00945 DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
00946 DDS::StatusMask mask)
00947 {
00948 listener_mask_ = mask;
00949
00950 listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00951 return DDS::RETCODE_OK;
00952 }
00953
00954 DDS::DataWriterListener_ptr
00955 DataWriterImpl::get_listener()
00956 {
00957 return DDS::DataWriterListener::_duplicate(listener_.in());
00958 }
00959
00960 DDS::Topic_ptr
00961 DataWriterImpl::get_topic()
00962 {
00963 return DDS::Topic::_duplicate(topic_servant_.get());
00964 }
00965
00966 bool
00967 DataWriterImpl::should_ack() const
00968 {
00969
00970
00971
00972 return this->readers_.size() != 0;
00973 }
00974
00975 DataWriterImpl::AckToken
00976 DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
00977 {
00978 if (DCPS_debug_level > 0) {
00979 ACE_DEBUG((LM_DEBUG,
00980 ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
00981 ACE_TEXT("for sequence %q \n"),
00982 this->sequence_number_.getValue()));
00983 }
00984 return AckToken(max_wait, this->sequence_number_);
00985 }
00986
00987
00988
00989 DDS::ReturnCode_t
00990 DataWriterImpl::send_request_ack()
00991 {
00992 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00993 guard,
00994 get_lock(),
00995 DDS::RETCODE_ERROR);
00996
00997
00998 DataSampleElement* element = 0;
00999 DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element);
01000
01001 if (ret != DDS::RETCODE_OK) {
01002 ACE_ERROR_RETURN((LM_ERROR,
01003 ACE_TEXT("(%P|%t) ERROR: ")
01004 ACE_TEXT("DataWriterImpl::send_request_ack: ")
01005 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01006 ret),
01007 ret);
01008 }
01009
01010 Message_Block_Ptr blk;
01011
01012 Message_Block_Ptr sample(create_control_message(REQUEST_ACK,
01013 element->get_header(),
01014 move(blk),
01015 time_value_to_time( ACE_OS::gettimeofday() )));
01016 element->set_sample(move(sample));
01017
01018 ret = this->data_container_->enqueue_control(element);
01019
01020 if (ret != DDS::RETCODE_OK) {
01021 ACE_ERROR_RETURN((LM_ERROR,
01022 ACE_TEXT("(%P|%t) ERROR: ")
01023 ACE_TEXT("DataWriterImpl::send_request_ack: ")
01024 ACE_TEXT("enqueue_control failed.\n")),
01025 ret);
01026 }
01027
01028
01029 send_all_to_flush_control(guard);
01030
01031 return DDS::RETCODE_OK;
01032 }
01033
01034 DDS::ReturnCode_t
01035 DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
01036 {
01037 if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01038 return DDS::RETCODE_OK;
01039
01040 DDS::ReturnCode_t ret = send_request_ack();
01041
01042 if (ret != DDS::RETCODE_OK)
01043 return ret;
01044
01045 DataWriterImpl::AckToken token = create_ack_token(max_wait);
01046 if (DCPS_debug_level) {
01047 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01048 ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01049 token.sequence_.getValue()));
01050 }
01051 return wait_for_specific_ack(token);
01052 }
01053
01054 DDS::ReturnCode_t
01055 DataWriterImpl::wait_for_specific_ack(const AckToken& token)
01056 {
01057 return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01058 }
01059
01060 DDS::Publisher_ptr
01061 DataWriterImpl::get_publisher()
01062 {
01063 return publisher_servant_.lock()._retn();
01064 }
01065
01066 DDS::ReturnCode_t
01067 DataWriterImpl::get_liveliness_lost_status(
01068 DDS::LivelinessLostStatus & status)
01069 {
01070 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01071 guard,
01072 this->lock_,
01073 DDS::RETCODE_ERROR);
01074 set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
01075 status = liveliness_lost_status_;
01076 liveliness_lost_status_.total_count_change = 0;
01077 return DDS::RETCODE_OK;
01078 }
01079
01080 DDS::ReturnCode_t
01081 DataWriterImpl::get_offered_deadline_missed_status(
01082 DDS::OfferedDeadlineMissedStatus & status)
01083 {
01084 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01085 guard,
01086 this->lock_,
01087 DDS::RETCODE_ERROR);
01088
01089 set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
01090
01091 this->offered_deadline_missed_status_.total_count_change =
01092 this->offered_deadline_missed_status_.total_count
01093 - this->last_deadline_missed_total_count_;
01094
01095
01096 this->last_deadline_missed_total_count_ =
01097 this->offered_deadline_missed_status_.total_count;
01098
01099 status = offered_deadline_missed_status_;
01100
01101 this->offered_deadline_missed_status_.total_count_change = 0;
01102
01103 return DDS::RETCODE_OK;
01104 }
01105
01106 DDS::ReturnCode_t
01107 DataWriterImpl::get_offered_incompatible_qos_status(
01108 DDS::OfferedIncompatibleQosStatus & status)
01109 {
01110 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01111 guard,
01112 this->lock_,
01113 DDS::RETCODE_ERROR);
01114 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
01115 status = offered_incompatible_qos_status_;
01116 offered_incompatible_qos_status_.total_count_change = 0;
01117 return DDS::RETCODE_OK;
01118 }
01119
01120 DDS::ReturnCode_t
01121 DataWriterImpl::get_publication_matched_status(
01122 DDS::PublicationMatchedStatus & status)
01123 {
01124 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01125 guard,
01126 this->lock_,
01127 DDS::RETCODE_ERROR);
01128 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
01129 status = publication_match_status_;
01130 publication_match_status_.total_count_change = 0;
01131 publication_match_status_.current_count_change = 0;
01132 return DDS::RETCODE_OK;
01133 }
01134
01135 DDS::ReturnCode_t
01136 DataWriterImpl::assert_liveliness()
01137 {
01138 switch (this->qos_.liveliness.kind) {
01139 case DDS::AUTOMATIC_LIVELINESS_QOS:
01140
01141 break;
01142 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01143 {
01144 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01145 if (participant)
01146 return participant->assert_liveliness();
01147 return DDS::RETCODE_OK;
01148 }
01149 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01150 if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01151 return DDS::RETCODE_ERROR;
01152 }
01153 break;
01154 }
01155
01156 return DDS::RETCODE_OK;
01157 }
01158
01159 DDS::ReturnCode_t
01160 DataWriterImpl::assert_liveliness_by_participant()
01161 {
01162
01163
01164 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01165
01166 liveliness_asserted_ = true;
01167 }
01168
01169 return DDS::RETCODE_OK;
01170 }
01171
01172 ACE_Time_Value
01173 DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
01174 {
01175 if (this->qos_.liveliness.kind == kind) {
01176 return liveliness_check_interval_;
01177 } else {
01178 return ACE_Time_Value::max_time;
01179 }
01180 }
01181
01182 bool
01183 DataWriterImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
01184 {
01185 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01186 return last_liveliness_activity_time_ > tv;
01187 } else {
01188 return false;
01189 }
01190 }
01191
01192 DDS::ReturnCode_t
01193 DataWriterImpl::get_matched_subscriptions(
01194 DDS::InstanceHandleSeq & subscription_handles)
01195 {
01196 if (enabled_ == false) {
01197 ACE_ERROR_RETURN((LM_ERROR,
01198 ACE_TEXT("(%P|%t) ERROR: ")
01199 ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01200 ACE_TEXT(" Entity is not enabled. \n")),
01201 DDS::RETCODE_NOT_ENABLED);
01202 }
01203
01204 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01205 guard,
01206 this->lock_,
01207 DDS::RETCODE_ERROR);
01208
01209
01210 int index = 0;
01211 subscription_handles.length(
01212 static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01213
01214 for (RepoIdToHandleMap::iterator
01215 current = this->id_to_handle_map_.begin();
01216 current != this->id_to_handle_map_.end();
01217 ++current, ++index) {
01218 subscription_handles[index] = current->second;
01219 }
01220
01221 return DDS::RETCODE_OK;
01222 }
01223
01224 #if !defined (DDS_HAS_MINIMUM_BIT)
01225 DDS::ReturnCode_t
01226 DataWriterImpl::get_matched_subscription_data(
01227 DDS::SubscriptionBuiltinTopicData & subscription_data,
01228 DDS::InstanceHandle_t subscription_handle)
01229 {
01230 if (enabled_ == false) {
01231 ACE_ERROR_RETURN((LM_ERROR,
01232 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01233 ACE_TEXT("get_matched_subscription_data: ")
01234 ACE_TEXT("Entity is not enabled. \n")),
01235 DDS::RETCODE_NOT_ENABLED);
01236 }
01237 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01238
01239 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01240 DDS::SubscriptionBuiltinTopicDataSeq data;
01241
01242 if (participant) {
01243 ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
01244 participant.in(),
01245 BUILT_IN_SUBSCRIPTION_TOPIC,
01246 subscription_handle,
01247 data);
01248 }
01249
01250 if (ret == DDS::RETCODE_OK) {
01251 subscription_data = data[0];
01252 }
01253
01254 return ret;
01255 }
01256 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01257
01258 DDS::ReturnCode_t
01259 DataWriterImpl::enable()
01260 {
01261
01262
01263
01264
01265
01266
01267 if (this->is_enabled()) {
01268 return DDS::RETCODE_OK;
01269 }
01270
01271 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01272 if (!publisher || !publisher->is_enabled()) {
01273 return DDS::RETCODE_PRECONDITION_NOT_MET;
01274 }
01275
01276 RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
01277 if (participant) {
01278 dp_id_ = participant->get_id();
01279 }
01280
01281
01282
01283
01284
01285
01286
01287 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01288
01289 CORBA::Long const max_samples_per_instance =
01290 (qos_.resource_limits.max_samples_per_instance == DDS::LENGTH_UNLIMITED)
01291 ? 0x7fffffff : qos_.resource_limits.max_samples_per_instance;
01292
01293 CORBA::Long max_instances = 0, max_total_samples = 0;
01294
01295 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01296 n_chunks_ = qos_.resource_limits.max_samples;
01297
01298 if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED ||
01299 (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01300 || (qos_.resource_limits.max_samples <
01301 (qos_.resource_limits.max_instances * max_samples_per_instance))) {
01302 max_total_samples = reliable ? qos_.resource_limits.max_samples : 0;
01303 }
01304 }
01305
01306 if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01307 max_instances = qos_.resource_limits.max_instances;
01308
01309 const CORBA::Long history_depth =
01310 (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS ||
01311 qos_.history.depth == DDS::LENGTH_UNLIMITED) ? 0x7fffffff : qos_.history.depth;
01312
01313 const CORBA::Long max_durable_per_instance =
01314 qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth;
01315
01316
01317 this->enable_specific();
01318
01319 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01320
01321
01322 DataDurabilityCache* const durability_cache =
01323 TheServiceParticipant->get_data_durability_cache(qos_.durability);
01324 #endif
01325
01326
01327
01328 data_container_ .reset(new WriteDataContainer(this,
01329 max_samples_per_instance,
01330 history_depth,
01331 max_durable_per_instance,
01332 qos_.reliability.max_blocking_time,
01333 n_chunks_,
01334 domain_id_,
01335 topic_name_,
01336 get_type_name(),
01337 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01338 durability_cache,
01339 qos_.durability_service,
01340 #endif
01341 max_instances,
01342 max_total_samples));
01343
01344
01345
01346 mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
01347 db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
01348 header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
01349
01350 if (DCPS_debug_level >= 2) {
01351 ACE_DEBUG((LM_DEBUG,
01352 "(%P|%t) DataWriterImpl::enable-mb"
01353 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01354 mb_allocator_.get(),
01355 n_chunks_));
01356
01357 ACE_DEBUG((LM_DEBUG,
01358 "(%P|%t) DataWriterImpl::enable-db"
01359 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01360 db_allocator_.get(),
01361 n_chunks_));
01362
01363 ACE_DEBUG((LM_DEBUG,
01364 "(%P|%t) DataWriterImpl::enable-header"
01365 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01366 header_allocator_.get(),
01367 n_chunks_));
01368 }
01369
01370 if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01371 qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01372 liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01373 liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01374
01375 if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01376 liveliness_check_interval_ = ACE_Time_Value (0, 1);
01377 }
01378
01379 if (reactor_->schedule_timer(liveness_timer_.in(),
01380 0,
01381 liveliness_check_interval_,
01382 liveliness_check_interval_) == -1) {
01383 ACE_ERROR((LM_ERROR,
01384 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01385 ACE_TEXT("schedule_timer")));
01386
01387 }
01388 }
01389
01390 if (!participant)
01391 return DDS::RETCODE_ERROR;
01392
01393 participant->add_adjust_liveliness_timers(this);
01394
01395
01396
01397 DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01398
01399 if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01400 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01401 this->watchdog_ = make_rch<OfferedDeadlineWatchdog>(
01402 ref(this->lock_),
01403 this->qos_.deadline,
01404 ref(*this),
01405 ref(this->offered_deadline_missed_status_),
01406 ref(this->last_deadline_missed_total_count_));
01407 }
01408
01409 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01410 disco->pre_writer(this);
01411
01412 this->set_enabled();
01413
01414 try {
01415 this->enable_transport(reliable,
01416 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01417
01418 } catch (const Transport::Exception&) {
01419 ACE_ERROR((LM_ERROR,
01420 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01421 ACE_TEXT("Transport Exception.\n")));
01422 data_container_->shutdown_ = true;
01423 return DDS::RETCODE_ERROR;
01424 }
01425
01426 const TransportLocatorSeq& trans_conf_info = connection_info();
01427
01428 DDS::PublisherQos pub_qos;
01429
01430 publisher->get_qos(pub_qos);
01431
01432 this->publication_id_ =
01433 disco->add_publication(this->domain_id_,
01434 this->dp_id_,
01435 this->topic_servant_->get_id(),
01436 this,
01437 this->qos_,
01438 trans_conf_info,
01439 pub_qos);
01440
01441
01442 if (!publisher || this->publication_id_ == GUID_UNKNOWN) {
01443 ACE_DEBUG((LM_WARNING,
01444 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::enable, ")
01445 ACE_TEXT("add_publication returned invalid id. \n")));
01446 data_container_->shutdown_ = true;
01447 return DDS::RETCODE_ERROR;
01448 }
01449
01450 this->data_container_->publication_id_ = this->publication_id_;
01451
01452 const DDS::ReturnCode_t writer_enabled_result =
01453 publisher->writer_enabled(topic_name_.in(), this);
01454
01455 if (this->monitor_) {
01456 this->monitor_->report();
01457 }
01458
01459 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01460
01461
01462
01463 if (durability_cache != 0) {
01464
01465 if (!durability_cache->get_data(this->domain_id_,
01466 this->topic_name_,
01467 get_type_name(),
01468 this,
01469 this->mb_allocator_.get(),
01470 this->db_allocator_.get(),
01471 this->qos_.lifespan)) {
01472 ACE_ERROR((LM_ERROR,
01473 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01474 ACE_TEXT("unable to retrieve durable data\n")));
01475 }
01476 }
01477
01478 #endif
01479
01480 return writer_enabled_result;
01481 }
01482
01483 void
01484 DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
01485 {
01486 DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01487
01488 SendStateDataSampleList list;
01489
01490 ACE_UINT64 transaction_id = this->get_unsent_data(list);
01491
01492 controlTracker.message_sent();
01493
01494
01495 guard.release();
01496
01497 this->send(list, transaction_id);
01498 }
01499
01500 DDS::ReturnCode_t
01501 DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
01502 Message_Block_Ptr data,
01503 const DDS::Time_t& source_timestamp)
01504 {
01505 DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01506
01507 if (enabled_ == false) {
01508 ACE_ERROR_RETURN((LM_ERROR,
01509 ACE_TEXT("(%P|%t) ERROR: ")
01510 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01511 ACE_TEXT(" Entity is not enabled. \n")),
01512 DDS::RETCODE_NOT_ENABLED);
01513 }
01514
01515 DDS::ReturnCode_t ret =
01516 this->data_container_->register_instance(handle, data);
01517
01518 if (ret != DDS::RETCODE_OK) {
01519 ACE_ERROR_RETURN((LM_ERROR,
01520 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01521 ACE_TEXT("register instance with container failed.\n")),
01522 ret);
01523 }
01524
01525 if (this->monitor_) {
01526 this->monitor_->report();
01527 }
01528
01529 DataSampleElement* element = 0;
01530 ret = this->data_container_->obtain_buffer_for_control(element);
01531
01532 if (ret != DDS::RETCODE_OK) {
01533 ACE_ERROR_RETURN((LM_ERROR,
01534 ACE_TEXT("(%P|%t) ERROR: ")
01535 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01536 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01537 ret),
01538 ret);
01539 }
01540
01541
01542 Message_Block_Ptr sample(create_control_message(INSTANCE_REGISTRATION,
01543 element->get_header(),
01544 move(data),
01545 source_timestamp));
01546
01547 element->set_sample(move(sample));
01548
01549 ret = this->data_container_->enqueue_control(element);
01550
01551 if (ret != DDS::RETCODE_OK) {
01552 ACE_ERROR_RETURN((LM_ERROR,
01553 ACE_TEXT("(%P|%t) ERROR: ")
01554 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01555 ACE_TEXT("enqueue_control failed.\n")),
01556 ret);
01557 }
01558
01559 return ret;
01560 }
01561
01562 DDS::ReturnCode_t
01563 DataWriterImpl::register_instance_from_durable_data(DDS::InstanceHandle_t& handle,
01564 Message_Block_Ptr data,
01565 const DDS::Time_t & source_timestamp)
01566 {
01567 DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01568
01569 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01570 guard,
01571 get_lock(),
01572 DDS::RETCODE_ERROR);
01573
01574 DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp);
01575 if (ret != DDS::RETCODE_OK) {
01576 ACE_ERROR_RETURN((LM_ERROR,
01577 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01578 ACE_TEXT("register instance with container failed.\n")),
01579 ret);
01580 }
01581
01582 send_all_to_flush_control(guard);
01583
01584 return ret;
01585 }
01586
01587 DDS::ReturnCode_t
01588 DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
01589 const DDS::Time_t& source_timestamp)
01590 {
01591 DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01592
01593 if (enabled_ == false) {
01594 ACE_ERROR_RETURN((LM_ERROR,
01595 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01596 ACE_TEXT(" Entity is not enabled.\n")),
01597 DDS::RETCODE_NOT_ENABLED);
01598 }
01599
01600
01601
01602 if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01603 return this->dispose_and_unregister(handle, source_timestamp);
01604 }
01605
01606 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01607 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01608 Message_Block_Ptr unregistered_sample_data;
01609 ret = this->data_container_->unregister(handle, unregistered_sample_data);
01610
01611 if (ret != DDS::RETCODE_OK) {
01612 ACE_ERROR_RETURN((LM_ERROR,
01613 ACE_TEXT("(%P|%t) ERROR: ")
01614 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01615 ACE_TEXT(" unregister with container failed. \n")),
01616 ret);
01617 }
01618
01619 DataSampleElement* element = 0;
01620 ret = this->data_container_->obtain_buffer_for_control(element);
01621
01622 if (ret != DDS::RETCODE_OK) {
01623 ACE_ERROR_RETURN((LM_ERROR,
01624 ACE_TEXT("(%P|%t) ERROR: ")
01625 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01626 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01627 ret),
01628 ret);
01629 }
01630
01631 Message_Block_Ptr sample(create_control_message(UNREGISTER_INSTANCE,
01632 element->get_header(),
01633 move(unregistered_sample_data),
01634 source_timestamp));
01635 element->set_sample(move(sample));
01636 ret = this->data_container_->enqueue_control(element);
01637
01638 if (ret != DDS::RETCODE_OK) {
01639 ACE_ERROR_RETURN((LM_ERROR,
01640 ACE_TEXT("(%P|%t) ERROR: ")
01641 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01642 ACE_TEXT("enqueue_control failed.\n")),
01643 ret);
01644 }
01645
01646 send_all_to_flush_control(guard);
01647 return DDS::RETCODE_OK;
01648 }
01649
01650 DDS::ReturnCode_t
01651 DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
01652 const DDS::Time_t& source_timestamp)
01653 {
01654 DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01655
01656 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01657 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01658
01659 Message_Block_Ptr data_sample;
01660 ret = this->data_container_->dispose(handle, data_sample);
01661
01662 if (ret != DDS::RETCODE_OK) {
01663 ACE_ERROR_RETURN((LM_ERROR,
01664 ACE_TEXT("(%P|%t) ERROR: ")
01665 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01666 ACE_TEXT("dispose on container failed. \n")),
01667 ret);
01668 }
01669
01670 ret = this->data_container_->unregister(handle, data_sample, false);
01671
01672 if (ret != DDS::RETCODE_OK) {
01673 ACE_ERROR_RETURN((LM_ERROR,
01674 ACE_TEXT("(%P|%t) ERROR: ")
01675 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01676 ACE_TEXT("unregister with container failed. \n")),
01677 ret);
01678 }
01679
01680 DataSampleElement* element = 0;
01681 ret = this->data_container_->obtain_buffer_for_control(element);
01682
01683 if (ret != DDS::RETCODE_OK) {
01684 ACE_ERROR_RETURN((LM_ERROR,
01685 ACE_TEXT("(%P|%t) ERROR: ")
01686 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01687 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01688 ret),
01689 ret);
01690 }
01691
01692 Message_Block_Ptr sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01693 element->get_header(),
01694 move(data_sample),
01695 source_timestamp));
01696 element->set_sample(move(sample));
01697 ret = this->data_container_->enqueue_control(element);
01698
01699 if (ret != DDS::RETCODE_OK) {
01700 ACE_ERROR_RETURN((LM_ERROR,
01701 ACE_TEXT("(%P|%t) ERROR: ")
01702 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01703 ACE_TEXT("enqueue_control failed.\n")),
01704 ret);
01705 }
01706
01707 send_all_to_flush_control(guard);
01708 return DDS::RETCODE_OK;
01709 }
01710
01711 void
01712 DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
01713 {
01714 {
01715 ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01716
01717 PublicationInstanceMapType::iterator it =
01718 this->data_container_->instances_.begin();
01719
01720 while (it != this->data_container_->instances_.end()) {
01721 if (!it->second->unregistered_) {
01722 const DDS::InstanceHandle_t handle = it->first;
01723 ++it;
01724 this->unregister_instance_i(handle, source_timestamp);
01725 } else {
01726 ++it;
01727 }
01728 }
01729 }
01730 }
01731
01732 DDS::ReturnCode_t
01733 DataWriterImpl::write(Message_Block_Ptr data,
01734 DDS::InstanceHandle_t handle,
01735 const DDS::Time_t& source_timestamp,
01736 GUIDSeq* filter_out)
01737 {
01738 DBG_ENTRY_LVL("DataWriterImpl","write",6);
01739
01740 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01741 guard,
01742 get_lock (),
01743 DDS::RETCODE_ERROR);
01744
01745
01746 GUIDSeq_var filter_out_var(filter_out);
01747
01748 if (enabled_ == false) {
01749 ACE_ERROR_RETURN((LM_ERROR,
01750 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01751 ACE_TEXT(" Entity is not enabled. \n")),
01752 DDS::RETCODE_NOT_ENABLED);
01753 }
01754
01755 DataSampleElement* element = 0;
01756 DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01757
01758 if (ret == DDS::RETCODE_TIMEOUT) {
01759 return ret;
01760
01761 } else if (ret != DDS::RETCODE_OK) {
01762 ACE_ERROR_RETURN((LM_ERROR,
01763 ACE_TEXT("(%P|%t) ERROR: ")
01764 ACE_TEXT("DataWriterImpl::write: ")
01765 ACE_TEXT("obtain_buffer returned %d.\n"),
01766 ret),
01767 ret);
01768 }
01769
01770 Message_Block_Ptr temp;
01771 ret = create_sample_data_message(move(data),
01772 handle,
01773 element->get_header(),
01774 temp,
01775 source_timestamp,
01776 (filter_out != 0));
01777 element->set_sample(move(temp));
01778
01779 if (ret != DDS::RETCODE_OK) {
01780 return ret;
01781 }
01782
01783 element->set_filter_out(filter_out_var._retn());
01784
01785 ret = this->data_container_->enqueue(element, handle);
01786
01787 if (ret != DDS::RETCODE_OK) {
01788 ACE_ERROR_RETURN((LM_ERROR,
01789 ACE_TEXT("(%P|%t) ERROR: ")
01790 ACE_TEXT("DataWriterImpl::write: ")
01791 ACE_TEXT("enqueue failed.\n")),
01792 ret);
01793 }
01794 this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01795
01796 track_sequence_number(filter_out);
01797
01798 if (this->coherent_) {
01799 ++this->coherent_samples_;
01800 }
01801 SendStateDataSampleList list;
01802
01803 ACE_UINT64 transaction_id = this->get_unsent_data(list);
01804
01805 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01806 if (!publisher || publisher->is_suspended()) {
01807 if (min_suspended_transaction_id_ == 0) {
01808
01809
01810 min_suspended_transaction_id_ = transaction_id;
01811 } else {
01812
01813
01814 max_suspended_transaction_id_ = transaction_id;
01815 }
01816 this->available_data_list_.enqueue_tail(list);
01817
01818 } else {
01819 guard.release();
01820
01821 this->send(list, transaction_id);
01822 }
01823
01824 return DDS::RETCODE_OK;
01825 }
01826
01827 void
01828 DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
01829 {
01830 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01831
01832 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01833
01834 RepoIdSet excluded;
01835
01836 if (filter_out && !reader_info_.empty()) {
01837 const GUID_t* buf = filter_out->get_buffer();
01838 excluded.insert(buf, buf + filter_out->length());
01839 }
01840
01841 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01842 end = reader_info_.end(); iter != end; ++iter) {
01843
01844 if (excluded.count(iter->first) == 0) {
01845 iter->second.expected_sequence_ = sequence_number_;
01846 }
01847 }
01848
01849 #else
01850 ACE_UNUSED_ARG(filter_out);
01851 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01852 end = reader_info_.end(); iter != end; ++iter) {
01853 iter->second.expected_sequence_ = sequence_number_;
01854 }
01855
01856 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01857
01858 }
01859
01860 void
01861 DataWriterImpl::send_suspended_data()
01862 {
01863
01864
01865 if (max_suspended_transaction_id_ != 0) {
01866 this->send(this->available_data_list_, max_suspended_transaction_id_);
01867 max_suspended_transaction_id_ = 0;
01868 }
01869
01870
01871
01872
01873
01874 this->send(this->available_data_list_, min_suspended_transaction_id_);
01875 min_suspended_transaction_id_ = 0;
01876 this->available_data_list_.reset();
01877 }
01878
01879 DDS::ReturnCode_t
01880 DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
01881 const DDS::Time_t & source_timestamp)
01882 {
01883 DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01884
01885 if (enabled_ == false) {
01886 ACE_ERROR_RETURN((LM_ERROR,
01887 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01888 ACE_TEXT(" Entity is not enabled. \n")),
01889 DDS::RETCODE_NOT_ENABLED);
01890 }
01891
01892 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01893
01894 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01895
01896 Message_Block_Ptr registered_sample_data;
01897 ret = this->data_container_->dispose(handle, registered_sample_data);
01898
01899 if (ret != DDS::RETCODE_OK) {
01900 ACE_ERROR_RETURN((LM_ERROR,
01901 ACE_TEXT("(%P|%t) ERROR: ")
01902 ACE_TEXT("DataWriterImpl::dispose: ")
01903 ACE_TEXT("dispose failed.\n")),
01904 ret);
01905 }
01906
01907 DataSampleElement* element = 0;
01908 ret = this->data_container_->obtain_buffer_for_control(element);
01909
01910 if (ret != DDS::RETCODE_OK) {
01911 ACE_ERROR_RETURN((LM_ERROR,
01912 ACE_TEXT("(%P|%t) ERROR: ")
01913 ACE_TEXT("DataWriterImpl::dispose: ")
01914 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01915 ret),
01916 ret);
01917 }
01918
01919 Message_Block_Ptr sample(create_control_message(DISPOSE_INSTANCE,
01920 element->get_header(),
01921 move(registered_sample_data),
01922 source_timestamp));
01923 element->set_sample(move(sample));
01924 ret = this->data_container_->enqueue_control(element);
01925
01926 if (ret != DDS::RETCODE_OK) {
01927 ACE_ERROR_RETURN((LM_ERROR,
01928 ACE_TEXT("(%P|%t) ERROR: ")
01929 ACE_TEXT("DataWriterImpl::dispose: ")
01930 ACE_TEXT("enqueue_control failed.\n")),
01931 ret);
01932 }
01933
01934 send_all_to_flush_control(guard);
01935
01936 return DDS::RETCODE_OK;
01937 }
01938
01939 DDS::ReturnCode_t
01940 DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
01941 size_t& size)
01942 {
01943 return data_container_->num_samples(handle, size);
01944 }
01945
01946 void
01947 DataWriterImpl::unregister_all()
01948 {
01949 data_container_->unregister_all();
01950 }
01951
01952 RepoId
01953 DataWriterImpl::get_publication_id()
01954 {
01955 return publication_id_;
01956 }
01957
01958 RepoId
01959 DataWriterImpl::get_dp_id()
01960 {
01961 return dp_id_;
01962 }
01963
01964 char const *
01965 DataWriterImpl::get_type_name() const
01966 {
01967 return type_name_.in();
01968 }
01969
01970 ACE_Message_Block*
01971 DataWriterImpl::create_control_message(MessageId message_id,
01972 DataSampleHeader& header_data,
01973 Message_Block_Ptr data,
01974 const DDS::Time_t& source_timestamp)
01975 {
01976 header_data.message_id_ = message_id;
01977 header_data.byte_order_ =
01978 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01979 header_data.coherent_change_ = 0;
01980
01981 if (data) {
01982 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01983 }
01984
01985 header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01986 header_data.sequence_repair_ = false;
01987 header_data.source_timestamp_sec_ = source_timestamp.sec;
01988 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01989 header_data.publication_id_ = publication_id_;
01990
01991 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01992 if (!publisher) {
01993 return 0;
01994 }
01995
01996 header_data.publisher_id_ = publisher->publisher_id_;
01997
01998 if (message_id == INSTANCE_REGISTRATION
01999 || message_id == DISPOSE_INSTANCE
02000 || message_id == UNREGISTER_INSTANCE
02001 || message_id == DISPOSE_UNREGISTER_INSTANCE
02002 || message_id == REQUEST_ACK) {
02003
02004 header_data.sequence_repair_ = need_sequence_repair();
02005
02006
02007
02008 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02009 this->sequence_number_ = SequenceNumber();
02010
02011 } else {
02012 ++this->sequence_number_;
02013 }
02014
02015 header_data.sequence_ = this->sequence_number_;
02016 header_data.key_fields_only_ = true;
02017 }
02018
02019 ACE_Message_Block* message = 0;
02020 ACE_NEW_MALLOC_RETURN(message,
02021 static_cast<ACE_Message_Block*>(
02022 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02023 ACE_Message_Block(
02024 DataSampleHeader::max_marshaled_size(),
02025 ACE_Message_Block::MB_DATA,
02026 header_data.message_length_ ? data.release() : 0,
02027 0,
02028 0,
02029 get_db_lock(),
02030 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02031 ACE_Time_Value::zero,
02032 ACE_Time_Value::max_time,
02033 db_allocator_.get(),
02034 mb_allocator_.get()),
02035 0);
02036
02037 *message << header_data;
02038
02039
02040 if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02041 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02042
02043 RepoIdToReaderInfoMap::iterator reader;
02044
02045 for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02046 reader->second.expected_sequence_ = sequence_number_;
02047 }
02048 }
02049 if (DCPS_debug_level >= 4) {
02050 const GuidConverter converter(publication_id_);
02051 ACE_DEBUG((LM_DEBUG,
02052 ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
02053 ACE_TEXT("from publication %C sending control sample: %C .\n"),
02054 OPENDDS_STRING(converter).c_str(),
02055 to_string(header_data).c_str()));
02056 }
02057 return message;
02058 }
02059
02060 DDS::ReturnCode_t
02061 DataWriterImpl::create_sample_data_message(Message_Block_Ptr data,
02062 DDS::InstanceHandle_t instance_handle,
02063 DataSampleHeader& header_data,
02064 Message_Block_Ptr& message,
02065 const DDS::Time_t& source_timestamp,
02066 bool content_filter)
02067 {
02068 PublicationInstance_rch instance =
02069 data_container_->get_handle_instance(instance_handle);
02070
02071 if (0 == instance) {
02072 ACE_ERROR_RETURN((LM_ERROR,
02073 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02074 ACE_TEXT("failed to find instance for handle %d\n"),
02075 instance_handle),
02076 DDS::RETCODE_ERROR);
02077 }
02078
02079 header_data.message_id_ = SAMPLE_DATA;
02080 header_data.byte_order_ =
02081 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02082 header_data.coherent_change_ = this->coherent_;
02083
02084 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02085
02086 if (!publisher)
02087 return DDS::RETCODE_ERROR;
02088
02089 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02090 header_data.group_coherent_ =
02091 publisher->qos_.presentation.access_scope
02092 == DDS::GROUP_PRESENTATION_QOS;
02093 #endif
02094 header_data.content_filter_ = content_filter;
02095 header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02096 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02097 header_data.sequence_repair_ = need_sequence_repair();
02098
02099 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02100 this->sequence_number_ = SequenceNumber();
02101
02102 } else {
02103 ++this->sequence_number_;
02104 }
02105
02106 header_data.sequence_ = this->sequence_number_;
02107 header_data.source_timestamp_sec_ = source_timestamp.sec;
02108 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02109
02110 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02111 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02112 header_data.lifespan_duration_ = true;
02113 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02114 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02115 }
02116
02117 header_data.publication_id_ = publication_id_;
02118 header_data.publisher_id_ = publisher->publisher_id_;
02119 size_t max_marshaled_size = header_data.max_marshaled_size();
02120
02121 ACE_Message_Block* tmp_message;
02122 ACE_NEW_MALLOC_RETURN(tmp_message,
02123 static_cast<ACE_Message_Block*>(
02124 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02125 ACE_Message_Block(max_marshaled_size,
02126 ACE_Message_Block::MB_DATA,
02127 data.release(),
02128 0,
02129 header_allocator_.get(),
02130 get_db_lock(),
02131 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02132 ACE_Time_Value::zero,
02133 ACE_Time_Value::max_time,
02134 db_allocator_.get(),
02135 mb_allocator_.get()),
02136 DDS::RETCODE_ERROR);
02137 message.reset(tmp_message);
02138 *message << header_data;
02139 if (DCPS_debug_level >= 4) {
02140 const GuidConverter converter(publication_id_);
02141 ACE_DEBUG((LM_DEBUG,
02142 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
02143 ACE_TEXT("from publication %C sending data sample: %C .\n"),
02144 OPENDDS_STRING(converter).c_str(),
02145 to_string(header_data).c_str()));
02146 }
02147 return DDS::RETCODE_OK;
02148 }
02149
02150 void
02151 DataWriterImpl::data_delivered(const DataSampleElement* sample)
02152 {
02153 DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02154
02155 if (!(sample->get_pub_id() == this->publication_id_)) {
02156 GuidConverter sample_converter(sample->get_pub_id());
02157 GuidConverter writer_converter(publication_id_);
02158 ACE_ERROR((LM_ERROR,
02159 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02160 ACE_TEXT(" The publication id %C from delivered element ")
02161 ACE_TEXT("does not match the datawriter's id %C\n"),
02162 OPENDDS_STRING(sample_converter).c_str(),
02163 OPENDDS_STRING(writer_converter).c_str()));
02164 return;
02165 }
02166
02167 ++data_delivered_count_;
02168
02169 this->data_container_->data_delivered(sample);
02170 }
02171
02172 void
02173 DataWriterImpl::control_delivered(const Message_Block_Ptr&)
02174 {
02175 DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02176 controlTracker.message_delivered();
02177 }
02178
02179 RcHandle<EntityImpl>
02180 DataWriterImpl::parent() const
02181 {
02182 return this->publisher_servant_.lock();
02183 }
02184
02185 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
02186 bool
02187 DataWriterImpl::filter_out(const DataSampleElement& elt,
02188 const OPENDDS_STRING& filterClassName,
02189 const FilterEvaluator& evaluator,
02190 const DDS::StringSeq& expression_params) const
02191 {
02192 TypeSupportImpl* const typesupport =
02193 dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02194
02195 if (!typesupport) {
02196 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02197 return false;
02198 }
02199
02200 if (filterClassName == "DDSSQL" ||
02201 filterClassName == "OPENDDSSQL") {
02202 return !evaluator.eval(elt.get_sample()->cont(),
02203 elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02204 elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02205 expression_params);
02206 }
02207 else {
02208 return false;
02209 }
02210 }
02211 #endif
02212
02213 bool
02214 DataWriterImpl::check_transport_qos(const TransportInst&)
02215 {
02216
02217
02218 return true;
02219 }
02220
02221 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02222
02223 bool
02224 DataWriterImpl::coherent_changes_pending()
02225 {
02226 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02227 guard,
02228 get_lock(),
02229 false);
02230
02231 return this->coherent_;
02232 }
02233
02234 void
02235 DataWriterImpl::begin_coherent_changes()
02236 {
02237 ACE_GUARD(ACE_Recursive_Thread_Mutex,
02238 guard,
02239 get_lock());
02240
02241 this->coherent_ = true;
02242 }
02243
02244 void
02245 DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
02246 {
02247
02248 ACE_GUARD(ACE_Recursive_Thread_Mutex,
02249 guard,
02250 get_lock());
02251
02252 CoherentChangeControl end_msg;
02253 end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02254 end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02255
02256 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02257
02258 if (publisher) {
02259 end_msg.group_coherent_
02260 = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02261 }
02262
02263 if (publisher && end_msg.group_coherent_) {
02264 end_msg.publisher_id_ = publisher->publisher_id_;
02265 end_msg.group_coherent_samples_ = group_samples;
02266 }
02267
02268 size_t max_marshaled_size = end_msg.max_marshaled_size();
02269
02270 Message_Block_Ptr data( new ACE_Message_Block(max_marshaled_size,
02271 ACE_Message_Block::MB_DATA,
02272 0,
02273 0,
02274 0,
02275 get_db_lock()));
02276
02277 Serializer serializer(
02278 data.get(),
02279 this->swap_bytes());
02280
02281 serializer << end_msg;
02282
02283 DDS::Time_t source_timestamp =
02284 time_value_to_time(ACE_OS::gettimeofday());
02285
02286 DataSampleHeader header;
02287 Message_Block_Ptr control(
02288 create_control_message(END_COHERENT_CHANGES, header, move(data), source_timestamp));
02289
02290
02291 this->coherent_ = false;
02292 this->coherent_samples_ = 0;
02293
02294 guard.release();
02295 if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) {
02296 ACE_ERROR((LM_ERROR,
02297 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02298 ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02299 }
02300 }
02301
02302 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
02303
02304 void
02305 DataWriterImpl::data_dropped(const DataSampleElement* element,
02306 bool dropped_by_transport)
02307 {
02308 DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02309
02310
02311 ++data_dropped_count_;
02312
02313 this->data_container_->data_dropped(element, dropped_by_transport);
02314 }
02315
02316 void
02317 DataWriterImpl::control_dropped(const Message_Block_Ptr&,
02318 bool )
02319 {
02320 DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02321 controlTracker.message_dropped();
02322 }
02323
02324 DDS::DataWriterListener_ptr
02325 DataWriterImpl::listener_for(DDS::StatusKind kind)
02326 {
02327
02328
02329
02330 RcHandle<PublisherImpl> publisher = publisher_servant_.lock();
02331 if (!publisher)
02332 return 0;
02333
02334 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02335 return publisher->listener_for(kind);
02336
02337 } else {
02338 return DDS::DataWriterListener::_duplicate(listener_.in());
02339 }
02340 }
02341
02342 int
02343 DataWriterImpl::handle_timeout(const ACE_Time_Value &tv,
02344 const void * )
02345 {
02346 bool liveliness_lost = false;
02347
02348 ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02349
02350
02351 if (elapsed >= liveliness_check_interval_) {
02352 switch (this->qos_.liveliness.kind) {
02353 case DDS::AUTOMATIC_LIVELINESS_QOS:
02354 if (this->send_liveliness(tv) == false) {
02355 liveliness_lost = true;
02356 }
02357 break;
02358
02359 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02360 if (liveliness_asserted_) {
02361 if (this->send_liveliness(tv) == false) {
02362 liveliness_lost = true;
02363 }
02364 }
02365 break;
02366
02367 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02368
02369 break;
02370 }
02371 }
02372 else {
02373
02374 if (reactor_->cancel_timer(liveness_timer_.in()) == -1) {
02375 ACE_ERROR((LM_ERROR,
02376 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02377 ACE_TEXT("cancel_timer")));
02378 }
02379 if (reactor_->schedule_timer(liveness_timer_.in(), 0, liveliness_check_interval_ - elapsed,
02380 liveliness_check_interval_) == -1)
02381 {
02382 ACE_ERROR((LM_ERROR,
02383 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02384 ACE_TEXT("schedule_timer")));
02385 }
02386 return 0;
02387 }
02388
02389 liveliness_asserted_ = false;
02390 elapsed = tv - last_liveliness_activity_time_;
02391
02392
02393 if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02394 liveliness_lost = true;
02395 }
02396
02397 if (!this->liveliness_lost_ && liveliness_lost) {
02398 ++ this->liveliness_lost_status_.total_count;
02399 ++ this->liveliness_lost_status_.total_count_change;
02400
02401 DDS::DataWriterListener_var listener =
02402 listener_for(DDS::LIVELINESS_LOST_STATUS);
02403
02404 if (!CORBA::is_nil(listener.in())) {
02405 listener->on_liveliness_lost(this, this->liveliness_lost_status_);
02406 this->liveliness_lost_status_.total_count_change = 0;
02407 }
02408 }
02409
02410 this->liveliness_lost_ = liveliness_lost;
02411 return 0;
02412 }
02413
02414 bool
02415 DataWriterImpl::send_liveliness(const ACE_Time_Value& now)
02416 {
02417 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02418 !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02419 DDS::Time_t t = time_value_to_time(now);
02420 DataSampleHeader header;
02421 Message_Block_Ptr empty;
02422 Message_Block_Ptr liveliness_msg(
02423 this->create_control_message(DATAWRITER_LIVELINESS, header, move(empty), t));
02424
02425 if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) {
02426 ACE_ERROR_RETURN((LM_ERROR,
02427 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02428 ACE_TEXT(" send_control failed. \n")),
02429 false);
02430
02431 } else {
02432 last_liveliness_activity_time_ = now;
02433 return true;
02434 }
02435 } else {
02436 last_liveliness_activity_time_ = now;
02437 return true;
02438 }
02439 }
02440
02441 void
02442 DataWriterImpl::prepare_to_delete()
02443 {
02444 this->set_deleted(true);
02445 this->stop_associating();
02446 }
02447
02448 PublicationInstance_rch
02449 DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02450 {
02451
02452 if (0 != data_container_) {
02453 return data_container_->get_handle_instance(handle);
02454 }
02455
02456 return PublicationInstance_rch();
02457 }
02458
02459 void
02460 DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
02461 {
02462 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02463
02464 if (!is_bit_) {
02465
02466
02467 DataWriterListener_var the_listener =
02468 DataWriterListener::_narrow(this->listener_.in());
02469
02470 if (!CORBA::is_nil(the_listener.in())) {
02471 PublicationDisconnectedStatus status;
02472
02473
02474
02475 this->lookup_instance_handles(subids,
02476 status.subscription_handles);
02477 the_listener->on_publication_disconnected(this, status);
02478 }
02479 }
02480 }
02481
02482 void
02483 DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
02484 {
02485 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02486
02487 if (!is_bit_) {
02488
02489
02490
02491 DataWriterListener_var the_listener =
02492 DataWriterListener::_narrow(this->listener_.in());
02493
02494 if (!CORBA::is_nil(the_listener.in())) {
02495 PublicationDisconnectedStatus status;
02496
02497
02498 this->lookup_instance_handles(subids, status.subscription_handles);
02499
02500 the_listener->on_publication_reconnected(this, status);
02501 }
02502 }
02503 }
02504
02505 void
02506 DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
02507 {
02508 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02509
02510 if (!is_bit_) {
02511
02512
02513
02514 DataWriterListener_var the_listener =
02515 DataWriterListener::_narrow(this->listener_.in());
02516
02517 if (!CORBA::is_nil(the_listener.in())) {
02518 PublicationLostStatus status;
02519
02520
02521
02522 this->lookup_instance_handles(subids,
02523 status.subscription_handles);
02524 the_listener->on_publication_lost(this, status);
02525 }
02526 }
02527 }
02528
02529 void
02530 DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
02531 {
02532 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02533
02534 if (!is_bit_) {
02535
02536
02537
02538 DataWriterListener_var the_listener =
02539 DataWriterListener::_narrow(this->listener_.in());
02540
02541 if (!CORBA::is_nil(the_listener.in())) {
02542 PublicationLostStatus status;
02543
02544 CORBA::ULong len = handles.length();
02545 status.subscription_handles.length(len);
02546
02547 for (CORBA::ULong i = 0; i < len; ++ i) {
02548 status.subscription_handles[i] = handles[i];
02549 }
02550
02551 the_listener->on_publication_lost(this, status);
02552 }
02553 }
02554 }
02555
02556
02557 void
02558 DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
02559 DDS::InstanceHandleSeq & hdls)
02560 {
02561 CORBA::ULong const num_rds = ids.length();
02562 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02563
02564 if (!participant)
02565 return;
02566
02567 if (DCPS_debug_level > 9) {
02568 OPENDDS_STRING separator;
02569 OPENDDS_STRING buffer;
02570
02571 for (CORBA::ULong i = 0; i < num_rds; ++i) {
02572 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02573 separator = ", ";
02574 }
02575
02576 ACE_DEBUG((LM_DEBUG,
02577 ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02578 ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02579 buffer.c_str()));
02580 }
02581
02582 hdls.length(num_rds);
02583
02584 for (CORBA::ULong i = 0; i < num_rds; ++i) {
02585 hdls[i] = participant->id_to_handle(ids[i]);
02586 }
02587 }
02588
02589 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
02590 bool
02591 DataWriterImpl::persist_data()
02592 {
02593 return this->data_container_->persist_data();
02594 }
02595 #endif
02596
02597 void
02598 DataWriterImpl::reschedule_deadline()
02599 {
02600 if (this->watchdog_.in()) {
02601 this->data_container_->reschedule_deadline();
02602 }
02603 }
02604
02605 void
02606 DataWriterImpl::wait_control_pending()
02607 {
02608 if (!TransportRegistry::instance()->released()) {
02609 OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02610 controlTracker.wait_messages_pending(caller_string);
02611 }
02612 }
02613
02614 void
02615 DataWriterImpl::wait_pending()
02616 {
02617 if (!TransportRegistry::instance()->released()) {
02618 data_container_->wait_pending();
02619 }
02620 }
02621
02622 void
02623 DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02624 {
02625 this->data_container_->get_instance_handles(instance_handles);
02626 }
02627
02628 void
02629 DataWriterImpl::get_readers(RepoIdSet& readers)
02630 {
02631 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02632 readers = this->readers_;
02633 }
02634
02635 void
02636 DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
02637 {
02638 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02639 if (publisher) {
02640 publisher->get_qos(qos_data.pub_qos);
02641 }
02642 qos_data.dw_qos = this->qos_;
02643 qos_data.topic_name = this->topic_name_.in();
02644 }
02645
02646 #if defined(OPENDDS_SECURITY)
02647 DDS::Security::ParticipantCryptoHandle DataWriterImpl::get_crypto_handle() const
02648 {
02649 RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
02650 return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
02651 }
02652 #endif
02653
02654 bool
02655 DataWriterImpl::need_sequence_repair()
02656 {
02657 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02658 return need_sequence_repair_i();
02659 }
02660
02661 bool
02662 DataWriterImpl::need_sequence_repair_i() const
02663 {
02664 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02665 end = reader_info_.end(); it != end; ++it) {
02666 if (it->second.expected_sequence_ != sequence_number_) {
02667 return true;
02668 }
02669 }
02670
02671 return false;
02672 }
02673
02674 SendControlStatus
02675 DataWriterImpl::send_control(const DataSampleHeader& header,
02676 Message_Block_Ptr msg)
02677 {
02678 controlTracker.message_sent();
02679
02680 SendControlStatus status = TransportClient::send_control(header, move(msg));
02681
02682 if (status != SEND_CONTROL_OK) {
02683 controlTracker.message_dropped();
02684 }
02685
02686 return status;
02687 }
02688
02689 int
02690 LivenessTimer::handle_timeout(const ACE_Time_Value &tv,
02691 const void *arg)
02692 {
02693 DataWriterImpl_rch writer = this->writer_.lock();
02694 if (writer) {
02695 writer->handle_timeout(tv, arg);
02696 }
02697 else {
02698 this->reactor()->cancel_timer(this);
02699 }
02700 return 0;
02701 }
02702
02703
02704 }
02705 }
02706
02707 OPENDDS_END_VERSIONED_NAMESPACE_DECL