00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DataWriterImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "Serializer.h"
00018 #include "Transient_Kludge.h"
00019 #include "DataDurabilityCache.h"
00020 #include "OfferedDeadlineWatchdog.h"
00021 #include "MonitorFactory.h"
00022 #include "TypeSupportImpl.h"
00023 #include "SendStateDataSampleList.h"
00024 #include "DataSampleElement.h"
00025
00026 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00027 #include "CoherentChangeControl.h"
00028 #endif
00029
00030 #include "AssociationData.h"
00031 #include "dds/DdsDcpsCoreC.h"
00032 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00033
00034 #if !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "BuiltInTopicUtils.h"
00036 #include "dds/DdsDcpsCoreTypeSupportC.h"
00037 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00038
00039 #include "Util.h"
00040 #include "dds/DCPS/transport/framework/EntryExit.h"
00041 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00042
00043 #include "tao/ORB_Core.h"
00044 #include "ace/Reactor.h"
00045 #include "ace/Auto_Ptr.h"
00046
00047 #include <stdexcept>
00048
00049 namespace OpenDDS {
00050 namespace DCPS {
00051
00052
00053
00054
00055
00056 DataWriterImpl::DataWriterImpl()
00057 : data_dropped_count_(0),
00058 data_delivered_count_(0),
00059 controlTracker("DataWriterImpl"),
00060 n_chunks_(TheServiceParticipant->n_chunks()),
00061 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00062 qos_(TheServiceParticipant->initial_DataWriterQos()),
00063 participant_servant_(0),
00064 topic_id_(GUID_UNKNOWN),
00065 topic_servant_(0),
00066 listener_mask_(DEFAULT_STATUS_MASK),
00067 domain_id_(0),
00068 publisher_servant_(0),
00069 publication_id_(GUID_UNKNOWN),
00070 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00071 coherent_(false),
00072 coherent_samples_(0),
00073 data_container_(0),
00074 liveliness_lost_(false),
00075 mb_allocator_(0),
00076 db_allocator_(0),
00077 header_allocator_(0),
00078 reactor_(0),
00079 liveliness_check_interval_(ACE_Time_Value::max_time),
00080 last_liveliness_activity_time_(ACE_Time_Value::zero),
00081 last_deadline_missed_total_count_(0),
00082 watchdog_(),
00083 cancel_timer_(false),
00084 is_bit_(false),
00085 initialized_(false),
00086 min_suspended_transaction_id_(0),
00087 max_suspended_transaction_id_(0),
00088 monitor_(0),
00089 periodic_monitor_(0),
00090 db_lock_pool_(0),
00091 liveliness_asserted_(false)
00092 {
00093 liveliness_lost_status_.total_count = 0;
00094 liveliness_lost_status_.total_count_change = 0;
00095
00096 offered_deadline_missed_status_.total_count = 0;
00097 offered_deadline_missed_status_.total_count_change = 0;
00098 offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00099
00100 offered_incompatible_qos_status_.total_count = 0;
00101 offered_incompatible_qos_status_.total_count_change = 0;
00102 offered_incompatible_qos_status_.last_policy_id = 0;
00103 offered_incompatible_qos_status_.policies.length(0);
00104
00105 publication_match_status_.total_count = 0;
00106 publication_match_status_.total_count_change = 0;
00107 publication_match_status_.current_count = 0;
00108 publication_match_status_.current_count_change = 0;
00109 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00110
00111 monitor_ =
00112 TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00113 periodic_monitor_ =
00114 TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00115
00116 db_lock_pool_ = new DataBlockLockPool((unsigned long)n_chunks_);
00117 }
00118
00119
00120
00121 DataWriterImpl::~DataWriterImpl()
00122 {
00123 DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00124
00125 if (initialized_) {
00126 delete data_container_;
00127 delete mb_allocator_;
00128 delete db_allocator_;
00129 delete header_allocator_;
00130 }
00131 delete db_lock_pool_;
00132 }
00133
00134
00135 void
00136 DataWriterImpl::cleanup()
00137 {
00138 if (cancel_timer_) {
00139
00140
00141 (void) reactor_->cancel_timer(this, 0);
00142 cancel_timer_ = false;
00143 }
00144
00145
00146 topic_objref_ = DDS::Topic::_nil();
00147 topic_servant_->remove_entity_ref();
00148 topic_servant_->_remove_ref();
00149 topic_servant_ = 0;
00150
00151 dw_local_objref_ = DDS::DataWriter::_nil();
00152 }
00153
00154 void
00155 DataWriterImpl::init(
00156 DDS::Topic_ptr topic,
00157 TopicImpl * topic_servant,
00158 const DDS::DataWriterQos & qos,
00159 DDS::DataWriterListener_ptr a_listener,
00160 const DDS::StatusMask & mask,
00161 OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00162 OpenDDS::DCPS::PublisherImpl * publisher_servant,
00163 DDS::DataWriter_ptr dw_local)
00164 {
00165 DBG_ENTRY_LVL("DataWriterImpl","init",6);
00166 topic_objref_ = DDS::Topic::_duplicate(topic);
00167 topic_servant_ = topic_servant;
00168 topic_servant_->_add_ref();
00169 topic_servant_->add_entity_ref();
00170 topic_name_ = topic_servant_->get_name();
00171 topic_id_ = topic_servant_->get_id();
00172 type_name_ = topic_servant_->get_type_name();
00173
00174 #if !defined (DDS_HAS_MINIMUM_BIT)
00175 is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00176 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00177 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00178 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00179 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00180
00181 qos_ = qos;
00182
00183
00184 listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00185 listener_mask_ = mask;
00186
00187
00188
00189 participant_servant_ = participant_servant;
00190 domain_id_ = participant_servant_->get_domain_id();
00191
00192
00193
00194 publisher_servant_ = publisher_servant;
00195 dw_local_objref_ = DDS::DataWriter::_duplicate(dw_local);
00196
00197 this->reactor_ = TheServiceParticipant->timer();
00198
00199 initialized_ = true;
00200 }
00201
00202 DDS::InstanceHandle_t
00203 DataWriterImpl::get_instance_handle()
00204 {
00205 return this->participant_servant_->id_to_handle(publication_id_);
00206 }
00207
00208 DDS::InstanceHandle_t
00209 DataWriterImpl::get_next_handle()
00210 {
00211 return this->participant_servant_->id_to_handle(GUID_UNKNOWN);
00212 }
00213
00214 void
00215 DataWriterImpl::add_association(const RepoId& yourId,
00216 const ReaderAssociation& reader,
00217 bool active)
00218 {
00219 DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00220
00221 if (DCPS_debug_level) {
00222 GuidConverter writer_converter(yourId);
00223 GuidConverter reader_converter(reader.readerId);
00224 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00225 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00226 OPENDDS_STRING(writer_converter).c_str(),
00227 OPENDDS_STRING(reader_converter).c_str()));
00228 }
00229
00230 if (entity_deleted_.value()) {
00231 if (DCPS_debug_level)
00232 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00233 ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00234
00235 return;
00236 }
00237
00238 if (GUID_UNKNOWN == publication_id_) {
00239 publication_id_ = yourId;
00240 }
00241
00242 {
00243 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00244 reader_info_.insert(std::make_pair(reader.readerId,
00245 ReaderInfo(reader.filterClassName,
00246 TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00247 reader.exprParams, participant_servant_,
00248 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00249 }
00250
00251 if (DCPS_debug_level > 4) {
00252 GuidConverter converter(get_publication_id());
00253 ACE_DEBUG((LM_DEBUG,
00254 ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00255 ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00256 OPENDDS_STRING(converter).c_str(),
00257 qos_.transport_priority.value));
00258 }
00259
00260 AssociationData data;
00261 data.remote_id_ = reader.readerId;
00262 data.remote_data_ = reader.readerTransInfo;
00263 data.remote_reliable_ =
00264 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00265 data.remote_durable_ =
00266 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00267
00268 if (!associate(data, active)) {
00269
00270 if (DCPS_debug_level) {
00271 ACE_DEBUG((LM_ERROR,
00272 ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00273 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00274 }
00275 }
00276 }
00277
00278 void
00279 DataWriterImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00280 {
00281 DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00282
00283 if (!(flags & ASSOC_OK)) {
00284 if (DCPS_debug_level) {
00285 const GuidConverter conv(remote_id);
00286 ACE_DEBUG((LM_ERROR,
00287 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00288 ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00289 OPENDDS_STRING(conv).c_str()));
00290 }
00291
00292 return;
00293 }
00294 if (DCPS_debug_level) {
00295 const GuidConverter writer_conv(publication_id_);
00296 const GuidConverter conv(remote_id);
00297 ACE_DEBUG((LM_INFO,
00298 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00299 ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00300 OPENDDS_STRING(writer_conv).c_str(),
00301 OPENDDS_STRING(conv).c_str()));
00302 }
00303 if (flags & ASSOC_ACTIVE) {
00304
00305 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00306
00307
00308 if (assoc_complete_readers_.count(remote_id)) {
00309 if (DCPS_debug_level) {
00310 const GuidConverter writer_conv(publication_id_);
00311 const GuidConverter converter(remote_id);
00312 ACE_DEBUG((LM_DEBUG,
00313 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00314 ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00315 OPENDDS_STRING(writer_conv).c_str(),
00316 OPENDDS_STRING(converter).c_str()));
00317 }
00318 assoc_complete_readers_.erase(remote_id);
00319 association_complete_i(remote_id);
00320
00321
00322
00323
00324 } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00325 const GuidConverter converter(remote_id);
00326 ACE_ERROR((LM_ERROR,
00327 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00328 ACE_TEXT("failed to mark %C as pending.\n"),
00329 OPENDDS_STRING(converter).c_str()));
00330
00331 } else {
00332 if (DCPS_debug_level) {
00333 const GuidConverter converter(remote_id);
00334 ACE_DEBUG((LM_DEBUG,
00335 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00336 ACE_TEXT("marked %C as pending.\n"),
00337 OPENDDS_STRING(converter).c_str()));
00338 }
00339 }
00340
00341 } else {
00342
00343
00344 if (DCPS_debug_level) {
00345 const GuidConverter conv(publication_id_);
00346 ACE_DEBUG((LM_ERROR,
00347 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00348 ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00349 OPENDDS_STRING(conv).c_str()));
00350 }
00351 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00352 disco->association_complete(domain_id_, participant_servant_->get_id(),
00353 publication_id_, remote_id);
00354 }
00355 }
00356
00357 DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
00358 const char* filter,
00359 const DDS::StringSeq& params,
00360 DomainParticipantImpl* participant,
00361 bool durable)
00362 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00363 : participant_(participant)
00364 , filter_class_name_(filterClassName)
00365 , filter_(filter)
00366 , expression_params_(params)
00367 , eval_(*filter ? participant->get_filter_eval(filter) : 0)
00368 , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00369 , durable_(durable)
00370 {}
00371 #else
00372 : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00373 , durable_(durable)
00374 {
00375 ACE_UNUSED_ARG(filterClassName);
00376 ACE_UNUSED_ARG(filter);
00377 ACE_UNUSED_ARG(params);
00378 ACE_UNUSED_ARG(participant);
00379 }
00380 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00381
00382 DataWriterImpl::ReaderInfo::~ReaderInfo()
00383 {
00384 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00385 eval_ = RcHandle<FilterEvaluator>();
00386
00387 if (!filter_.empty()) {
00388 participant_->deref_filter_eval(filter_.c_str());
00389 }
00390
00391 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00392 }
00393
00394 void
00395 DataWriterImpl::association_complete(const RepoId& remote_id)
00396 {
00397 DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00398
00399 if (DCPS_debug_level >= 1) {
00400 GuidConverter writer_converter(this->publication_id_);
00401 GuidConverter reader_converter(remote_id);
00402 ACE_DEBUG((LM_DEBUG,
00403 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00404 ACE_TEXT("bit %d local %C remote %C\n"),
00405 is_bit_,
00406 OPENDDS_STRING(writer_converter).c_str(),
00407 OPENDDS_STRING(reader_converter).c_str()));
00408 }
00409
00410 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00411
00412 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00413 if (DCPS_debug_level) {
00414 GuidConverter writer_converter(this->publication_id_);
00415 GuidConverter reader_converter(remote_id);
00416 ACE_DEBUG((LM_DEBUG,
00417 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00418 ACE_TEXT("bit %d local %C did not find pending reader: %C")
00419 ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00420 is_bit_,
00421 OPENDDS_STRING(writer_converter).c_str(),
00422 OPENDDS_STRING(reader_converter).c_str()));
00423 }
00424
00425
00426 assoc_complete_readers_.insert(remote_id);
00427
00428 } else {
00429 association_complete_i(remote_id);
00430 }
00431 }
00432
00433 void
00434 DataWriterImpl::association_complete_i(const RepoId& remote_id)
00435 {
00436 DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00437
00438 if (DCPS_debug_level >= 1) {
00439 GuidConverter writer_converter(this->publication_id_);
00440 GuidConverter reader_converter(remote_id);
00441 ACE_DEBUG((LM_DEBUG,
00442 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00443 ACE_TEXT("bit %d local %C remote %C\n"),
00444 is_bit_,
00445 OPENDDS_STRING(writer_converter).c_str(),
00446 OPENDDS_STRING(reader_converter).c_str()));
00447 }
00448
00449 bool reader_durable = false;
00450 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00451 OPENDDS_STRING filterClassName;
00452 RcHandle<FilterEvaluator> eval;
00453 DDS::StringSeq expression_params;
00454 #endif
00455 {
00456 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00457
00458 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00459 GuidConverter converter(remote_id);
00460 ACE_ERROR((LM_ERROR,
00461 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00462 ACE_TEXT("insert %C from pending failed.\n"),
00463 OPENDDS_STRING(converter).c_str()));
00464 }
00465 }
00466 {
00467 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00468 RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00469
00470 if (it != reader_info_.end()) {
00471 reader_durable = it->second.durable_;
00472 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00473 filterClassName = it->second.filter_class_name_;
00474 eval = it->second.eval_;
00475 expression_params = it->second.expression_params_;
00476 #endif
00477 }
00478 }
00479
00480 if (this->monitor_) {
00481 this->monitor_->report();
00482 }
00483
00484 if (!is_bit_) {
00485
00486 DDS::InstanceHandle_t handle =
00487 this->participant_servant_->id_to_handle(remote_id);
00488
00489 {
00490
00491 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00492
00493
00494 ++publication_match_status_.total_count;
00495 ++publication_match_status_.total_count_change;
00496 ++publication_match_status_.current_count;
00497 ++publication_match_status_.current_count_change;
00498
00499 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00500 GuidConverter converter(remote_id);
00501 ACE_DEBUG((LM_WARNING,
00502 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00503 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00504 OPENDDS_STRING(converter).c_str(),
00505 handle));
00506 return;
00507
00508 } else if (DCPS_debug_level > 4) {
00509 GuidConverter converter(remote_id);
00510 ACE_DEBUG((LM_DEBUG,
00511 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00512 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00513 OPENDDS_STRING(converter).c_str(),
00514 handle));
00515 }
00516
00517 publication_match_status_.last_subscription_handle = handle;
00518
00519 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00520 }
00521
00522 DDS::DataWriterListener_var listener =
00523 listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00524
00525 if (!CORBA::is_nil(listener.in())) {
00526
00527 listener->on_publication_matched(dw_local_objref_.in(),
00528 publication_match_status_);
00529
00530
00531
00532 publication_match_status_.total_count_change = 0;
00533 publication_match_status_.current_count_change = 0;
00534 }
00535
00536 notify_status_condition();
00537 }
00538
00539
00540 if (reader_durable) {
00541
00542
00543 this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00544 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00545 , filterClassName, eval.in(), expression_params
00546 #endif
00547 );
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00558 guard,
00559 this->get_lock());
00560
00561 SendStateDataSampleList list = this->get_resend_data();
00562 {
00563 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00564
00565 SequenceNumber& seq =
00566 reader_info_.find(remote_id)->second.expected_sequence_;
00567
00568 for (SendStateDataSampleList::iterator list_el = list.begin();
00569 list_el != list.end(); ++list_el) {
00570 list_el->get_header().historic_sample_ = true;
00571
00572 if (list_el->get_header().sequence_ > seq) {
00573 seq = list_el->get_header().sequence_;
00574 }
00575 }
00576 }
00577
00578 if (this->publisher_servant_->is_suspended()) {
00579 this->available_data_list_.enqueue_tail(list);
00580
00581 } else {
00582 if (DCPS_debug_level >= 4) {
00583 ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00584 }
00585
00586 size_t size = 0, padding = 0;
00587 gen_find_size(remote_id, size, padding);
00588 ACE_Message_Block* const data =
00589 new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00590 get_db_lock());
00591 Serializer ser(data);
00592 ser << remote_id;
00593
00594 const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00595 DataSampleHeader header;
00596 ACE_Message_Block* const end_historic_samples =
00597 create_control_message(END_HISTORIC_SAMPLES, header, data, timestamp);
00598
00599 this->controlTracker.message_sent();
00600 guard.release();
00601 SendControlStatus ret = send_w_control(list, header, end_historic_samples, remote_id);
00602 if (ret == SEND_CONTROL_ERROR) {
00603 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00604 ACE_TEXT("DataWriterImpl::association_complete_i: ")
00605 ACE_TEXT("send_w_control failed.\n")));
00606 this->controlTracker.message_dropped();
00607 }
00608 }
00609 }
00610 }
00611
00612 void
00613 DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
00614 CORBA::Boolean notify_lost)
00615 {
00616 if (readers.length() == 0) {
00617 return;
00618 }
00619
00620 if (DCPS_debug_level >= 1) {
00621 GuidConverter writer_converter(publication_id_);
00622 GuidConverter reader_converter(readers[0]);
00623 ACE_DEBUG((LM_DEBUG,
00624 ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
00625 ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00626 is_bit_,
00627 OPENDDS_STRING(writer_converter).c_str(),
00628 OPENDDS_STRING(reader_converter).c_str(),
00629 readers.length()));
00630 }
00631
00632
00633 this->stop_associating(readers.get_buffer(), readers.length());
00634
00635 ReaderIdSeq fully_associated_readers;
00636 CORBA::ULong fully_associated_len = 0;
00637 ReaderIdSeq rds;
00638 CORBA::ULong rds_len = 0;
00639 DDS::InstanceHandleSeq handles;
00640
00641 ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
00642 {
00643
00644 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00645
00646
00647
00648
00649
00650 CORBA::ULong len = readers.length();
00651
00652 for (CORBA::ULong i = 0; i < len; ++i) {
00653
00654
00655
00656
00657 if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00658 ++ fully_associated_len;
00659 fully_associated_readers.length(fully_associated_len);
00660 fully_associated_readers [fully_associated_len - 1] = readers[i];
00661
00662 ++ rds_len;
00663 rds.length(rds_len);
00664 rds [rds_len - 1] = readers[i];
00665
00666 } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00667 ++ rds_len;
00668 rds.length(rds_len);
00669 rds [rds_len - 1] = readers[i];
00670
00671 GuidConverter converter(readers[i]);
00672 ACE_DEBUG((LM_WARNING,
00673 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_associations: ")
00674 ACE_TEXT("removing reader %C before association_complete() call.\n"),
00675 OPENDDS_STRING(converter).c_str()));
00676 }
00677
00678 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00679 reader_info_.erase(readers[i]);
00680
00681
00682 }
00683
00684 if (fully_associated_len > 0 && !is_bit_) {
00685
00686
00687 if (this->lookup_instance_handles(fully_associated_readers, handles) == false) {
00688 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::remove_associations: "
00689 "lookup_instance_handles failed, notify %d \n", notify_lost));
00690
00691 return;
00692 }
00693
00694 for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00695 id_to_handle_map_.erase(fully_associated_readers[i]);
00696 }
00697 }
00698
00699
00700
00701 if (!this->is_bit_) {
00702
00703
00704 int matchedSubscriptions =
00705 static_cast<int>(this->id_to_handle_map_.size());
00706 this->publication_match_status_.current_count_change =
00707 matchedSubscriptions - this->publication_match_status_.current_count;
00708
00709
00710 if (this->publication_match_status_.current_count_change != 0) {
00711 this->publication_match_status_.current_count = matchedSubscriptions;
00712
00713
00714
00715
00716 this->publication_match_status_.last_subscription_handle =
00717 handles[fully_associated_len - 1];
00718
00719 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00720
00721 DDS::DataWriterListener_var listener =
00722 this->listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00723
00724 if (!CORBA::is_nil(listener.in())) {
00725 listener->on_publication_matched(
00726 this->dw_local_objref_.in(),
00727 this->publication_match_status_);
00728
00729
00730 this->publication_match_status_.total_count_change = 0;
00731 this->publication_match_status_.current_count_change = 0;
00732 }
00733
00734 this->notify_status_condition();
00735 }
00736 }
00737 }
00738
00739 for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00740 this->disassociate(rds[i]);
00741 }
00742
00743
00744
00745
00746 if (notify_lost && handles.length() > 0) {
00747 this->notify_publication_lost(handles);
00748 }
00749 }
00750
00751 void DataWriterImpl::remove_all_associations()
00752 {
00753 DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00754
00755 this->stop_associating();
00756
00757 OpenDDS::DCPS::ReaderIdSeq readers;
00758 CORBA::ULong size;
00759 CORBA::ULong num_pending_readers;
00760 {
00761 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00762
00763 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00764 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00765 readers.length(size);
00766
00767 RepoIdSet::iterator itEnd = readers_.end();
00768 int i = 0;
00769
00770 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00771 readers[i ++] = *it;
00772 }
00773
00774 itEnd = pending_readers_.end();
00775
00776 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00777 readers[i ++] = *it;
00778 }
00779
00780 if (num_pending_readers > 0) {
00781 ACE_DEBUG((LM_WARNING,
00782 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00783 ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00784 num_pending_readers));
00785 }
00786 }
00787
00788 try {
00789 if (0 < size) {
00790 CORBA::Boolean dont_notify_lost = false;
00791
00792 this->remove_associations(readers, dont_notify_lost);
00793 }
00794
00795 } catch (const CORBA::Exception&) {
00796 ACE_DEBUG((LM_WARNING,
00797 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00798 ACE_TEXT("caught exception from remove_associations.\n")));
00799 }
00800 }
00801
00802 void
00803 DataWriterImpl::register_for_reader(const RepoId& participant,
00804 const RepoId& writerid,
00805 const RepoId& readerid,
00806 const TransportLocatorSeq& locators,
00807 DiscoveryListener* listener)
00808 {
00809 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00810 }
00811
00812 void
00813 DataWriterImpl::unregister_for_reader(const RepoId& participant,
00814 const RepoId& writerid,
00815 const RepoId& readerid)
00816 {
00817 TransportClient::unregister_for_reader(participant, writerid, readerid);
00818 }
00819
00820 void
00821 DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00822 {
00823 DDS::DataWriterListener_var listener =
00824 listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00825
00826 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00827
00828 #if 0
00829
00830 if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00831
00832 return;
00833 }
00834
00835 #endif
00836
00837 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00838
00839
00840 offered_incompatible_qos_status_.total_count = status.total_count;
00841 offered_incompatible_qos_status_.total_count_change +=
00842 status.count_since_last_send;
00843 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00844 offered_incompatible_qos_status_.policies = status.policies;
00845
00846 if (!CORBA::is_nil(listener.in())) {
00847 listener->on_offered_incompatible_qos(dw_local_objref_.in(),
00848 offered_incompatible_qos_status_);
00849
00850
00851
00852 offered_incompatible_qos_status_.total_count_change = 0;
00853 }
00854
00855 notify_status_condition();
00856 }
00857
00858 void
00859 DataWriterImpl::update_subscription_params(const RepoId& readerId,
00860 const DDS::StringSeq& params)
00861 {
00862 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00863 ACE_UNUSED_ARG(readerId);
00864 ACE_UNUSED_ARG(params);
00865 #else
00866 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00867 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00868 RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00869
00870 if (iter != reader_info_.end()) {
00871 iter->second.expression_params_ = params;
00872
00873 } else if (DCPS_debug_level > 4 &&
00874 TheServiceParticipant->publisher_content_filter()) {
00875 GuidConverter pubConv(this->publication_id_), subConv(readerId);
00876 ACE_DEBUG((LM_WARNING,
00877 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00878 ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00879 OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00880 }
00881
00882 #endif
00883 }
00884
00885 void
00886 DataWriterImpl::inconsistent_topic()
00887 {
00888 topic_servant_->inconsistent_topic();
00889 }
00890
00891 DDS::ReturnCode_t
00892 DataWriterImpl::set_qos(const DDS::DataWriterQos & qos)
00893 {
00894
00895 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00896 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00897 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00898 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00899 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00900
00901 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00902 if (qos_ == qos)
00903 return DDS::RETCODE_OK;
00904
00905 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00906 return DDS::RETCODE_IMMUTABLE_POLICY;
00907
00908 } else {
00909 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00910 DDS::PublisherQos publisherQos;
00911 this->publisher_servant_->get_qos(publisherQos);
00912 const bool status
00913 = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00914 this->participant_servant_->get_id(),
00915 this->publication_id_,
00916 qos,
00917 publisherQos);
00918
00919 if (!status) {
00920 ACE_ERROR_RETURN((LM_ERROR,
00921 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00922 ACE_TEXT("qos not updated. \n")),
00923 DDS::RETCODE_ERROR);
00924 }
00925 }
00926
00927 if (!(qos_ == qos)) {
00928
00929 if (qos_.deadline.period.sec != qos.deadline.period.sec
00930 || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00931 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00932 && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00933 this->watchdog_ =
00934 new OfferedDeadlineWatchdog(
00935 this->lock_,
00936 qos.deadline,
00937 this,
00938 this->dw_local_objref_.in(),
00939 this->offered_deadline_missed_status_,
00940 this->last_deadline_missed_total_count_);
00941
00942 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00943 && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00944 this->watchdog_->cancel_all();
00945 this->watchdog_->destroy();
00946 this->watchdog_ = 0;
00947
00948 } else {
00949 this->watchdog_->reset_interval(
00950 duration_to_time_value(qos.deadline.period));
00951 }
00952 }
00953
00954 qos_ = qos;
00955 }
00956
00957 return DDS::RETCODE_OK;
00958
00959 } else {
00960 return DDS::RETCODE_INCONSISTENT_POLICY;
00961 }
00962 }
00963
00964 DDS::ReturnCode_t
00965 DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
00966 {
00967 qos = qos_;
00968 return DDS::RETCODE_OK;
00969 }
00970
00971 DDS::ReturnCode_t
00972 DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
00973 DDS::StatusMask mask)
00974 {
00975 listener_mask_ = mask;
00976
00977 listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00978 return DDS::RETCODE_OK;
00979 }
00980
00981 DDS::DataWriterListener_ptr
00982 DataWriterImpl::get_listener()
00983 {
00984 return DDS::DataWriterListener::_duplicate(listener_.in());
00985 }
00986
00987 DDS::Topic_ptr
00988 DataWriterImpl::get_topic()
00989 {
00990 return DDS::Topic::_duplicate(topic_objref_.in());
00991 }
00992
00993 bool
00994 DataWriterImpl::should_ack() const
00995 {
00996
00997
00998
00999 return this->readers_.size() != 0;
01000 }
01001
01002 DataWriterImpl::AckToken
01003 DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
01004 {
01005 if (DCPS_debug_level > 0) {
01006 ACE_DEBUG((LM_DEBUG,
01007 ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
01008 ACE_TEXT("for sequence %q \n"),
01009 this->sequence_number_.getValue()));
01010 }
01011 return AckToken(max_wait, this->sequence_number_);
01012 }
01013
01014 DDS::ReturnCode_t
01015 DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
01016 {
01017 if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01018 return DDS::RETCODE_OK;
01019 DataWriterImpl::AckToken token = create_ack_token(max_wait);
01020 if (DCPS_debug_level) {
01021 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01022 ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01023 token.sequence_.getValue()));
01024 }
01025 return wait_for_specific_ack(token);
01026 }
01027
01028 DDS::ReturnCode_t
01029 DataWriterImpl::wait_for_specific_ack(const AckToken& token)
01030 {
01031 return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01032 }
01033
01034 DDS::Publisher_ptr
01035 DataWriterImpl::get_publisher()
01036 {
01037 return DDS::Publisher::_duplicate(publisher_servant_);
01038 }
01039
01040 DDS::ReturnCode_t
01041 DataWriterImpl::get_liveliness_lost_status(
01042 DDS::LivelinessLostStatus & status)
01043 {
01044 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01045 guard,
01046 this->lock_,
01047 DDS::RETCODE_ERROR);
01048 set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
01049 status = liveliness_lost_status_;
01050 liveliness_lost_status_.total_count_change = 0;
01051 return DDS::RETCODE_OK;
01052 }
01053
01054 DDS::ReturnCode_t
01055 DataWriterImpl::get_offered_deadline_missed_status(
01056 DDS::OfferedDeadlineMissedStatus & status)
01057 {
01058 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01059 guard,
01060 this->lock_,
01061 DDS::RETCODE_ERROR);
01062
01063 set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
01064
01065 this->offered_deadline_missed_status_.total_count_change =
01066 this->offered_deadline_missed_status_.total_count
01067 - this->last_deadline_missed_total_count_;
01068
01069
01070 this->last_deadline_missed_total_count_ =
01071 this->offered_deadline_missed_status_.total_count;
01072
01073 status = offered_deadline_missed_status_;
01074
01075 this->offered_deadline_missed_status_.total_count_change = 0;
01076
01077 return DDS::RETCODE_OK;
01078 }
01079
01080 DDS::ReturnCode_t
01081 DataWriterImpl::get_offered_incompatible_qos_status(
01082 DDS::OfferedIncompatibleQosStatus & status)
01083 {
01084 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01085 guard,
01086 this->lock_,
01087 DDS::RETCODE_ERROR);
01088 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
01089 status = offered_incompatible_qos_status_;
01090 offered_incompatible_qos_status_.total_count_change = 0;
01091 return DDS::RETCODE_OK;
01092 }
01093
01094 DDS::ReturnCode_t
01095 DataWriterImpl::get_publication_matched_status(
01096 DDS::PublicationMatchedStatus & status)
01097 {
01098 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01099 guard,
01100 this->lock_,
01101 DDS::RETCODE_ERROR);
01102 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
01103 status = publication_match_status_;
01104 publication_match_status_.total_count_change = 0;
01105 publication_match_status_.current_count_change = 0;
01106 return DDS::RETCODE_OK;
01107 }
01108
01109 DDS::ReturnCode_t
01110 DataWriterImpl::assert_liveliness()
01111 {
01112 switch (this->qos_.liveliness.kind) {
01113 case DDS::AUTOMATIC_LIVELINESS_QOS:
01114
01115 break;
01116 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01117 return participant_servant_->assert_liveliness();
01118 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01119 if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01120 return DDS::RETCODE_ERROR;
01121 }
01122 break;
01123 }
01124
01125 return DDS::RETCODE_OK;
01126 }
01127
01128 DDS::ReturnCode_t
01129 DataWriterImpl::assert_liveliness_by_participant()
01130 {
01131
01132
01133 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01134
01135 liveliness_asserted_ = true;
01136 }
01137
01138 return DDS::RETCODE_OK;
01139 }
01140
01141 ACE_Time_Value
01142 DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
01143 {
01144 if (this->qos_.liveliness.kind == kind) {
01145 return liveliness_check_interval_;
01146 } else {
01147 return ACE_Time_Value::max_time;
01148 }
01149 }
01150
01151 bool
01152 DataWriterImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
01153 {
01154 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01155 return last_liveliness_activity_time_ > tv;
01156 } else {
01157 return false;
01158 }
01159 }
01160
01161 DDS::ReturnCode_t
01162 DataWriterImpl::get_matched_subscriptions(
01163 DDS::InstanceHandleSeq & subscription_handles)
01164 {
01165 if (enabled_ == false) {
01166 ACE_ERROR_RETURN((LM_ERROR,
01167 ACE_TEXT("(%P|%t) ERROR: ")
01168 ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01169 ACE_TEXT(" Entity is not enabled. \n")),
01170 DDS::RETCODE_NOT_ENABLED);
01171 }
01172
01173 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01174 guard,
01175 this->lock_,
01176 DDS::RETCODE_ERROR);
01177
01178
01179 int index = 0;
01180 subscription_handles.length(
01181 static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01182
01183 for (RepoIdToHandleMap::iterator
01184 current = this->id_to_handle_map_.begin();
01185 current != this->id_to_handle_map_.end();
01186 ++current, ++index) {
01187 subscription_handles[index] = current->second;
01188 }
01189
01190 return DDS::RETCODE_OK;
01191 }
01192
01193 #if !defined (DDS_HAS_MINIMUM_BIT)
01194 DDS::ReturnCode_t
01195 DataWriterImpl::get_matched_subscription_data(
01196 DDS::SubscriptionBuiltinTopicData & subscription_data,
01197 DDS::InstanceHandle_t subscription_handle)
01198 {
01199 if (enabled_ == false) {
01200 ACE_ERROR_RETURN((LM_ERROR,
01201 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01202 ACE_TEXT("get_matched_subscription_data: ")
01203 ACE_TEXT("Entity is not enabled. \n")),
01204 DDS::RETCODE_NOT_ENABLED);
01205 }
01206
01207 BIT_Helper_1 < DDS::SubscriptionBuiltinTopicDataDataReader,
01208 DDS::SubscriptionBuiltinTopicDataDataReader_var,
01209 DDS::SubscriptionBuiltinTopicDataSeq > hh;
01210
01211 DDS::SubscriptionBuiltinTopicDataSeq data;
01212
01213 DDS::ReturnCode_t ret =
01214 hh.instance_handle_to_bit_data(participant_servant_,
01215 BUILT_IN_SUBSCRIPTION_TOPIC,
01216 subscription_handle,
01217 data);
01218
01219 if (ret == DDS::RETCODE_OK) {
01220 subscription_data = data[0];
01221 }
01222
01223 return ret;
01224 }
01225 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01226
01227 DDS::ReturnCode_t
01228 DataWriterImpl::enable()
01229 {
01230
01231
01232
01233
01234
01235
01236 if (this->is_enabled()) {
01237 return DDS::RETCODE_OK;
01238 }
01239
01240 if (this->publisher_servant_->is_enabled() == false) {
01241 return DDS::RETCODE_PRECONDITION_NOT_MET;
01242 }
01243
01244
01245
01246
01247
01248
01249
01250 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01251
01252 CORBA::Long const depth =
01253 get_instance_sample_list_depth(
01254 qos_.history.kind,
01255 qos_.history.depth,
01256 qos_.resource_limits.max_samples_per_instance);
01257
01258 bool resource_blocking = false;
01259
01260 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01261 n_chunks_ = qos_.resource_limits.max_samples;
01262
01263 if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED) {
01264 resource_blocking = true;
01265
01266 } else {
01267 resource_blocking =
01268 (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01269 || (qos_.resource_limits.max_samples <
01270 (qos_.resource_limits.max_instances * depth));
01271 }
01272 }
01273
01274
01275
01276
01277 this->enable_specific();
01278
01279 CORBA::Long max_instances = 0;
01280
01281 if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01282 max_instances = qos_.resource_limits.max_instances;
01283
01284 CORBA::Long max_total_samples = 0;
01285
01286 if (reliable && resource_blocking)
01287 max_total_samples = qos_.resource_limits.max_samples;
01288
01289 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01290
01291
01292 DataDurabilityCache* const durability_cache =
01293 TheServiceParticipant->get_data_durability_cache(qos_.durability);
01294 #endif
01295
01296
01297
01298 data_container_ = new WriteDataContainer(this,
01299 depth,
01300 qos_.reliability.max_blocking_time,
01301 n_chunks_,
01302 domain_id_,
01303 get_topic_name(),
01304 get_type_name(),
01305 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01306 durability_cache,
01307 qos_.durability_service,
01308 #endif
01309 max_instances,
01310 max_total_samples);
01311
01312
01313
01314 mb_allocator_ = new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_);
01315 db_allocator_ = new DataBlockAllocator(n_chunks_+1);
01316 header_allocator_ = new DataSampleHeaderAllocator(n_chunks_+1);
01317
01318 if (DCPS_debug_level >= 2) {
01319 ACE_DEBUG((LM_DEBUG,
01320 "(%P|%t) DataWriterImpl::enable-mb"
01321 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01322 mb_allocator_,
01323 n_chunks_));
01324
01325 ACE_DEBUG((LM_DEBUG,
01326 "(%P|%t) DataWriterImpl::enable-db"
01327 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01328 db_allocator_,
01329 n_chunks_));
01330
01331 ACE_DEBUG((LM_DEBUG,
01332 "(%P|%t) DataWriterImpl::enable-header"
01333 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01334 header_allocator_,
01335 n_chunks_));
01336 }
01337
01338 if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01339 qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01340 liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01341 liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01342
01343 if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01344 liveliness_check_interval_ = ACE_Time_Value (0, 1);
01345 }
01346
01347 last_liveliness_check_time_ = ACE_OS::gettimeofday();
01348
01349 if (reactor_->schedule_timer(this,
01350 0,
01351 liveliness_check_interval_,
01352 liveliness_check_interval_) == -1) {
01353 ACE_ERROR((LM_ERROR,
01354 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01355 ACE_TEXT("schedule_timer")));
01356
01357 } else {
01358 cancel_timer_ = true;
01359 this->_add_ref();
01360 }
01361 }
01362
01363 participant_servant_->add_adjust_liveliness_timers(this);
01364
01365
01366
01367 DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01368
01369 if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01370 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01371 this->watchdog_ = new OfferedDeadlineWatchdog(
01372 this->lock_,
01373 this->qos_.deadline,
01374 this,
01375 this->dw_local_objref_.in(),
01376 this->offered_deadline_missed_status_,
01377 this->last_deadline_missed_total_count_);
01378 }
01379
01380 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01381 disco->pre_writer(this);
01382
01383 this->set_enabled();
01384
01385 try {
01386 this->enable_transport(reliable,
01387 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01388
01389 } catch (const Transport::Exception&) {
01390 ACE_ERROR((LM_ERROR,
01391 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01392 ACE_TEXT("Transport Exception.\n")));
01393 return DDS::RETCODE_ERROR;
01394
01395 }
01396
01397 const TransportLocatorSeq& trans_conf_info = connection_info();
01398
01399 DDS::PublisherQos pub_qos;
01400 this->publisher_servant_->get_qos(pub_qos);
01401
01402 this->publication_id_ =
01403 disco->add_publication(this->domain_id_,
01404 this->participant_servant_->get_id(),
01405 this->topic_servant_->get_id(),
01406 this,
01407 this->qos_,
01408 trans_conf_info,
01409 pub_qos);
01410
01411 if (this->publication_id_ == GUID_UNKNOWN) {
01412 ACE_ERROR((LM_ERROR,
01413 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01414 ACE_TEXT("add_publication returned invalid id. \n")));
01415 return DDS::RETCODE_ERROR;
01416 }
01417
01418 this->data_container_->publication_id_ = this->publication_id_;
01419
01420 const DDS::ReturnCode_t writer_enabled_result =
01421 publisher_servant_->writer_enabled(topic_name_.in(), this);
01422
01423 if (this->monitor_) {
01424 this->monitor_->report();
01425 }
01426
01427 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01428
01429
01430
01431 if (durability_cache != 0) {
01432
01433 if (!durability_cache->get_data(this->domain_id_,
01434 get_topic_name(),
01435 get_type_name(),
01436 this,
01437 this->mb_allocator_,
01438 this->db_allocator_,
01439 this->qos_.lifespan)) {
01440 ACE_ERROR((LM_ERROR,
01441 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01442 ACE_TEXT("unable to retrieve durable data\n")));
01443 }
01444 }
01445
01446 #endif
01447
01448 return writer_enabled_result;
01449 }
01450
01451 void
01452 DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
01453 {
01454 DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01455
01456 SendStateDataSampleList list;
01457
01458 ACE_UINT64 transaction_id = this->get_unsent_data(list);
01459
01460 controlTracker.message_sent();
01461
01462
01463 guard.release();
01464
01465 this->send(list, transaction_id);
01466 }
01467
01468 DDS::ReturnCode_t
01469 DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
01470 DataSample* data,
01471 const DDS::Time_t& source_timestamp)
01472 {
01473 DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01474
01475 if (enabled_ == false) {
01476 ACE_ERROR_RETURN((LM_ERROR,
01477 ACE_TEXT("(%P|%t) ERROR: ")
01478 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01479 ACE_TEXT(" Entity is not enabled. \n")),
01480 DDS::RETCODE_NOT_ENABLED);
01481 }
01482
01483 DDS::ReturnCode_t ret =
01484 this->data_container_->register_instance(handle, data);
01485
01486 if (ret != DDS::RETCODE_OK) {
01487 ACE_ERROR_RETURN((LM_ERROR,
01488 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01489 ACE_TEXT("register instance with container failed.\n")),
01490 ret);
01491 }
01492
01493 if (this->monitor_) {
01494 this->monitor_->report();
01495 }
01496
01497 DataSampleElement* element = 0;
01498 ret = this->data_container_->obtain_buffer_for_control(element);
01499
01500 if (ret != DDS::RETCODE_OK) {
01501 ACE_ERROR_RETURN((LM_ERROR,
01502 ACE_TEXT("(%P|%t) ERROR: ")
01503 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01504 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01505 ret),
01506 ret);
01507 }
01508
01509
01510 element->set_sample(create_control_message(INSTANCE_REGISTRATION,
01511 element->get_header(),
01512 data,
01513 source_timestamp));
01514
01515 ret = this->data_container_->enqueue_control(element);
01516
01517 if (ret != DDS::RETCODE_OK) {
01518 ACE_ERROR_RETURN((LM_ERROR,
01519 ACE_TEXT("(%P|%t) ERROR: ")
01520 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01521 ACE_TEXT("enqueue_control failed.\n")),
01522 ret);
01523 }
01524
01525 return ret;
01526 }
01527
01528 DDS::ReturnCode_t
01529 DataWriterImpl::register_instance_from_durable_data(DDS::InstanceHandle_t& handle,
01530 DataSample* data,
01531 const DDS::Time_t & source_timestamp)
01532 {
01533 DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01534
01535 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01536 guard,
01537 get_lock(),
01538 ::DDS::RETCODE_ERROR);
01539
01540 DDS::ReturnCode_t ret = register_instance_i(handle, data, source_timestamp);
01541 if (ret != DDS::RETCODE_OK) {
01542 ACE_ERROR_RETURN((LM_ERROR,
01543 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01544 ACE_TEXT("register instance with container failed.\n")),
01545 ret);
01546 }
01547
01548 send_all_to_flush_control(guard);
01549
01550 return ret;
01551 }
01552
01553 DDS::ReturnCode_t
01554 DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
01555 const DDS::Time_t& source_timestamp)
01556 {
01557 DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01558
01559 if (enabled_ == false) {
01560 ACE_ERROR_RETURN((LM_ERROR,
01561 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01562 ACE_TEXT(" Entity is not enabled.\n")),
01563 DDS::RETCODE_NOT_ENABLED);
01564 }
01565
01566
01567
01568 if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01569 return this->dispose_and_unregister(handle, source_timestamp);
01570 }
01571
01572 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01573 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01574 DataSample* unregistered_sample_data = 0;
01575 ret = this->data_container_->unregister(handle, unregistered_sample_data);
01576
01577 if (ret != DDS::RETCODE_OK) {
01578 ACE_ERROR_RETURN((LM_ERROR,
01579 ACE_TEXT("(%P|%t) ERROR: ")
01580 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01581 ACE_TEXT(" unregister with container failed. \n")),
01582 ret);
01583 }
01584
01585 DataSampleElement* element = 0;
01586 ret = this->data_container_->obtain_buffer_for_control(element);
01587
01588 if (ret != DDS::RETCODE_OK) {
01589 ACE_ERROR_RETURN((LM_ERROR,
01590 ACE_TEXT("(%P|%t) ERROR: ")
01591 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01592 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01593 ret),
01594 ret);
01595 }
01596
01597 element->set_sample(create_control_message(UNREGISTER_INSTANCE,
01598 element->get_header(),
01599 unregistered_sample_data,
01600 source_timestamp));
01601 ret = this->data_container_->enqueue_control(element);
01602
01603 if (ret != DDS::RETCODE_OK) {
01604 ACE_ERROR_RETURN((LM_ERROR,
01605 ACE_TEXT("(%P|%t) ERROR: ")
01606 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01607 ACE_TEXT("enqueue_control failed.\n")),
01608 ret);
01609 }
01610
01611 send_all_to_flush_control(guard);
01612 return DDS::RETCODE_OK;
01613 }
01614
01615 DDS::ReturnCode_t
01616 DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
01617 const DDS::Time_t& source_timestamp)
01618 {
01619 DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01620
01621 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01622 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01623
01624 DataSample* data_sample = 0;
01625 ret = this->data_container_->dispose(handle, data_sample);
01626
01627 if (ret != DDS::RETCODE_OK) {
01628 ACE_ERROR_RETURN((LM_ERROR,
01629 ACE_TEXT("(%P|%t) ERROR: ")
01630 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01631 ACE_TEXT("dispose on container failed. \n")),
01632 ret);
01633 }
01634
01635 ret = this->data_container_->unregister(handle, data_sample, false);
01636
01637 if (ret != DDS::RETCODE_OK) {
01638 ACE_ERROR_RETURN((LM_ERROR,
01639 ACE_TEXT("(%P|%t) ERROR: ")
01640 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01641 ACE_TEXT("unregister with container failed. \n")),
01642 ret);
01643 }
01644
01645 DataSampleElement* element = 0;
01646 ret = this->data_container_->obtain_buffer_for_control(element);
01647
01648 if (ret != DDS::RETCODE_OK) {
01649 ACE_ERROR_RETURN((LM_ERROR,
01650 ACE_TEXT("(%P|%t) ERROR: ")
01651 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01652 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01653 ret),
01654 ret);
01655 }
01656
01657 element->set_sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01658 element->get_header(),
01659 data_sample,
01660 source_timestamp));
01661
01662 ret = this->data_container_->enqueue_control(element);
01663
01664 if (ret != DDS::RETCODE_OK) {
01665 ACE_ERROR_RETURN((LM_ERROR,
01666 ACE_TEXT("(%P|%t) ERROR: ")
01667 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01668 ACE_TEXT("enqueue_control failed.\n")),
01669 ret);
01670 }
01671
01672 send_all_to_flush_control(guard);
01673 return DDS::RETCODE_OK;
01674 }
01675
01676 void
01677 DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
01678 {
01679 {
01680 ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01681
01682 PublicationInstanceMapType::iterator it =
01683 this->data_container_->instances_.begin();
01684
01685 while (it != this->data_container_->instances_.end()) {
01686 DDS::InstanceHandle_t handle = it->first;
01687 ++it;
01688
01689 this->unregister_instance_i(handle, source_timestamp);
01690 }
01691 }
01692 }
01693
01694 DDS::ReturnCode_t
01695 DataWriterImpl::write(DataSample* data,
01696 DDS::InstanceHandle_t handle,
01697 const DDS::Time_t& source_timestamp,
01698 GUIDSeq* filter_out)
01699 {
01700 DBG_ENTRY_LVL("DataWriterImpl","write",6);
01701
01702 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01703 guard,
01704 get_lock (),
01705 ::DDS::RETCODE_ERROR);
01706
01707
01708 GUIDSeq_var filter_out_var(filter_out);
01709
01710 if (enabled_ == false) {
01711 ACE_ERROR_RETURN((LM_ERROR,
01712 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01713 ACE_TEXT(" Entity is not enabled. \n")),
01714 DDS::RETCODE_NOT_ENABLED);
01715 }
01716
01717 DataSampleElement* element = 0;
01718 DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01719
01720 if (ret == DDS::RETCODE_TIMEOUT) {
01721 return ret;
01722
01723 } else if (ret != DDS::RETCODE_OK) {
01724 ACE_ERROR_RETURN((LM_ERROR,
01725 ACE_TEXT("(%P|%t) ERROR: ")
01726 ACE_TEXT("DataWriterImpl::write: ")
01727 ACE_TEXT("obtain_buffer returned %d.\n"),
01728 ret),
01729 ret);
01730 }
01731
01732 DataSample* temp;
01733 ret = create_sample_data_message(data,
01734 handle,
01735 element->get_header(),
01736 temp,
01737 source_timestamp,
01738 (filter_out != 0));
01739 element->set_sample(temp);
01740
01741 if (ret != DDS::RETCODE_OK) {
01742 return ret;
01743 }
01744
01745 element->set_filter_out(filter_out_var._retn());
01746
01747 ret = this->data_container_->enqueue(element, handle);
01748
01749 if (ret != DDS::RETCODE_OK) {
01750 ACE_ERROR_RETURN((LM_ERROR,
01751 ACE_TEXT("(%P|%t) ERROR: ")
01752 ACE_TEXT("DataWriterImpl::write: ")
01753 ACE_TEXT("enqueue failed.\n")),
01754 ret);
01755 }
01756 this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01757
01758 track_sequence_number(filter_out);
01759
01760 if (this->coherent_) {
01761 ++this->coherent_samples_;
01762 }
01763 SendStateDataSampleList list;
01764
01765 ACE_UINT64 transaction_id = this->get_unsent_data(list);
01766
01767 if (this->publisher_servant_->is_suspended()) {
01768 if (min_suspended_transaction_id_ == 0) {
01769
01770
01771 min_suspended_transaction_id_ = transaction_id;
01772 } else {
01773
01774
01775 max_suspended_transaction_id_ = transaction_id;
01776 }
01777 this->available_data_list_.enqueue_tail(list);
01778
01779 } else {
01780 guard.release();
01781
01782 this->send(list, transaction_id);
01783 }
01784
01785 return DDS::RETCODE_OK;
01786 }
01787
01788 void
01789 DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
01790 {
01791 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01792
01793 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01794
01795 RepoIdSet excluded;
01796
01797 if (filter_out && !reader_info_.empty()) {
01798 const GUID_t* buf = filter_out->get_buffer();
01799 excluded.insert(buf, buf + filter_out->length());
01800 }
01801
01802 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01803 end = reader_info_.end(); iter != end; ++iter) {
01804
01805 if (excluded.count(iter->first) == 0) {
01806 iter->second.expected_sequence_ = sequence_number_;
01807 }
01808 }
01809
01810 #else
01811 ACE_UNUSED_ARG(filter_out);
01812 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01813 end = reader_info_.end(); iter != end; ++iter) {
01814 iter->second.expected_sequence_ = sequence_number_;
01815 }
01816
01817 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01818
01819 }
01820
01821 void
01822 DataWriterImpl::send_suspended_data()
01823 {
01824
01825
01826 if (max_suspended_transaction_id_ != 0) {
01827 this->send(this->available_data_list_, max_suspended_transaction_id_);
01828 max_suspended_transaction_id_ = 0;
01829 }
01830
01831
01832
01833
01834
01835 this->send(this->available_data_list_, min_suspended_transaction_id_);
01836 min_suspended_transaction_id_ = 0;
01837 this->available_data_list_.reset();
01838 }
01839
01840 DDS::ReturnCode_t
01841 DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
01842 const DDS::Time_t & source_timestamp)
01843 {
01844 DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01845
01846 if (enabled_ == false) {
01847 ACE_ERROR_RETURN((LM_ERROR,
01848 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01849 ACE_TEXT(" Entity is not enabled. \n")),
01850 DDS::RETCODE_NOT_ENABLED);
01851 }
01852
01853 DDS::ReturnCode_t ret = ::DDS::RETCODE_ERROR;
01854
01855 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01856
01857 DataSample* registered_sample_data = 0;
01858 ret = this->data_container_->dispose(handle, registered_sample_data);
01859
01860 if (ret != DDS::RETCODE_OK) {
01861 ACE_ERROR_RETURN((LM_ERROR,
01862 ACE_TEXT("(%P|%t) ERROR: ")
01863 ACE_TEXT("DataWriterImpl::dispose: ")
01864 ACE_TEXT("dispose failed.\n")),
01865 ret);
01866 }
01867
01868 DataSampleElement* element = 0;
01869 ret = this->data_container_->obtain_buffer_for_control(element);
01870
01871 if (ret != DDS::RETCODE_OK) {
01872 ACE_ERROR_RETURN((LM_ERROR,
01873 ACE_TEXT("(%P|%t) ERROR: ")
01874 ACE_TEXT("DataWriterImpl::dispose: ")
01875 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01876 ret),
01877 ret);
01878 }
01879
01880 element->set_sample(create_control_message(DISPOSE_INSTANCE,
01881 element->get_header(),
01882 registered_sample_data,
01883 source_timestamp));
01884 ret = this->data_container_->enqueue_control(element);
01885
01886 if (ret != DDS::RETCODE_OK) {
01887 ACE_ERROR_RETURN((LM_ERROR,
01888 ACE_TEXT("(%P|%t) ERROR: ")
01889 ACE_TEXT("DataWriterImpl::dispose: ")
01890 ACE_TEXT("enqueue_control failed.\n")),
01891 ret);
01892 }
01893
01894 send_all_to_flush_control(guard);
01895
01896 return DDS::RETCODE_OK;
01897 }
01898
01899 DDS::ReturnCode_t
01900 DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
01901 size_t& size)
01902 {
01903 return data_container_->num_samples(handle, size);
01904 }
01905
01906 void
01907 DataWriterImpl::unregister_all()
01908 {
01909 if (cancel_timer_) {
01910
01911 (void) reactor_->cancel_timer(this, 0);
01912 cancel_timer_ = false;
01913 }
01914
01915 data_container_->unregister_all();
01916 }
01917
01918 RepoId
01919 DataWriterImpl::get_publication_id()
01920 {
01921 return publication_id_;
01922 }
01923
01924 RepoId
01925 DataWriterImpl::get_dp_id()
01926 {
01927 return participant_servant_->get_id();
01928 }
01929
01930 const char*
01931 DataWriterImpl::get_topic_name()
01932 {
01933 return topic_name_.in();
01934 }
01935
01936 char const *
01937 DataWriterImpl::get_type_name() const
01938 {
01939 return type_name_.in();
01940 }
01941
01942 ACE_Message_Block*
01943 DataWriterImpl::create_control_message(MessageId message_id,
01944 DataSampleHeader& header_data,
01945 ACE_Message_Block* data,
01946 const DDS::Time_t& source_timestamp)
01947 {
01948 header_data.message_id_ = message_id;
01949 header_data.byte_order_ =
01950 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01951 header_data.coherent_change_ = 0;
01952
01953 if (data) {
01954 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01955
01956 if (header_data.message_length_ == 0) {
01957 data->release();
01958 }
01959 }
01960
01961 header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01962 header_data.sequence_repair_ = false;
01963 header_data.source_timestamp_sec_ = source_timestamp.sec;
01964 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01965 header_data.publication_id_ = publication_id_;
01966 header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01967
01968 if (message_id == INSTANCE_REGISTRATION
01969 || message_id == DISPOSE_INSTANCE
01970 || message_id == UNREGISTER_INSTANCE
01971 || message_id == DISPOSE_UNREGISTER_INSTANCE) {
01972
01973 header_data.sequence_repair_ = need_sequence_repair();
01974
01975
01976
01977 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01978 this->sequence_number_ = SequenceNumber();
01979
01980 } else {
01981 ++this->sequence_number_;
01982 }
01983
01984 header_data.sequence_ = this->sequence_number_;
01985 header_data.key_fields_only_ = true;
01986 }
01987
01988 ACE_Message_Block* message = 0;
01989 ACE_NEW_MALLOC_RETURN(message,
01990 static_cast<ACE_Message_Block*>(
01991 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01992 ACE_Message_Block(
01993 DataSampleHeader::max_marshaled_size(),
01994 ACE_Message_Block::MB_DATA,
01995 header_data.message_length_ ? data : 0,
01996 0,
01997 0,
01998 get_db_lock(),
01999 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02000 ACE_Time_Value::zero,
02001 ACE_Time_Value::max_time,
02002 db_allocator_,
02003 mb_allocator_),
02004 0);
02005
02006 *message << header_data;
02007
02008
02009 if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02010 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02011
02012 RepoIdToReaderInfoMap::iterator reader;
02013
02014 for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02015 reader->second.expected_sequence_ = sequence_number_;
02016 }
02017 }
02018 if (DCPS_debug_level >= 4) {
02019 const GuidConverter converter(publication_id_);
02020 ACE_DEBUG((LM_DEBUG,
02021 ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
02022 ACE_TEXT("from publication %C sending control sample: %C .\n"),
02023 OPENDDS_STRING(converter).c_str(),
02024 to_string(header_data).c_str()));
02025 }
02026 return message;
02027 }
02028
02029 DDS::ReturnCode_t
02030 DataWriterImpl::create_sample_data_message(DataSample* data,
02031 DDS::InstanceHandle_t instance_handle,
02032 DataSampleHeader& header_data,
02033 ACE_Message_Block*& message,
02034 const DDS::Time_t& source_timestamp,
02035 bool content_filter)
02036 {
02037 PublicationInstance* const instance =
02038 data_container_->get_handle_instance(instance_handle);
02039
02040 if (0 == instance) {
02041 ACE_ERROR_RETURN((LM_ERROR,
02042 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02043 ACE_TEXT("failed to find instance for handle %d\n"),
02044 instance_handle),
02045 DDS::RETCODE_ERROR);
02046 }
02047
02048 header_data.message_id_ = SAMPLE_DATA;
02049 header_data.byte_order_ =
02050 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02051 header_data.coherent_change_ = this->coherent_;
02052 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02053 header_data.group_coherent_ =
02054 this->publisher_servant_->qos_.presentation.access_scope
02055 == DDS::GROUP_PRESENTATION_QOS;
02056 #endif
02057 header_data.content_filter_ = content_filter;
02058 header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02059 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02060 header_data.sequence_repair_ = need_sequence_repair();
02061
02062 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02063 this->sequence_number_ = SequenceNumber();
02064
02065 } else {
02066 ++this->sequence_number_;
02067 }
02068
02069 header_data.sequence_ = this->sequence_number_;
02070 header_data.source_timestamp_sec_ = source_timestamp.sec;
02071 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02072
02073 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02074 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02075 header_data.lifespan_duration_ = true;
02076 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02077 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02078 }
02079
02080 header_data.publication_id_ = publication_id_;
02081 header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
02082 size_t max_marshaled_size = header_data.max_marshaled_size();
02083
02084 ACE_NEW_MALLOC_RETURN(message,
02085 static_cast<ACE_Message_Block*>(
02086 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02087 ACE_Message_Block(max_marshaled_size,
02088 ACE_Message_Block::MB_DATA,
02089 data,
02090 0,
02091 header_allocator_,
02092 get_db_lock(),
02093 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02094 ACE_Time_Value::zero,
02095 ACE_Time_Value::max_time,
02096 db_allocator_,
02097 mb_allocator_),
02098 DDS::RETCODE_ERROR);
02099
02100 *message << header_data;
02101 if (DCPS_debug_level >= 4) {
02102 const GuidConverter converter(publication_id_);
02103 ACE_DEBUG((LM_DEBUG,
02104 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
02105 ACE_TEXT("from publication %C sending data sample: %C .\n"),
02106 OPENDDS_STRING(converter).c_str(),
02107 to_string(header_data).c_str()));
02108 }
02109 return DDS::RETCODE_OK;
02110 }
02111
02112 void
02113 DataWriterImpl::data_delivered(const DataSampleElement* sample)
02114 {
02115 DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02116
02117 if (!(sample->get_pub_id() == this->publication_id_)) {
02118 GuidConverter sample_converter(sample->get_pub_id());
02119 GuidConverter writer_converter(publication_id_);
02120 ACE_ERROR((LM_ERROR,
02121 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02122 ACE_TEXT(" The publication id %C from delivered element ")
02123 ACE_TEXT("does not match the datawriter's id %C\n"),
02124 OPENDDS_STRING(sample_converter).c_str(),
02125 OPENDDS_STRING(writer_converter).c_str()));
02126 return;
02127 }
02128
02129 ++data_delivered_count_;
02130
02131 this->data_container_->data_delivered(sample);
02132 }
02133
02134 void
02135 DataWriterImpl::control_delivered(ACE_Message_Block* sample)
02136 {
02137 DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02138 sample->release();
02139 controlTracker.message_delivered();
02140 }
02141
02142 EntityImpl*
02143 DataWriterImpl::parent() const
02144 {
02145 return this->publisher_servant_;
02146 }
02147
02148 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
02149 bool
02150 DataWriterImpl::filter_out(const DataSampleElement& elt,
02151 const OPENDDS_STRING& filterClassName,
02152 const FilterEvaluator& evaluator,
02153 const DDS::StringSeq& expression_params) const
02154 {
02155 TypeSupportImpl* const typesupport =
02156 dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02157
02158 if (!typesupport) {
02159 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02160 return false;
02161 }
02162
02163 if (filterClassName == "DDSSQL" ||
02164 filterClassName == "OPENDDSSQL") {
02165 return !evaluator.eval(elt.get_sample()->cont(),
02166 elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02167 elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02168 expression_params);
02169 }
02170 else {
02171 return false;
02172 }
02173 }
02174 #endif
02175
02176 bool
02177 DataWriterImpl::check_transport_qos(const TransportInst&)
02178 {
02179
02180
02181 return true;
02182 }
02183
02184 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02185
02186 bool
02187 DataWriterImpl::coherent_changes_pending()
02188 {
02189 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02190 guard,
02191 get_lock(),
02192 false);
02193
02194 return this->coherent_;
02195 }
02196
02197 void
02198 DataWriterImpl::begin_coherent_changes()
02199 {
02200 ACE_GUARD(ACE_Recursive_Thread_Mutex,
02201 guard,
02202 get_lock());
02203
02204 this->coherent_ = true;
02205 }
02206
02207 void
02208 DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
02209 {
02210
02211 ACE_GUARD(ACE_Recursive_Thread_Mutex,
02212 guard,
02213 get_lock());
02214
02215 CoherentChangeControl end_msg;
02216 end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02217 end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02218 end_msg.group_coherent_
02219 = this->publisher_servant_->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02220
02221 if (end_msg.group_coherent_) {
02222 end_msg.publisher_id_ = this->publisher_servant_->publisher_id_;
02223 end_msg.group_coherent_samples_ = group_samples;
02224 }
02225
02226 ACE_Message_Block* data = 0;
02227 size_t max_marshaled_size = end_msg.max_marshaled_size();
02228
02229 ACE_NEW(data, ACE_Message_Block(max_marshaled_size,
02230 ACE_Message_Block::MB_DATA,
02231 0,
02232 0,
02233 0,
02234 get_db_lock()));
02235
02236 Serializer serializer(
02237 data,
02238 this->swap_bytes());
02239
02240 serializer << end_msg;
02241
02242 DDS::Time_t source_timestamp =
02243 time_value_to_time(ACE_OS::gettimeofday());
02244
02245 DataSampleHeader header;
02246 ACE_Message_Block* control =
02247 create_control_message(END_COHERENT_CHANGES, header, data, source_timestamp);
02248
02249
02250 this->coherent_ = false;
02251 this->coherent_samples_ = 0;
02252
02253 guard.release();
02254 if (this->send_control(header, control) == SEND_CONTROL_ERROR) {
02255 ACE_ERROR((LM_ERROR,
02256 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02257 ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02258 }
02259 }
02260
02261 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
02262
02263 void
02264 DataWriterImpl::data_dropped(const DataSampleElement* element,
02265 bool dropped_by_transport)
02266 {
02267 DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02268
02269
02270 ++data_dropped_count_;
02271
02272 this->data_container_->data_dropped(element, dropped_by_transport);
02273 }
02274
02275 void
02276 DataWriterImpl::control_dropped(ACE_Message_Block* sample,
02277 bool )
02278 {
02279 DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02280 sample->release();
02281 controlTracker.message_dropped();
02282 }
02283
02284 DDS::DataWriterListener_ptr
02285 DataWriterImpl::listener_for(DDS::StatusKind kind)
02286 {
02287
02288
02289
02290 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02291 return publisher_servant_->listener_for(kind);
02292
02293 } else {
02294 return DDS::DataWriterListener::_duplicate(listener_.in());
02295 }
02296 }
02297
02298 int
02299 DataWriterImpl::handle_timeout(const ACE_Time_Value &tv,
02300 const void * )
02301 {
02302 const ACE_Time_Value delta = tv - last_liveliness_check_time_;
02303 if (delta < liveliness_check_interval_) {
02304
02305 if (reactor_->cancel_timer(this) == -1) {
02306 ACE_ERROR((LM_ERROR,
02307 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02308 ACE_TEXT("cancel_timer")));
02309 }
02310 if (reactor_->schedule_timer(this, 0, liveliness_check_interval_ - delta, liveliness_check_interval_) == -1) {
02311 ACE_ERROR((LM_ERROR,
02312 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02313 ACE_TEXT("schedule_timer")));
02314 }
02315 return 0;
02316 }
02317
02318 bool liveliness_lost = false;
02319
02320 ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02321
02322
02323 if (elapsed >= liveliness_check_interval_) {
02324 switch (this->qos_.liveliness.kind) {
02325 case DDS::AUTOMATIC_LIVELINESS_QOS:
02326 if (this->send_liveliness(tv) == false) {
02327 liveliness_lost = true;
02328 }
02329 break;
02330
02331 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02332 if (liveliness_asserted_) {
02333 if (this->send_liveliness(tv) == false) {
02334 liveliness_lost = true;
02335 }
02336 }
02337 break;
02338
02339 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02340
02341 break;
02342 }
02343 }
02344
02345 liveliness_asserted_ = false;
02346 last_liveliness_check_time_ = tv;
02347 elapsed = tv - last_liveliness_activity_time_;
02348
02349
02350 if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02351 liveliness_lost = true;
02352 }
02353
02354 if (!this->liveliness_lost_ && liveliness_lost) {
02355 ++ this->liveliness_lost_status_.total_count;
02356 ++ this->liveliness_lost_status_.total_count_change;
02357
02358 DDS::DataWriterListener_var listener =
02359 listener_for(DDS::LIVELINESS_LOST_STATUS);
02360
02361 if (!CORBA::is_nil(listener.in())) {
02362 listener->on_liveliness_lost(this->dw_local_objref_.in(),
02363 this->liveliness_lost_status_);
02364 }
02365 }
02366
02367 this->liveliness_lost_ = liveliness_lost;
02368 return 0;
02369 }
02370
02371 int
02372 DataWriterImpl::handle_close(ACE_HANDLE,
02373 ACE_Reactor_Mask)
02374 {
02375 this->_remove_ref();
02376 return 0;
02377 }
02378
02379 bool
02380 DataWriterImpl::send_liveliness(const ACE_Time_Value& now)
02381 {
02382 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02383 !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02384 DDS::Time_t t = time_value_to_time(now);
02385 DataSampleHeader header;
02386 ACE_Message_Block* liveliness_msg =
02387 this->create_control_message(DATAWRITER_LIVELINESS, header, 0, t);
02388
02389 if (this->send_control(header, liveliness_msg) == SEND_CONTROL_ERROR) {
02390 ACE_ERROR_RETURN((LM_ERROR,
02391 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02392 ACE_TEXT(" send_control failed. \n")),
02393 false);
02394
02395 } else {
02396 last_liveliness_activity_time_ = now;
02397 return true;
02398 }
02399 } else {
02400 last_liveliness_activity_time_ = now;
02401 return true;
02402 }
02403 }
02404
02405 void
02406 DataWriterImpl::prepare_to_delete()
02407 {
02408 this->set_deleted(true);
02409 this->stop_associating();
02410 }
02411
02412 PublicationInstance*
02413 DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02414 {
02415 PublicationInstance* instance = 0;
02416
02417 if (0 != data_container_) {
02418 instance = data_container_->get_handle_instance(handle);
02419 }
02420
02421 return instance;
02422 }
02423
02424 void
02425 DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
02426 {
02427 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02428
02429 if (!is_bit_) {
02430
02431
02432 DataWriterListener_var the_listener =
02433 DataWriterListener::_narrow(this->listener_.in());
02434
02435 if (!CORBA::is_nil(the_listener.in())) {
02436 PublicationDisconnectedStatus status;
02437
02438
02439
02440 this->lookup_instance_handles(subids,
02441 status.subscription_handles);
02442 the_listener->on_publication_disconnected(this->dw_local_objref_.in(),
02443 status);
02444 }
02445 }
02446 }
02447
02448 void
02449 DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
02450 {
02451 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02452
02453 if (!is_bit_) {
02454
02455
02456
02457 DataWriterListener_var the_listener =
02458 DataWriterListener::_narrow(this->listener_.in());
02459
02460 if (!CORBA::is_nil(the_listener.in())) {
02461 PublicationDisconnectedStatus status;
02462
02463
02464
02465 if (this->lookup_instance_handles(subids,
02466 status.subscription_handles) == false) {
02467 ACE_ERROR((LM_ERROR,
02468 "(%P|%t) ERROR: DataWriterImpl::"
02469 "notify_publication_reconnected: "
02470 "lookup_instance_handles failed\n"));
02471 }
02472
02473 the_listener->on_publication_reconnected(this->dw_local_objref_.in(),
02474 status);
02475 }
02476 }
02477 }
02478
02479 void
02480 DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
02481 {
02482 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02483
02484 if (!is_bit_) {
02485
02486
02487
02488 DataWriterListener_var the_listener =
02489 DataWriterListener::_narrow(this->listener_.in());
02490
02491 if (!CORBA::is_nil(the_listener.in())) {
02492 PublicationLostStatus status;
02493
02494
02495
02496 this->lookup_instance_handles(subids,
02497 status.subscription_handles);
02498 the_listener->on_publication_lost(this->dw_local_objref_.in(),
02499 status);
02500 }
02501 }
02502 }
02503
02504 void
02505 DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
02506 {
02507 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02508
02509 if (!is_bit_) {
02510
02511
02512
02513 DataWriterListener_var the_listener =
02514 DataWriterListener::_narrow(this->listener_.in());
02515
02516 if (!CORBA::is_nil(the_listener.in())) {
02517 PublicationLostStatus status;
02518
02519 CORBA::ULong len = handles.length();
02520 status.subscription_handles.length(len);
02521
02522 for (CORBA::ULong i = 0; i < len; ++ i) {
02523 status.subscription_handles[i] = handles[i];
02524 }
02525
02526 the_listener->on_publication_lost(this->dw_local_objref_.in(),
02527 status);
02528 }
02529 }
02530 }
02531
02532 void
02533 DataWriterImpl::notify_connection_deleted(const RepoId& peerId)
02534 {
02535 DBG_ENTRY_LVL("DataWriterImpl","notify_connection_deleted",6);
02536 on_notification_of_connection_deletion(peerId);
02537
02538
02539 DataWriterListener_var the_listener =
02540 DataWriterListener::_narrow(this->listener_.in());
02541
02542 if (!CORBA::is_nil(the_listener.in()))
02543 the_listener->on_connection_deleted(this->dw_local_objref_.in());
02544 }
02545
02546 bool
02547 DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
02548 DDS::InstanceHandleSeq & hdls)
02549 {
02550 if (DCPS_debug_level > 9) {
02551 CORBA::ULong const size = ids.length();
02552 OPENDDS_STRING separator;
02553 OPENDDS_STRING buffer;
02554
02555 for (unsigned long i = 0; i < size; ++i) {
02556 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02557 separator = ", ";
02558 }
02559
02560 ACE_DEBUG((LM_DEBUG,
02561 ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02562 ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02563 buffer.c_str()));
02564 }
02565
02566 CORBA::ULong const num_rds = ids.length();
02567 hdls.length(num_rds);
02568
02569 for (CORBA::ULong i = 0; i < num_rds; ++i) {
02570 hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02571 }
02572
02573 return true;
02574 }
02575
02576 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
02577 bool
02578 DataWriterImpl::persist_data()
02579 {
02580 return this->data_container_->persist_data();
02581 }
02582 #endif
02583
02584 void
02585 DataWriterImpl::reschedule_deadline()
02586 {
02587 if (this->watchdog_ != 0) {
02588 this->data_container_->reschedule_deadline();
02589 }
02590 }
02591
02592 bool
02593 DataWriterImpl::pending_control()
02594 {
02595 return controlTracker.pending_messages();
02596 }
02597
02598 void
02599 DataWriterImpl::wait_control_pending()
02600 {
02601 OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02602 controlTracker.wait_messages_pending(caller_string);
02603 }
02604
02605 void
02606 DataWriterImpl::wait_pending()
02607 {
02608 this->data_container_->wait_pending();
02609 }
02610
02611 void
02612 DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02613 {
02614 this->data_container_->get_instance_handles(instance_handles);
02615 }
02616
02617 void
02618 DataWriterImpl::get_readers(RepoIdSet& readers)
02619 {
02620 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02621 readers = this->readers_;
02622 }
02623
02624 void
02625 DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
02626 {
02627 this->publisher_servant_->get_qos(qos_data.pub_qos);
02628 qos_data.dw_qos = this->qos_;
02629 qos_data.topic_name = this->topic_name_.in();
02630 }
02631
02632 bool
02633 DataWriterImpl::need_sequence_repair()
02634 {
02635 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02636 return need_sequence_repair_i();
02637 }
02638
02639 bool
02640 DataWriterImpl::need_sequence_repair_i() const
02641 {
02642 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02643 end = reader_info_.end(); it != end; ++it) {
02644 if (it->second.expected_sequence_ != sequence_number_) {
02645 return true;
02646 }
02647 }
02648
02649 return false;
02650 }
02651
02652 SendControlStatus
02653 DataWriterImpl::send_control(const DataSampleHeader& header,
02654 ACE_Message_Block* msg)
02655 {
02656 controlTracker.message_sent();
02657
02658 SendControlStatus status = TransportClient::send_control(header, msg);
02659
02660 if (status != SEND_CONTROL_OK) {
02661 controlTracker.message_dropped();
02662 }
02663
02664 return status;
02665 }
02666
02667 }
02668 }