00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DataWriterImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "Serializer.h"
00018 #include "Transient_Kludge.h"
00019 #include "DataDurabilityCache.h"
00020 #include "OfferedDeadlineWatchdog.h"
00021 #include "MonitorFactory.h"
00022 #include "TypeSupportImpl.h"
00023 #include "SendStateDataSampleList.h"
00024 #include "DataSampleElement.h"
00025
00026 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00027 #include "CoherentChangeControl.h"
00028 #endif
00029
00030 #include "AssociationData.h"
00031 #include "dds/DdsDcpsCoreC.h"
00032 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00033
00034 #if !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "BuiltInTopicUtils.h"
00036 #include "dds/DdsDcpsCoreTypeSupportC.h"
00037 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00038
00039 #include "Util.h"
00040 #include "dds/DCPS/transport/framework/EntryExit.h"
00041 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00042
00043 #include "tao/ORB_Core.h"
00044 #include "ace/Reactor.h"
00045 #include "ace/Auto_Ptr.h"
00046
00047 #include <stdexcept>
00048
00049 namespace OpenDDS {
00050 namespace DCPS {
00051
00052
00053
00054
00055
00056 DataWriterImpl::DataWriterImpl()
00057 : data_dropped_count_(0),
00058 data_delivered_count_(0),
00059 controlTracker("DataWriterImpl"),
00060 n_chunks_(TheServiceParticipant->n_chunks()),
00061 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00062 qos_(TheServiceParticipant->initial_DataWriterQos()),
00063 participant_servant_(0),
00064 topic_id_(GUID_UNKNOWN),
00065 topic_servant_(0),
00066 listener_mask_(DEFAULT_STATUS_MASK),
00067 domain_id_(0),
00068 publisher_servant_(0),
00069 publication_id_(GUID_UNKNOWN),
00070 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00071 coherent_(false),
00072 coherent_samples_(0),
00073 data_container_(0),
00074 liveliness_lost_(false),
00075 mb_allocator_(0),
00076 db_allocator_(0),
00077 header_allocator_(0),
00078 reactor_(0),
00079 liveliness_check_interval_(ACE_Time_Value::max_time),
00080 last_liveliness_activity_time_(ACE_Time_Value::zero),
00081 last_deadline_missed_total_count_(0),
00082 watchdog_(),
00083 cancel_timer_(false),
00084 is_bit_(false),
00085 initialized_(false),
00086 min_suspended_transaction_id_(0),
00087 max_suspended_transaction_id_(0),
00088 monitor_(0),
00089 periodic_monitor_(0),
00090 db_lock_pool_(0),
00091 liveliness_asserted_(false)
00092 {
00093 liveliness_lost_status_.total_count = 0;
00094 liveliness_lost_status_.total_count_change = 0;
00095
00096 offered_deadline_missed_status_.total_count = 0;
00097 offered_deadline_missed_status_.total_count_change = 0;
00098 offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00099
00100 offered_incompatible_qos_status_.total_count = 0;
00101 offered_incompatible_qos_status_.total_count_change = 0;
00102 offered_incompatible_qos_status_.last_policy_id = 0;
00103 offered_incompatible_qos_status_.policies.length(0);
00104
00105 publication_match_status_.total_count = 0;
00106 publication_match_status_.total_count_change = 0;
00107 publication_match_status_.current_count = 0;
00108 publication_match_status_.current_count_change = 0;
00109 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00110
00111 monitor_ =
00112 TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00113 periodic_monitor_ =
00114 TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00115
00116 db_lock_pool_ = new DataBlockLockPool((unsigned long)n_chunks_);
00117 }
00118
00119
00120
00121 DataWriterImpl::~DataWriterImpl()
00122 {
00123 DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00124
00125 if (initialized_) {
00126 delete data_container_;
00127 delete mb_allocator_;
00128 delete db_allocator_;
00129 delete header_allocator_;
00130 }
00131 delete db_lock_pool_;
00132 }
00133
00134
00135 void
00136 DataWriterImpl::cleanup()
00137 {
00138 if (cancel_timer_) {
00139
00140
00141 (void) reactor_->cancel_timer(this, 0);
00142 cancel_timer_ = false;
00143 }
00144
00145
00146 topic_objref_ = DDS::Topic::_nil();
00147 topic_servant_->remove_entity_ref();
00148 topic_servant_->_remove_ref();
00149 topic_servant_ = 0;
00150
00151 dw_local_objref_ = DDS::DataWriter::_nil();
00152 }
00153
00154 void
00155 DataWriterImpl::init(
00156 DDS::Topic_ptr topic,
00157 TopicImpl * topic_servant,
00158 const DDS::DataWriterQos & qos,
00159 DDS::DataWriterListener_ptr a_listener,
00160 const DDS::StatusMask & mask,
00161 OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00162 OpenDDS::DCPS::PublisherImpl * publisher_servant,
00163 DDS::DataWriter_ptr dw_local)
00164 {
00165 DBG_ENTRY_LVL("DataWriterImpl","init",6);
00166 topic_objref_ = DDS::Topic::_duplicate(topic);
00167 topic_servant_ = topic_servant;
00168 topic_servant_->_add_ref();
00169 topic_servant_->add_entity_ref();
00170 topic_name_ = topic_servant_->get_name();
00171 topic_id_ = topic_servant_->get_id();
00172 type_name_ = topic_servant_->get_type_name();
00173
00174 #if !defined (DDS_HAS_MINIMUM_BIT)
00175 is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00176 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00177 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00178 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00179 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00180
00181 qos_ = qos;
00182
00183
00184 listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00185 listener_mask_ = mask;
00186
00187
00188
00189 participant_servant_ = participant_servant;
00190 domain_id_ = participant_servant_->get_domain_id();
00191
00192
00193
00194 publisher_servant_ = publisher_servant;
00195 dw_local_objref_ = DDS::DataWriter::_duplicate(dw_local);
00196
00197 this->reactor_ = TheServiceParticipant->timer();
00198
00199 initialized_ = true;
00200 }
00201
00202 DDS::InstanceHandle_t
00203 DataWriterImpl::get_instance_handle()
00204 {
00205 return this->participant_servant_->id_to_handle(publication_id_);
00206 }
00207
00208 DDS::InstanceHandle_t
00209 DataWriterImpl::get_next_handle()
00210 {
00211 return this->participant_servant_->id_to_handle(GUID_UNKNOWN);
00212 }
00213
00214 void
00215 DataWriterImpl::add_association(const RepoId& yourId,
00216 const ReaderAssociation& reader,
00217 bool active)
00218 {
00219 DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00220
00221 if (DCPS_debug_level) {
00222 GuidConverter writer_converter(yourId);
00223 GuidConverter reader_converter(reader.readerId);
00224 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00225 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00226 OPENDDS_STRING(writer_converter).c_str(),
00227 OPENDDS_STRING(reader_converter).c_str()));
00228 }
00229
00230 if (entity_deleted_.value()) {
00231 if (DCPS_debug_level)
00232 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00233 ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00234
00235 return;
00236 }
00237
00238 if (GUID_UNKNOWN == publication_id_) {
00239 publication_id_ = yourId;
00240 }
00241
00242 {
00243 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00244 reader_info_.insert(std::make_pair(reader.readerId,
00245 ReaderInfo(reader.filterClassName,
00246 TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00247 reader.exprParams, participant_servant_,
00248 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00249 }
00250
00251 if (DCPS_debug_level > 4) {
00252 GuidConverter converter(get_publication_id());
00253 ACE_DEBUG((LM_DEBUG,
00254 ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00255 ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00256 OPENDDS_STRING(converter).c_str(),
00257 qos_.transport_priority.value));
00258 }
00259
00260 AssociationData data;
00261 data.remote_id_ = reader.readerId;
00262 data.remote_data_ = reader.readerTransInfo;
00263 data.remote_reliable_ =
00264 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00265 data.remote_durable_ =
00266 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00267
00268 if (!associate(data, active)) {
00269
00270 if (DCPS_debug_level) {
00271 ACE_DEBUG((LM_ERROR,
00272 ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00273 ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00274 }
00275 }
00276 }
00277
00278 void
00279 DataWriterImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00280 {
00281 DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00282
00283 if (!(flags & ASSOC_OK)) {
00284 if (DCPS_debug_level) {
00285 const GuidConverter conv(remote_id);
00286 ACE_DEBUG((LM_ERROR,
00287 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00288 ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00289 OPENDDS_STRING(conv).c_str()));
00290 }
00291
00292 return;
00293 }
00294 if (DCPS_debug_level) {
00295 const GuidConverter writer_conv(publication_id_);
00296 const GuidConverter conv(remote_id);
00297 ACE_DEBUG((LM_INFO,
00298 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00299 ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00300 OPENDDS_STRING(writer_conv).c_str(),
00301 OPENDDS_STRING(conv).c_str()));
00302 }
00303 if (flags & ASSOC_ACTIVE) {
00304
00305 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00306
00307
00308 if (assoc_complete_readers_.count(remote_id)) {
00309 if (DCPS_debug_level) {
00310 const GuidConverter writer_conv(publication_id_);
00311 const GuidConverter converter(remote_id);
00312 ACE_DEBUG((LM_DEBUG,
00313 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00314 ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00315 OPENDDS_STRING(writer_conv).c_str(),
00316 OPENDDS_STRING(converter).c_str()));
00317 }
00318 assoc_complete_readers_.erase(remote_id);
00319 association_complete_i(remote_id);
00320
00321
00322
00323
00324 } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00325 const GuidConverter converter(remote_id);
00326 ACE_ERROR((LM_ERROR,
00327 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00328 ACE_TEXT("failed to mark %C as pending.\n"),
00329 OPENDDS_STRING(converter).c_str()));
00330
00331 } else {
00332 if (DCPS_debug_level) {
00333 const GuidConverter converter(remote_id);
00334 ACE_DEBUG((LM_DEBUG,
00335 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00336 ACE_TEXT("marked %C as pending.\n"),
00337 OPENDDS_STRING(converter).c_str()));
00338 }
00339 }
00340
00341 } else {
00342
00343
00344 if (DCPS_debug_level) {
00345 const GuidConverter conv(publication_id_);
00346 ACE_DEBUG((LM_ERROR,
00347 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00348 ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00349 OPENDDS_STRING(conv).c_str()));
00350 }
00351 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00352 disco->association_complete(domain_id_, participant_servant_->get_id(),
00353 publication_id_, remote_id);
00354 }
00355 }
00356
00357 DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
00358 const char* filter,
00359 const DDS::StringSeq& params,
00360 DomainParticipantImpl* participant,
00361 bool durable)
00362 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00363 : participant_(participant)
00364 , filter_class_name_(filterClassName)
00365 , filter_(filter)
00366 , expression_params_(params)
00367 , eval_(*filter ? participant->get_filter_eval(filter) : 0)
00368 , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00369 , durable_(durable)
00370 {}
00371 #else
00372 : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00373 , durable_(durable)
00374 {
00375 ACE_UNUSED_ARG(filterClassName);
00376 ACE_UNUSED_ARG(filter);
00377 ACE_UNUSED_ARG(params);
00378 ACE_UNUSED_ARG(participant);
00379 }
00380 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00381
00382 DataWriterImpl::ReaderInfo::~ReaderInfo()
00383 {
00384 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00385 eval_ = RcHandle<FilterEvaluator>();
00386
00387 if (!filter_.empty()) {
00388 participant_->deref_filter_eval(filter_.c_str());
00389 }
00390
00391 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00392 }
00393
00394 void
00395 DataWriterImpl::association_complete(const RepoId& remote_id)
00396 {
00397 DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00398
00399 if (DCPS_debug_level >= 1) {
00400 GuidConverter writer_converter(this->publication_id_);
00401 GuidConverter reader_converter(remote_id);
00402 ACE_DEBUG((LM_DEBUG,
00403 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00404 ACE_TEXT("bit %d local %C remote %C\n"),
00405 is_bit_,
00406 OPENDDS_STRING(writer_converter).c_str(),
00407 OPENDDS_STRING(reader_converter).c_str()));
00408 }
00409
00410 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00411
00412 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00413 if (DCPS_debug_level) {
00414 GuidConverter writer_converter(this->publication_id_);
00415 GuidConverter reader_converter(remote_id);
00416 ACE_DEBUG((LM_DEBUG,
00417 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00418 ACE_TEXT("bit %d local %C did not find pending reader: %C")
00419 ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00420 is_bit_,
00421 OPENDDS_STRING(writer_converter).c_str(),
00422 OPENDDS_STRING(reader_converter).c_str()));
00423 }
00424
00425
00426 assoc_complete_readers_.insert(remote_id);
00427
00428 } else {
00429 association_complete_i(remote_id);
00430 }
00431 }
00432
00433 void
00434 DataWriterImpl::association_complete_i(const RepoId& remote_id)
00435 {
00436 DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00437
00438 if (DCPS_debug_level >= 1) {
00439 GuidConverter writer_converter(this->publication_id_);
00440 GuidConverter reader_converter(remote_id);
00441 ACE_DEBUG((LM_DEBUG,
00442 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00443 ACE_TEXT("bit %d local %C remote %C\n"),
00444 is_bit_,
00445 OPENDDS_STRING(writer_converter).c_str(),
00446 OPENDDS_STRING(reader_converter).c_str()));
00447 }
00448
00449 bool reader_durable = false;
00450 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00451 OPENDDS_STRING filterClassName;
00452 RcHandle<FilterEvaluator> eval;
00453 DDS::StringSeq expression_params;
00454 #endif
00455 {
00456 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00457
00458 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00459 GuidConverter converter(remote_id);
00460 ACE_ERROR((LM_ERROR,
00461 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00462 ACE_TEXT("insert %C from pending failed.\n"),
00463 OPENDDS_STRING(converter).c_str()));
00464 }
00465 }
00466 {
00467 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00468 RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00469
00470 if (it != reader_info_.end()) {
00471 reader_durable = it->second.durable_;
00472 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00473 filterClassName = it->second.filter_class_name_;
00474 eval = it->second.eval_;
00475 expression_params = it->second.expression_params_;
00476 #endif
00477 }
00478 }
00479
00480 if (this->monitor_) {
00481 this->monitor_->report();
00482 }
00483
00484 if (!is_bit_) {
00485
00486 DDS::InstanceHandle_t handle =
00487 this->participant_servant_->id_to_handle(remote_id);
00488
00489 {
00490
00491 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00492
00493
00494 ++publication_match_status_.total_count;
00495 ++publication_match_status_.total_count_change;
00496 ++publication_match_status_.current_count;
00497 ++publication_match_status_.current_count_change;
00498
00499 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00500 GuidConverter converter(remote_id);
00501 ACE_DEBUG((LM_WARNING,
00502 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00503 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00504 OPENDDS_STRING(converter).c_str(),
00505 handle));
00506 return;
00507
00508 } else if (DCPS_debug_level > 4) {
00509 GuidConverter converter(remote_id);
00510 ACE_DEBUG((LM_DEBUG,
00511 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00512 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00513 OPENDDS_STRING(converter).c_str(),
00514 handle));
00515 }
00516
00517 publication_match_status_.last_subscription_handle = handle;
00518
00519 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00520 }
00521
00522 DDS::DataWriterListener_var listener =
00523 listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00524
00525 if (!CORBA::is_nil(listener.in())) {
00526
00527 listener->on_publication_matched(dw_local_objref_.in(),
00528 publication_match_status_);
00529
00530
00531
00532 publication_match_status_.total_count_change = 0;
00533 publication_match_status_.current_count_change = 0;
00534 }
00535
00536 notify_status_condition();
00537 }
00538
00539
00540 if (reader_durable) {
00541
00542
00543 this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00544 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00545 , filterClassName, eval.in(), expression_params
00546 #endif
00547 );
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00558 guard,
00559 this->get_lock());
00560
00561 SendStateDataSampleList list = this->get_resend_data();
00562 {
00563 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00564
00565 SequenceNumber& seq =
00566 reader_info_.find(remote_id)->second.expected_sequence_;
00567
00568 for (SendStateDataSampleList::iterator list_el = list.begin();
00569 list_el != list.end(); ++list_el) {
00570 list_el->get_header().historic_sample_ = true;
00571
00572 if (list_el->get_header().sequence_ > seq) {
00573 seq = list_el->get_header().sequence_;
00574 }
00575 }
00576 }
00577
00578 if (this->publisher_servant_->is_suspended()) {
00579 this->available_data_list_.enqueue_tail(list);
00580
00581 } else {
00582 if (DCPS_debug_level >= 4) {
00583 ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00584 }
00585
00586 size_t size = 0, padding = 0;
00587 gen_find_size(remote_id, size, padding);
00588 ACE_Message_Block* const data =
00589 new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00590 get_db_lock());
00591 Serializer ser(data);
00592 ser << remote_id;
00593
00594 const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00595 DataSampleHeader header;
00596 ACE_Message_Block* const end_historic_samples =
00597 create_control_message(END_HISTORIC_SAMPLES, header, data, timestamp);
00598
00599 this->controlTracker.message_sent();
00600 guard.release();
00601 SendControlStatus ret = send_w_control(list, header, end_historic_samples, remote_id);
00602 if (ret == SEND_CONTROL_ERROR) {
00603 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00604 ACE_TEXT("DataWriterImpl::association_complete_i: ")
00605 ACE_TEXT("send_w_control failed.\n")));
00606 this->controlTracker.message_dropped();
00607 }
00608 }
00609 }
00610 }
00611
00612 void
00613 DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
00614 CORBA::Boolean notify_lost)
00615 {
00616 if (readers.length() == 0) {
00617 return;
00618 }
00619
00620 if (DCPS_debug_level >= 1) {
00621 GuidConverter writer_converter(publication_id_);
00622 GuidConverter reader_converter(readers[0]);
00623 ACE_DEBUG((LM_DEBUG,
00624 ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
00625 ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00626 is_bit_,
00627 OPENDDS_STRING(writer_converter).c_str(),
00628 OPENDDS_STRING(reader_converter).c_str(),
00629 readers.length()));
00630 }
00631
00632
00633 this->stop_associating(readers.get_buffer(), readers.length());
00634
00635 ReaderIdSeq fully_associated_readers;
00636 CORBA::ULong fully_associated_len = 0;
00637 ReaderIdSeq rds;
00638 CORBA::ULong rds_len = 0;
00639 DDS::InstanceHandleSeq handles;
00640
00641 ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
00642 {
00643
00644 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00645
00646
00647
00648
00649
00650 CORBA::ULong len = readers.length();
00651
00652 for (CORBA::ULong i = 0; i < len; ++i) {
00653
00654
00655
00656
00657 if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00658 ++ fully_associated_len;
00659 fully_associated_readers.length(fully_associated_len);
00660 fully_associated_readers [fully_associated_len - 1] = readers[i];
00661
00662 ++ rds_len;
00663 rds.length(rds_len);
00664 rds [rds_len - 1] = readers[i];
00665
00666 } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00667 ++ rds_len;
00668 rds.length(rds_len);
00669 rds [rds_len - 1] = readers[i];
00670
00671 GuidConverter converter(readers[i]);
00672 ACE_DEBUG((LM_WARNING,
00673 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_associations: ")
00674 ACE_TEXT("removing reader %C before association_complete() call.\n"),
00675 OPENDDS_STRING(converter).c_str()));
00676 }
00677
00678 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00679 reader_info_.erase(readers[i]);
00680
00681
00682 }
00683
00684 if (fully_associated_len > 0 && !is_bit_) {
00685
00686
00687 if (this->lookup_instance_handles(fully_associated_readers, handles) == false) {
00688 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::remove_associations: "
00689 "lookup_instance_handles failed, notify %d \n", notify_lost));
00690
00691 return;
00692 }
00693
00694 for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00695 id_to_handle_map_.erase(fully_associated_readers[i]);
00696 }
00697 }
00698
00699
00700
00701 if (!this->is_bit_) {
00702
00703
00704 int matchedSubscriptions =
00705 static_cast<int>(this->id_to_handle_map_.size());
00706 this->publication_match_status_.current_count_change =
00707 matchedSubscriptions - this->publication_match_status_.current_count;
00708
00709
00710 if (this->publication_match_status_.current_count_change != 0) {
00711 this->publication_match_status_.current_count = matchedSubscriptions;
00712
00713
00714
00715
00716 this->publication_match_status_.last_subscription_handle =
00717 handles[fully_associated_len - 1];
00718
00719 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00720
00721 DDS::DataWriterListener_var listener =
00722 this->listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00723
00724 if (!CORBA::is_nil(listener.in())) {
00725 listener->on_publication_matched(
00726 this->dw_local_objref_.in(),
00727 this->publication_match_status_);
00728
00729
00730 this->publication_match_status_.total_count_change = 0;
00731 this->publication_match_status_.current_count_change = 0;
00732 }
00733
00734 this->notify_status_condition();
00735 }
00736 }
00737 }
00738
00739 for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00740 this->disassociate(rds[i]);
00741 }
00742
00743
00744
00745
00746 if (notify_lost && handles.length() > 0) {
00747 this->notify_publication_lost(handles);
00748 }
00749 }
00750
00751 void DataWriterImpl::remove_all_associations()
00752 {
00753 DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00754
00755 this->stop_associating();
00756
00757 OpenDDS::DCPS::ReaderIdSeq readers;
00758 CORBA::ULong size;
00759 CORBA::ULong num_pending_readers;
00760 {
00761 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00762
00763 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00764 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00765 readers.length(size);
00766
00767 RepoIdSet::iterator itEnd = readers_.end();
00768 int i = 0;
00769
00770 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00771 readers[i ++] = *it;
00772 }
00773
00774 itEnd = pending_readers_.end();
00775
00776 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00777 readers[i ++] = *it;
00778 }
00779
00780 if (num_pending_readers > 0) {
00781 ACE_DEBUG((LM_WARNING,
00782 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00783 ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00784 num_pending_readers));
00785 }
00786 }
00787
00788 try {
00789 if (0 < size) {
00790 CORBA::Boolean dont_notify_lost = false;
00791
00792 this->remove_associations(readers, dont_notify_lost);
00793 }
00794
00795 } catch (const CORBA::Exception&) {
00796 ACE_DEBUG((LM_WARNING,
00797 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00798 ACE_TEXT("caught exception from remove_associations.\n")));
00799 }
00800 }
00801
00802 void
00803 DataWriterImpl::register_for_reader(const RepoId& participant,
00804 const RepoId& writerid,
00805 const RepoId& readerid,
00806 const TransportLocatorSeq& locators,
00807 DiscoveryListener* listener)
00808 {
00809 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00810 }
00811
00812 void
00813 DataWriterImpl::unregister_for_reader(const RepoId& participant,
00814 const RepoId& writerid,
00815 const RepoId& readerid)
00816 {
00817 TransportClient::unregister_for_reader(participant, writerid, readerid);
00818 }
00819
00820 void
00821 DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00822 {
00823 DDS::DataWriterListener_var listener =
00824 listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00825
00826 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00827
00828 #if 0
00829
00830 if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00831
00832 return;
00833 }
00834
00835 #endif
00836
00837 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00838
00839
00840 offered_incompatible_qos_status_.total_count = status.total_count;
00841 offered_incompatible_qos_status_.total_count_change +=
00842 status.count_since_last_send;
00843 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00844 offered_incompatible_qos_status_.policies = status.policies;
00845
00846 if (!CORBA::is_nil(listener.in())) {
00847 listener->on_offered_incompatible_qos(dw_local_objref_.in(),
00848 offered_incompatible_qos_status_);
00849
00850
00851
00852 offered_incompatible_qos_status_.total_count_change = 0;
00853 }
00854
00855 notify_status_condition();
00856 }
00857
00858 void
00859 DataWriterImpl::update_subscription_params(const RepoId& readerId,
00860 const DDS::StringSeq& params)
00861 {
00862 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00863 ACE_UNUSED_ARG(readerId);
00864 ACE_UNUSED_ARG(params);
00865 #else
00866 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00867 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00868 RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00869
00870 if (iter != reader_info_.end()) {
00871 iter->second.expression_params_ = params;
00872
00873 } else if (DCPS_debug_level > 4 &&
00874 TheServiceParticipant->publisher_content_filter()) {
00875 GuidConverter pubConv(this->publication_id_), subConv(readerId);
00876 ACE_DEBUG((LM_WARNING,
00877 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00878 ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00879 OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00880 }
00881
00882 #endif
00883 }
00884
00885 void
00886 DataWriterImpl::inconsistent_topic()
00887 {
00888 topic_servant_->inconsistent_topic();
00889 }
00890
00891 DDS::ReturnCode_t
00892 DataWriterImpl::set_qos(const DDS::DataWriterQos & qos)
00893 {
00894
00895 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00896 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00897 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00898 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00899 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00900
00901 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00902 if (qos_ == qos)
00903 return DDS::RETCODE_OK;
00904
00905 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00906 return DDS::RETCODE_IMMUTABLE_POLICY;
00907
00908 } else {
00909 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00910 DDS::PublisherQos publisherQos;
00911 this->publisher_servant_->get_qos(publisherQos);
00912 const bool status
00913 = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00914 this->participant_servant_->get_id(),
00915 this->publication_id_,
00916 qos,
00917 publisherQos);
00918
00919 if (!status) {
00920 ACE_ERROR_RETURN((LM_ERROR,
00921 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00922 ACE_TEXT("qos not updated. \n")),
00923 DDS::RETCODE_ERROR);
00924 }
00925 }
00926
00927 if (!(qos_ == qos)) {
00928
00929 if (qos_.deadline.period.sec != qos.deadline.period.sec
00930 || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00931 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00932 && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00933 this->watchdog_ =
00934 new OfferedDeadlineWatchdog(
00935 this->lock_,
00936 qos.deadline,
00937 this,
00938 this->dw_local_objref_.in(),
00939 this->offered_deadline_missed_status_,
00940 this->last_deadline_missed_total_count_);
00941
00942 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00943 && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00944 this->watchdog_->cancel_all();
00945 this->watchdog_->destroy();
00946 this->watchdog_ = 0;
00947
00948 } else {
00949 this->watchdog_->reset_interval(
00950 duration_to_time_value(qos.deadline.period));
00951 }
00952 }
00953
00954 qos_ = qos;
00955 }
00956
00957 return DDS::RETCODE_OK;
00958
00959 } else {
00960 return DDS::RETCODE_INCONSISTENT_POLICY;
00961 }
00962 }
00963
00964 DDS::ReturnCode_t
00965 DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
00966 {
00967 qos = qos_;
00968 return DDS::RETCODE_OK;
00969 }
00970
00971 DDS::ReturnCode_t
00972 DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
00973 DDS::StatusMask mask)
00974 {
00975 listener_mask_ = mask;
00976
00977 listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00978 return DDS::RETCODE_OK;
00979 }
00980
00981 DDS::DataWriterListener_ptr
00982 DataWriterImpl::get_listener()
00983 {
00984 return DDS::DataWriterListener::_duplicate(listener_.in());
00985 }
00986
00987 DDS::Topic_ptr
00988 DataWriterImpl::get_topic()
00989 {
00990 return DDS::Topic::_duplicate(topic_objref_.in());
00991 }
00992
00993 bool
00994 DataWriterImpl::should_ack() const
00995 {
00996
00997
00998
00999 return this->readers_.size() != 0;
01000 }
01001
01002 DataWriterImpl::AckToken
01003 DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
01004 {
01005 if (DCPS_debug_level > 0) {
01006 ACE_DEBUG((LM_DEBUG,
01007 ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
01008 ACE_TEXT("for sequence %q \n"),
01009 this->sequence_number_.getValue()));
01010 }
01011 return AckToken(max_wait, this->sequence_number_);
01012 }
01013
01014 DDS::ReturnCode_t
01015 DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
01016 {
01017 if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01018 return DDS::RETCODE_OK;
01019 DataWriterImpl::AckToken token = create_ack_token(max_wait);
01020 if (DCPS_debug_level) {
01021 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01022 ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01023 token.sequence_.getValue()));
01024 }
01025 return wait_for_specific_ack(token);
01026 }
01027
01028 DDS::ReturnCode_t
01029 DataWriterImpl::wait_for_specific_ack(const AckToken& token)
01030 {
01031 return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01032 }
01033
01034 DDS::Publisher_ptr
01035 DataWriterImpl::get_publisher()
01036 {
01037 return DDS::Publisher::_duplicate(publisher_servant_);
01038 }
01039
01040 DDS::ReturnCode_t
01041 DataWriterImpl::get_liveliness_lost_status(
01042 DDS::LivelinessLostStatus & status)
01043 {
01044 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01045 guard,
01046 this->lock_,
01047 DDS::RETCODE_ERROR);
01048 set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
01049 status = liveliness_lost_status_;
01050 liveliness_lost_status_.total_count_change = 0;
01051 return DDS::RETCODE_OK;
01052 }
01053
01054 DDS::ReturnCode_t
01055 DataWriterImpl::get_offered_deadline_missed_status(
01056 DDS::OfferedDeadlineMissedStatus & status)
01057 {
01058 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01059 guard,
01060 this->lock_,
01061 DDS::RETCODE_ERROR);
01062
01063 set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
01064
01065 this->offered_deadline_missed_status_.total_count_change =
01066 this->offered_deadline_missed_status_.total_count
01067 - this->last_deadline_missed_total_count_;
01068
01069
01070 this->last_deadline_missed_total_count_ =
01071 this->offered_deadline_missed_status_.total_count;
01072
01073 status = offered_deadline_missed_status_;
01074
01075 this->offered_deadline_missed_status_.total_count_change = 0;
01076
01077 return DDS::RETCODE_OK;
01078 }
01079
01080 DDS::ReturnCode_t
01081 DataWriterImpl::get_offered_incompatible_qos_status(
01082 DDS::OfferedIncompatibleQosStatus & status)
01083 {
01084 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01085 guard,
01086 this->lock_,
01087 DDS::RETCODE_ERROR);
01088 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
01089 status = offered_incompatible_qos_status_;
01090 offered_incompatible_qos_status_.total_count_change = 0;
01091 return DDS::RETCODE_OK;
01092 }
01093
01094 DDS::ReturnCode_t
01095 DataWriterImpl::get_publication_matched_status(
01096 DDS::PublicationMatchedStatus & status)
01097 {
01098 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01099 guard,
01100 this->lock_,
01101 DDS::RETCODE_ERROR);
01102 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
01103 status = publication_match_status_;
01104 publication_match_status_.total_count_change = 0;
01105 publication_match_status_.current_count_change = 0;
01106 return DDS::RETCODE_OK;
01107 }
01108
01109 DDS::ReturnCode_t
01110 DataWriterImpl::assert_liveliness()
01111 {
01112 switch (this->qos_.liveliness.kind) {
01113 case DDS::AUTOMATIC_LIVELINESS_QOS:
01114
01115 break;
01116 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01117 return participant_servant_->assert_liveliness();
01118 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01119 if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01120 return DDS::RETCODE_ERROR;
01121 }
01122 break;
01123 }
01124
01125 return DDS::RETCODE_OK;
01126 }
01127
01128 DDS::ReturnCode_t
01129 DataWriterImpl::assert_liveliness_by_participant()
01130 {
01131
01132
01133 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01134
01135 liveliness_asserted_ = true;
01136 }
01137
01138 return DDS::RETCODE_OK;
01139 }
01140
01141 ACE_Time_Value
01142 DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
01143 {
01144 if (this->qos_.liveliness.kind == kind) {
01145 return liveliness_check_interval_;
01146 } else {
01147 return ACE_Time_Value::max_time;
01148 }
01149 }
01150
01151 bool
01152 DataWriterImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
01153 {
01154 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01155 return last_liveliness_activity_time_ > tv;
01156 } else {
01157 return false;
01158 }
01159 }
01160
01161 DDS::ReturnCode_t
01162 DataWriterImpl::get_matched_subscriptions(
01163 DDS::InstanceHandleSeq & subscription_handles)
01164 {
01165 if (enabled_ == false) {
01166 ACE_ERROR_RETURN((LM_ERROR,
01167 ACE_TEXT("(%P|%t) ERROR: ")
01168 ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01169 ACE_TEXT(" Entity is not enabled. \n")),
01170 DDS::RETCODE_NOT_ENABLED);
01171 }
01172
01173 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01174 guard,
01175 this->lock_,
01176 DDS::RETCODE_ERROR);
01177
01178
01179 int index = 0;
01180 subscription_handles.length(
01181 static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01182
01183 for (RepoIdToHandleMap::iterator
01184 current = this->id_to_handle_map_.begin();
01185 current != this->id_to_handle_map_.end();
01186 ++current, ++index) {
01187 subscription_handles[index] = current->second;
01188 }
01189
01190 return DDS::RETCODE_OK;
01191 }
01192
01193 #if !defined (DDS_HAS_MINIMUM_BIT)
01194 DDS::ReturnCode_t
01195 DataWriterImpl::get_matched_subscription_data(
01196 DDS::SubscriptionBuiltinTopicData & subscription_data,
01197 DDS::InstanceHandle_t subscription_handle)
01198 {
01199 if (enabled_ == false) {
01200 ACE_ERROR_RETURN((LM_ERROR,
01201 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01202 ACE_TEXT("get_matched_subscription_data: ")
01203 ACE_TEXT("Entity is not enabled. \n")),
01204 DDS::RETCODE_NOT_ENABLED);
01205 }
01206
01207 BIT_Helper_1 < DDS::SubscriptionBuiltinTopicDataDataReader,
01208 DDS::SubscriptionBuiltinTopicDataDataReader_var,
01209 DDS::SubscriptionBuiltinTopicDataSeq > hh;
01210
01211 DDS::SubscriptionBuiltinTopicDataSeq data;
01212
01213 DDS::ReturnCode_t ret =
01214 hh.instance_handle_to_bit_data(participant_servant_,
01215 BUILT_IN_SUBSCRIPTION_TOPIC,
01216 subscription_handle,
01217 data);
01218
01219 if (ret == DDS::RETCODE_OK) {
01220 subscription_data = data[0];
01221 }
01222
01223 return ret;
01224 }
01225 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01226
01227 DDS::ReturnCode_t
01228 DataWriterImpl::enable()
01229 {
01230
01231
01232
01233
01234
01235
01236 if (this->is_enabled()) {
01237 return DDS::RETCODE_OK;
01238 }
01239
01240 if (this->publisher_servant_->is_enabled() == false) {
01241 return DDS::RETCODE_PRECONDITION_NOT_MET;
01242 }
01243
01244
01245
01246
01247
01248
01249
01250 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01251
01252 CORBA::Long const depth =
01253 get_instance_sample_list_depth(
01254 qos_.history.kind,
01255 qos_.history.depth,
01256 qos_.resource_limits.max_samples_per_instance);
01257
01258 bool resource_blocking = false;
01259
01260 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01261 n_chunks_ = qos_.resource_limits.max_samples;
01262
01263 if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED) {
01264 resource_blocking = true;
01265
01266 } else {
01267 resource_blocking =
01268 (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01269 || (qos_.resource_limits.max_samples <
01270 (qos_.resource_limits.max_instances * depth));
01271 }
01272 }
01273
01274
01275
01276
01277 this->enable_specific();
01278
01279 CORBA::Long max_instances = 0;
01280
01281 if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01282 max_instances = qos_.resource_limits.max_instances;
01283
01284 CORBA::Long max_total_samples = 0;
01285
01286 if (reliable && resource_blocking)
01287 max_total_samples = qos_.resource_limits.max_samples;
01288
01289 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01290
01291
01292 DataDurabilityCache* const durability_cache =
01293 TheServiceParticipant->get_data_durability_cache(qos_.durability);
01294 #endif
01295
01296
01297
01298 data_container_ = new WriteDataContainer(this,
01299 depth,
01300 qos_.reliability.max_blocking_time,
01301 n_chunks_,
01302 domain_id_,
01303 get_topic_name(),
01304 get_type_name(),
01305 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01306 durability_cache,
01307 qos_.durability_service,
01308 #endif
01309 max_instances,
01310 max_total_samples);
01311
01312
01313
01314 mb_allocator_ = new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_);
01315 db_allocator_ = new DataBlockAllocator(n_chunks_+1);
01316 header_allocator_ = new DataSampleHeaderAllocator(n_chunks_+1);
01317
01318 if (DCPS_debug_level >= 2) {
01319 ACE_DEBUG((LM_DEBUG,
01320 "(%P|%t) DataWriterImpl::enable-mb"
01321 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01322 mb_allocator_,
01323 n_chunks_));
01324
01325 ACE_DEBUG((LM_DEBUG,
01326 "(%P|%t) DataWriterImpl::enable-db"
01327 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01328 db_allocator_,
01329 n_chunks_));
01330
01331 ACE_DEBUG((LM_DEBUG,
01332 "(%P|%t) DataWriterImpl::enable-header"
01333 " Cached_Allocator_With_Overflow %x with %d chunks\n",
01334 header_allocator_,
01335 n_chunks_));
01336 }
01337
01338 if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01339 qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01340 liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01341 liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01342
01343 if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01344 liveliness_check_interval_ = ACE_Time_Value (0, 1);
01345 }
01346
01347 if (reactor_->schedule_timer(this,
01348 0,
01349 liveliness_check_interval_,
01350 liveliness_check_interval_) == -1) {
01351 ACE_ERROR((LM_ERROR,
01352 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01353 ACE_TEXT("schedule_timer")));
01354
01355 } else {
01356 cancel_timer_ = true;
01357 this->_add_ref();
01358 }
01359 }
01360
01361 participant_servant_->add_adjust_liveliness_timers(this);
01362
01363
01364
01365 DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01366
01367 if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01368 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01369 this->watchdog_ = new OfferedDeadlineWatchdog(
01370 this->lock_,
01371 this->qos_.deadline,
01372 this,
01373 this->dw_local_objref_.in(),
01374 this->offered_deadline_missed_status_,
01375 this->last_deadline_missed_total_count_);
01376 }
01377
01378 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01379 disco->pre_writer(this);
01380
01381 this->set_enabled();
01382
01383 try {
01384 this->enable_transport(reliable,
01385 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01386
01387 } catch (const Transport::Exception&) {
01388 ACE_ERROR((LM_ERROR,
01389 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01390 ACE_TEXT("Transport Exception.\n")));
01391 return DDS::RETCODE_ERROR;
01392
01393 }
01394
01395 const TransportLocatorSeq& trans_conf_info = connection_info();
01396
01397 DDS::PublisherQos pub_qos;
01398 this->publisher_servant_->get_qos(pub_qos);
01399
01400 this->publication_id_ =
01401 disco->add_publication(this->domain_id_,
01402 this->participant_servant_->get_id(),
01403 this->topic_servant_->get_id(),
01404 this,
01405 this->qos_,
01406 trans_conf_info,
01407 pub_qos);
01408
01409 if (this->publication_id_ == GUID_UNKNOWN) {
01410 ACE_ERROR((LM_ERROR,
01411 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01412 ACE_TEXT("add_publication returned invalid id. \n")));
01413 return DDS::RETCODE_ERROR;
01414 }
01415
01416 this->data_container_->publication_id_ = this->publication_id_;
01417
01418 const DDS::ReturnCode_t writer_enabled_result =
01419 publisher_servant_->writer_enabled(topic_name_.in(), this);
01420
01421 if (this->monitor_) {
01422 this->monitor_->report();
01423 }
01424
01425 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01426
01427
01428
01429 if (durability_cache != 0) {
01430
01431 if (!durability_cache->get_data(this->domain_id_,
01432 get_topic_name(),
01433 get_type_name(),
01434 this,
01435 this->mb_allocator_,
01436 this->db_allocator_,
01437 this->qos_.lifespan)) {
01438 ACE_ERROR((LM_ERROR,
01439 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01440 ACE_TEXT("unable to retrieve durable data\n")));
01441 }
01442 }
01443
01444 #endif
01445
01446 return writer_enabled_result;
01447 }
01448
01449 void
01450 DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
01451 {
01452 DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01453
01454 SendStateDataSampleList list;
01455
01456 ACE_UINT64 transaction_id = this->get_unsent_data(list);
01457
01458 controlTracker.message_sent();
01459
01460
01461 guard.release();
01462
01463 this->send(list, transaction_id);
01464 }
01465
01466 DDS::ReturnCode_t
01467 DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
01468 DataSample* data,
01469 const DDS::Time_t& source_timestamp)
01470 {
01471 DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01472
01473 if (enabled_ == false) {
01474 ACE_ERROR_RETURN((LM_ERROR,
01475 ACE_TEXT("(%P|%t) ERROR: ")
01476 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01477 ACE_TEXT(" Entity is not enabled. \n")),
01478 DDS::RETCODE_NOT_ENABLED);
01479 }
01480
01481 DDS::ReturnCode_t ret =
01482 this->data_container_->register_instance(handle, data);
01483
01484 if (ret != DDS::RETCODE_OK) {
01485 ACE_ERROR_RETURN((LM_ERROR,
01486 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01487 ACE_TEXT("register instance with container failed.\n")),
01488 ret);
01489 }
01490
01491 if (this->monitor_) {
01492 this->monitor_->report();
01493 }
01494
01495 DataSampleElement* element = 0;
01496 ret = this->data_container_->obtain_buffer_for_control(element);
01497
01498 if (ret != DDS::RETCODE_OK) {
01499 ACE_ERROR_RETURN((LM_ERROR,
01500 ACE_TEXT("(%P|%t) ERROR: ")
01501 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01502 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01503 ret),
01504 ret);
01505 }
01506
01507
01508 element->set_sample(create_control_message(INSTANCE_REGISTRATION,
01509 element->get_header(),
01510 data,
01511 source_timestamp));
01512
01513 ret = this->data_container_->enqueue_control(element);
01514
01515 if (ret != DDS::RETCODE_OK) {
01516 ACE_ERROR_RETURN((LM_ERROR,
01517 ACE_TEXT("(%P|%t) ERROR: ")
01518 ACE_TEXT("DataWriterImpl::register_instance_i: ")
01519 ACE_TEXT("enqueue_control failed.\n")),
01520 ret);
01521 }
01522
01523 return ret;
01524 }
01525
01526 DDS::ReturnCode_t
01527 DataWriterImpl::register_instance_from_durable_data(DDS::InstanceHandle_t& handle,
01528 DataSample* data,
01529 const DDS::Time_t & source_timestamp)
01530 {
01531 DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01532
01533 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01534 guard,
01535 get_lock(),
01536 ::DDS::RETCODE_ERROR);
01537
01538 DDS::ReturnCode_t ret = register_instance_i(handle, data, source_timestamp);
01539 if (ret != DDS::RETCODE_OK) {
01540 ACE_ERROR_RETURN((LM_ERROR,
01541 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01542 ACE_TEXT("register instance with container failed.\n")),
01543 ret);
01544 }
01545
01546 send_all_to_flush_control(guard);
01547
01548 return ret;
01549 }
01550
01551 DDS::ReturnCode_t
01552 DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
01553 const DDS::Time_t& source_timestamp)
01554 {
01555 DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01556
01557 if (enabled_ == false) {
01558 ACE_ERROR_RETURN((LM_ERROR,
01559 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01560 ACE_TEXT(" Entity is not enabled.\n")),
01561 DDS::RETCODE_NOT_ENABLED);
01562 }
01563
01564
01565
01566 if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01567 return this->dispose_and_unregister(handle, source_timestamp);
01568 }
01569
01570 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01571 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01572 DataSample* unregistered_sample_data = 0;
01573 ret = this->data_container_->unregister(handle, unregistered_sample_data);
01574
01575 if (ret != DDS::RETCODE_OK) {
01576 ACE_ERROR_RETURN((LM_ERROR,
01577 ACE_TEXT("(%P|%t) ERROR: ")
01578 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01579 ACE_TEXT(" unregister with container failed. \n")),
01580 ret);
01581 }
01582
01583 DataSampleElement* element = 0;
01584 ret = this->data_container_->obtain_buffer_for_control(element);
01585
01586 if (ret != DDS::RETCODE_OK) {
01587 ACE_ERROR_RETURN((LM_ERROR,
01588 ACE_TEXT("(%P|%t) ERROR: ")
01589 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01590 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01591 ret),
01592 ret);
01593 }
01594
01595 element->set_sample(create_control_message(UNREGISTER_INSTANCE,
01596 element->get_header(),
01597 unregistered_sample_data,
01598 source_timestamp));
01599 ret = this->data_container_->enqueue_control(element);
01600
01601 if (ret != DDS::RETCODE_OK) {
01602 ACE_ERROR_RETURN((LM_ERROR,
01603 ACE_TEXT("(%P|%t) ERROR: ")
01604 ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01605 ACE_TEXT("enqueue_control failed.\n")),
01606 ret);
01607 }
01608
01609 send_all_to_flush_control(guard);
01610 return DDS::RETCODE_OK;
01611 }
01612
01613 DDS::ReturnCode_t
01614 DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
01615 const DDS::Time_t& source_timestamp)
01616 {
01617 DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01618
01619 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01620 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01621
01622 DataSample* data_sample = 0;
01623 ret = this->data_container_->dispose(handle, data_sample);
01624
01625 if (ret != DDS::RETCODE_OK) {
01626 ACE_ERROR_RETURN((LM_ERROR,
01627 ACE_TEXT("(%P|%t) ERROR: ")
01628 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01629 ACE_TEXT("dispose on container failed. \n")),
01630 ret);
01631 }
01632
01633 ret = this->data_container_->unregister(handle, data_sample, false);
01634
01635 if (ret != DDS::RETCODE_OK) {
01636 ACE_ERROR_RETURN((LM_ERROR,
01637 ACE_TEXT("(%P|%t) ERROR: ")
01638 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01639 ACE_TEXT("unregister with container failed. \n")),
01640 ret);
01641 }
01642
01643 DataSampleElement* element = 0;
01644 ret = this->data_container_->obtain_buffer_for_control(element);
01645
01646 if (ret != DDS::RETCODE_OK) {
01647 ACE_ERROR_RETURN((LM_ERROR,
01648 ACE_TEXT("(%P|%t) ERROR: ")
01649 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01650 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01651 ret),
01652 ret);
01653 }
01654
01655 element->set_sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01656 element->get_header(),
01657 data_sample,
01658 source_timestamp));
01659
01660 ret = this->data_container_->enqueue_control(element);
01661
01662 if (ret != DDS::RETCODE_OK) {
01663 ACE_ERROR_RETURN((LM_ERROR,
01664 ACE_TEXT("(%P|%t) ERROR: ")
01665 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01666 ACE_TEXT("enqueue_control failed.\n")),
01667 ret);
01668 }
01669
01670 send_all_to_flush_control(guard);
01671 return DDS::RETCODE_OK;
01672 }
01673
01674 void
01675 DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
01676 {
01677 {
01678 ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01679
01680 PublicationInstanceMapType::iterator it =
01681 this->data_container_->instances_.begin();
01682
01683 while (it != this->data_container_->instances_.end()) {
01684 DDS::InstanceHandle_t handle = it->first;
01685 ++it;
01686
01687 this->unregister_instance_i(handle, source_timestamp);
01688 }
01689 }
01690 }
01691
01692 DDS::ReturnCode_t
01693 DataWriterImpl::write(DataSample* data,
01694 DDS::InstanceHandle_t handle,
01695 const DDS::Time_t& source_timestamp,
01696 GUIDSeq* filter_out)
01697 {
01698 DBG_ENTRY_LVL("DataWriterImpl","write",6);
01699
01700 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01701 guard,
01702 get_lock (),
01703 ::DDS::RETCODE_ERROR);
01704
01705
01706 GUIDSeq_var filter_out_var(filter_out);
01707
01708 if (enabled_ == false) {
01709 ACE_ERROR_RETURN((LM_ERROR,
01710 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01711 ACE_TEXT(" Entity is not enabled. \n")),
01712 DDS::RETCODE_NOT_ENABLED);
01713 }
01714
01715 DataSampleElement* element = 0;
01716 DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01717
01718 if (ret == DDS::RETCODE_TIMEOUT) {
01719 return ret;
01720
01721 } else if (ret != DDS::RETCODE_OK) {
01722 ACE_ERROR_RETURN((LM_ERROR,
01723 ACE_TEXT("(%P|%t) ERROR: ")
01724 ACE_TEXT("DataWriterImpl::write: ")
01725 ACE_TEXT("obtain_buffer returned %d.\n"),
01726 ret),
01727 ret);
01728 }
01729
01730 DataSample* temp;
01731 ret = create_sample_data_message(data,
01732 handle,
01733 element->get_header(),
01734 temp,
01735 source_timestamp,
01736 (filter_out != 0));
01737 element->set_sample(temp);
01738
01739 if (ret != DDS::RETCODE_OK) {
01740 return ret;
01741 }
01742
01743 element->set_filter_out(filter_out_var._retn());
01744
01745 ret = this->data_container_->enqueue(element, handle);
01746
01747 if (ret != DDS::RETCODE_OK) {
01748 ACE_ERROR_RETURN((LM_ERROR,
01749 ACE_TEXT("(%P|%t) ERROR: ")
01750 ACE_TEXT("DataWriterImpl::write: ")
01751 ACE_TEXT("enqueue failed.\n")),
01752 ret);
01753 }
01754 this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01755
01756 track_sequence_number(filter_out);
01757
01758 if (this->coherent_) {
01759 ++this->coherent_samples_;
01760 }
01761 SendStateDataSampleList list;
01762
01763 ACE_UINT64 transaction_id = this->get_unsent_data(list);
01764
01765 if (this->publisher_servant_->is_suspended()) {
01766 if (min_suspended_transaction_id_ == 0) {
01767
01768
01769 min_suspended_transaction_id_ = transaction_id;
01770 } else {
01771
01772
01773 max_suspended_transaction_id_ = transaction_id;
01774 }
01775 this->available_data_list_.enqueue_tail(list);
01776
01777 } else {
01778 guard.release();
01779
01780 this->send(list, transaction_id);
01781 }
01782
01783 return DDS::RETCODE_OK;
01784 }
01785
01786 void
01787 DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
01788 {
01789 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01790
01791 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01792
01793 RepoIdSet excluded;
01794
01795 if (filter_out && !reader_info_.empty()) {
01796 const GUID_t* buf = filter_out->get_buffer();
01797 excluded.insert(buf, buf + filter_out->length());
01798 }
01799
01800 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01801 end = reader_info_.end(); iter != end; ++iter) {
01802
01803 if (excluded.count(iter->first) == 0) {
01804 iter->second.expected_sequence_ = sequence_number_;
01805 }
01806 }
01807
01808 #else
01809 ACE_UNUSED_ARG(filter_out);
01810 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01811 end = reader_info_.end(); iter != end; ++iter) {
01812 iter->second.expected_sequence_ = sequence_number_;
01813 }
01814
01815 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01816
01817 }
01818
01819 void
01820 DataWriterImpl::send_suspended_data()
01821 {
01822
01823
01824 if (max_suspended_transaction_id_ != 0) {
01825 this->send(this->available_data_list_, max_suspended_transaction_id_);
01826 max_suspended_transaction_id_ = 0;
01827 }
01828
01829
01830
01831
01832
01833 this->send(this->available_data_list_, min_suspended_transaction_id_);
01834 min_suspended_transaction_id_ = 0;
01835 this->available_data_list_.reset();
01836 }
01837
01838 DDS::ReturnCode_t
01839 DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
01840 const DDS::Time_t & source_timestamp)
01841 {
01842 DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01843
01844 if (enabled_ == false) {
01845 ACE_ERROR_RETURN((LM_ERROR,
01846 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01847 ACE_TEXT(" Entity is not enabled. \n")),
01848 DDS::RETCODE_NOT_ENABLED);
01849 }
01850
01851 DDS::ReturnCode_t ret = ::DDS::RETCODE_ERROR;
01852
01853 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01854
01855 DataSample* registered_sample_data = 0;
01856 ret = this->data_container_->dispose(handle, registered_sample_data);
01857
01858 if (ret != DDS::RETCODE_OK) {
01859 ACE_ERROR_RETURN((LM_ERROR,
01860 ACE_TEXT("(%P|%t) ERROR: ")
01861 ACE_TEXT("DataWriterImpl::dispose: ")
01862 ACE_TEXT("dispose failed.\n")),
01863 ret);
01864 }
01865
01866 DataSampleElement* element = 0;
01867 ret = this->data_container_->obtain_buffer_for_control(element);
01868
01869 if (ret != DDS::RETCODE_OK) {
01870 ACE_ERROR_RETURN((LM_ERROR,
01871 ACE_TEXT("(%P|%t) ERROR: ")
01872 ACE_TEXT("DataWriterImpl::dispose: ")
01873 ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01874 ret),
01875 ret);
01876 }
01877
01878 element->set_sample(create_control_message(DISPOSE_INSTANCE,
01879 element->get_header(),
01880 registered_sample_data,
01881 source_timestamp));
01882 ret = this->data_container_->enqueue_control(element);
01883
01884 if (ret != DDS::RETCODE_OK) {
01885 ACE_ERROR_RETURN((LM_ERROR,
01886 ACE_TEXT("(%P|%t) ERROR: ")
01887 ACE_TEXT("DataWriterImpl::dispose: ")
01888 ACE_TEXT("enqueue_control failed.\n")),
01889 ret);
01890 }
01891
01892 send_all_to_flush_control(guard);
01893
01894 return DDS::RETCODE_OK;
01895 }
01896
01897 DDS::ReturnCode_t
01898 DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
01899 size_t& size)
01900 {
01901 return data_container_->num_samples(handle, size);
01902 }
01903
01904 void
01905 DataWriterImpl::unregister_all()
01906 {
01907 if (cancel_timer_) {
01908
01909 (void) reactor_->cancel_timer(this, 0);
01910 cancel_timer_ = false;
01911 }
01912
01913 data_container_->unregister_all();
01914 }
01915
01916 RepoId
01917 DataWriterImpl::get_publication_id()
01918 {
01919 return publication_id_;
01920 }
01921
01922 RepoId
01923 DataWriterImpl::get_dp_id()
01924 {
01925 return participant_servant_->get_id();
01926 }
01927
01928 const char*
01929 DataWriterImpl::get_topic_name()
01930 {
01931 return topic_name_.in();
01932 }
01933
01934 char const *
01935 DataWriterImpl::get_type_name() const
01936 {
01937 return type_name_.in();
01938 }
01939
01940 ACE_Message_Block*
01941 DataWriterImpl::create_control_message(MessageId message_id,
01942 DataSampleHeader& header_data,
01943 ACE_Message_Block* data,
01944 const DDS::Time_t& source_timestamp)
01945 {
01946 header_data.message_id_ = message_id;
01947 header_data.byte_order_ =
01948 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01949 header_data.coherent_change_ = 0;
01950
01951 if (data) {
01952 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01953
01954 if (header_data.message_length_ == 0) {
01955 data->release();
01956 }
01957 }
01958
01959 header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01960 header_data.sequence_repair_ = false;
01961 header_data.source_timestamp_sec_ = source_timestamp.sec;
01962 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01963 header_data.publication_id_ = publication_id_;
01964 header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01965
01966 if (message_id == INSTANCE_REGISTRATION
01967 || message_id == DISPOSE_INSTANCE
01968 || message_id == UNREGISTER_INSTANCE
01969 || message_id == DISPOSE_UNREGISTER_INSTANCE) {
01970
01971 header_data.sequence_repair_ = need_sequence_repair();
01972
01973
01974
01975 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01976 this->sequence_number_ = SequenceNumber();
01977
01978 } else {
01979 ++this->sequence_number_;
01980 }
01981
01982 header_data.sequence_ = this->sequence_number_;
01983 header_data.key_fields_only_ = true;
01984 }
01985
01986 ACE_Message_Block* message = 0;
01987 ACE_NEW_MALLOC_RETURN(message,
01988 static_cast<ACE_Message_Block*>(
01989 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01990 ACE_Message_Block(
01991 DataSampleHeader::max_marshaled_size(),
01992 ACE_Message_Block::MB_DATA,
01993 header_data.message_length_ ? data : 0,
01994 0,
01995 0,
01996 get_db_lock(),
01997 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01998 ACE_Time_Value::zero,
01999 ACE_Time_Value::max_time,
02000 db_allocator_,
02001 mb_allocator_),
02002 0);
02003
02004 *message << header_data;
02005
02006
02007 if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02008 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02009
02010 RepoIdToReaderInfoMap::iterator reader;
02011
02012 for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02013 reader->second.expected_sequence_ = sequence_number_;
02014 }
02015 }
02016
02017 return message;
02018 }
02019
02020 DDS::ReturnCode_t
02021 DataWriterImpl::create_sample_data_message(DataSample* data,
02022 DDS::InstanceHandle_t instance_handle,
02023 DataSampleHeader& header_data,
02024 ACE_Message_Block*& message,
02025 const DDS::Time_t& source_timestamp,
02026 bool content_filter)
02027 {
02028 PublicationInstance* const instance =
02029 data_container_->get_handle_instance(instance_handle);
02030
02031 if (0 == instance) {
02032 ACE_ERROR_RETURN((LM_ERROR,
02033 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02034 ACE_TEXT("failed to find instance for handle %d\n"),
02035 instance_handle),
02036 DDS::RETCODE_ERROR);
02037 }
02038
02039 header_data.message_id_ = SAMPLE_DATA;
02040 header_data.byte_order_ =
02041 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02042 header_data.coherent_change_ = this->coherent_;
02043 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02044 header_data.group_coherent_ =
02045 this->publisher_servant_->qos_.presentation.access_scope
02046 == DDS::GROUP_PRESENTATION_QOS;
02047 #endif
02048 header_data.content_filter_ = content_filter;
02049 header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02050 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02051 header_data.sequence_repair_ = need_sequence_repair();
02052
02053 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02054 this->sequence_number_ = SequenceNumber();
02055
02056 } else {
02057 ++this->sequence_number_;
02058 }
02059
02060 header_data.sequence_ = this->sequence_number_;
02061 header_data.source_timestamp_sec_ = source_timestamp.sec;
02062 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02063
02064 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02065 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02066 header_data.lifespan_duration_ = true;
02067 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02068 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02069 }
02070
02071 header_data.publication_id_ = publication_id_;
02072 header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
02073 size_t max_marshaled_size = header_data.max_marshaled_size();
02074
02075 ACE_NEW_MALLOC_RETURN(message,
02076 static_cast<ACE_Message_Block*>(
02077 mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02078 ACE_Message_Block(max_marshaled_size,
02079 ACE_Message_Block::MB_DATA,
02080 data,
02081 0,
02082 header_allocator_,
02083 get_db_lock(),
02084 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02085 ACE_Time_Value::zero,
02086 ACE_Time_Value::max_time,
02087 db_allocator_,
02088 mb_allocator_),
02089 DDS::RETCODE_ERROR);
02090
02091 *message << header_data;
02092 return DDS::RETCODE_OK;
02093 }
02094
02095 void
02096 DataWriterImpl::data_delivered(const DataSampleElement* sample)
02097 {
02098 DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02099
02100 if (!(sample->get_pub_id() == this->publication_id_)) {
02101 GuidConverter sample_converter(sample->get_pub_id());
02102 GuidConverter writer_converter(publication_id_);
02103 ACE_ERROR((LM_ERROR,
02104 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02105 ACE_TEXT(" The publication id %C from delivered element ")
02106 ACE_TEXT("does not match the datawriter's id %C\n"),
02107 OPENDDS_STRING(sample_converter).c_str(),
02108 OPENDDS_STRING(writer_converter).c_str()));
02109 return;
02110 }
02111 this->data_container_->data_delivered(sample);
02112
02113 ++data_delivered_count_;
02114 }
02115
02116 void
02117 DataWriterImpl::control_delivered(ACE_Message_Block* sample)
02118 {
02119 DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02120 sample->release();
02121 controlTracker.message_delivered();
02122 }
02123
02124 EntityImpl*
02125 DataWriterImpl::parent() const
02126 {
02127 return this->publisher_servant_;
02128 }
02129
02130 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
02131 bool
02132 DataWriterImpl::filter_out(const DataSampleElement& elt,
02133 const OPENDDS_STRING& filterClassName,
02134 const FilterEvaluator& evaluator,
02135 const DDS::StringSeq& expression_params) const
02136 {
02137 TypeSupportImpl* const typesupport =
02138 dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02139
02140 if (!typesupport) {
02141 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02142 return false;
02143 }
02144
02145 if (filterClassName == "DDSSQL" ||
02146 filterClassName == "OPENDDSSQL") {
02147 return !evaluator.eval(elt.get_sample()->cont(),
02148 elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02149 elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02150 expression_params);
02151 }
02152 else {
02153 return false;
02154 }
02155 }
02156 #endif
02157
02158 bool
02159 DataWriterImpl::check_transport_qos(const TransportInst&)
02160 {
02161
02162
02163 return true;
02164 }
02165
02166 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02167
02168 bool
02169 DataWriterImpl::coherent_changes_pending()
02170 {
02171 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02172 guard,
02173 get_lock(),
02174 false);
02175
02176 return this->coherent_;
02177 }
02178
02179 void
02180 DataWriterImpl::begin_coherent_changes()
02181 {
02182 ACE_GUARD(ACE_Recursive_Thread_Mutex,
02183 guard,
02184 get_lock());
02185
02186 this->coherent_ = true;
02187 }
02188
02189 void
02190 DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
02191 {
02192
02193 ACE_GUARD(ACE_Recursive_Thread_Mutex,
02194 guard,
02195 get_lock());
02196
02197 CoherentChangeControl end_msg;
02198 end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02199 end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02200 end_msg.group_coherent_
02201 = this->publisher_servant_->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02202
02203 if (end_msg.group_coherent_) {
02204 end_msg.publisher_id_ = this->publisher_servant_->publisher_id_;
02205 end_msg.group_coherent_samples_ = group_samples;
02206 }
02207
02208 ACE_Message_Block* data = 0;
02209 size_t max_marshaled_size = end_msg.max_marshaled_size();
02210
02211 ACE_NEW(data, ACE_Message_Block(max_marshaled_size,
02212 ACE_Message_Block::MB_DATA,
02213 0,
02214 0,
02215 0,
02216 get_db_lock()));
02217
02218 Serializer serializer(
02219 data,
02220 this->swap_bytes());
02221
02222 serializer << end_msg;
02223
02224 DDS::Time_t source_timestamp =
02225 time_value_to_time(ACE_OS::gettimeofday());
02226
02227 DataSampleHeader header;
02228 ACE_Message_Block* control =
02229 create_control_message(END_COHERENT_CHANGES, header, data, source_timestamp);
02230
02231
02232 this->coherent_ = false;
02233 this->coherent_samples_ = 0;
02234
02235 guard.release();
02236 if (this->send_control(header, control) == SEND_CONTROL_ERROR) {
02237 ACE_ERROR((LM_ERROR,
02238 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02239 ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02240 }
02241 }
02242
02243 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
02244
02245 void
02246 DataWriterImpl::data_dropped(const DataSampleElement* element,
02247 bool dropped_by_transport)
02248 {
02249 DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02250
02251 this->data_container_->data_dropped(element, dropped_by_transport);
02252
02253 ++data_dropped_count_;
02254 }
02255
02256 void
02257 DataWriterImpl::control_dropped(ACE_Message_Block* sample,
02258 bool )
02259 {
02260 DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02261 sample->release();
02262 controlTracker.message_dropped();
02263 }
02264
02265 DDS::DataWriterListener_ptr
02266 DataWriterImpl::listener_for(DDS::StatusKind kind)
02267 {
02268
02269
02270
02271 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02272 return publisher_servant_->listener_for(kind);
02273
02274 } else {
02275 return DDS::DataWriterListener::_duplicate(listener_.in());
02276 }
02277 }
02278
02279 int
02280 DataWriterImpl::handle_timeout(const ACE_Time_Value &tv,
02281 const void * )
02282 {
02283 const ACE_Time_Value delta = tv - last_liveliness_check_time_;
02284 if (delta < liveliness_check_interval_) {
02285
02286 if (reactor_->cancel_timer(this) == -1) {
02287 ACE_ERROR((LM_ERROR,
02288 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02289 ACE_TEXT("cancel_timer")));
02290 }
02291 if (reactor_->schedule_timer(this, 0, liveliness_check_interval_ - delta, liveliness_check_interval_) == -1) {
02292 ACE_ERROR((LM_ERROR,
02293 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02294 ACE_TEXT("schedule_timer")));
02295 }
02296 return 0;
02297 }
02298
02299 bool liveliness_lost = false;
02300
02301 ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02302
02303
02304 if (elapsed >= liveliness_check_interval_) {
02305 switch (this->qos_.liveliness.kind) {
02306 case DDS::AUTOMATIC_LIVELINESS_QOS:
02307 if (this->send_liveliness(tv) == false) {
02308 liveliness_lost = true;
02309 }
02310 break;
02311
02312 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02313 if (liveliness_asserted_) {
02314 if (this->send_liveliness(tv) == false) {
02315 liveliness_lost = true;
02316 }
02317 }
02318 break;
02319
02320 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02321
02322 break;
02323 }
02324 }
02325
02326 liveliness_asserted_ = false;
02327 last_liveliness_check_time_ = tv;
02328 elapsed = tv - last_liveliness_activity_time_;
02329
02330
02331 if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02332 liveliness_lost = true;
02333 }
02334
02335 if (!this->liveliness_lost_ && liveliness_lost) {
02336 ++ this->liveliness_lost_status_.total_count;
02337 ++ this->liveliness_lost_status_.total_count_change;
02338
02339 DDS::DataWriterListener_var listener =
02340 listener_for(DDS::LIVELINESS_LOST_STATUS);
02341
02342 if (!CORBA::is_nil(listener.in())) {
02343 listener->on_liveliness_lost(this->dw_local_objref_.in(),
02344 this->liveliness_lost_status_);
02345 }
02346 }
02347
02348 this->liveliness_lost_ = liveliness_lost;
02349 return 0;
02350 }
02351
02352 int
02353 DataWriterImpl::handle_close(ACE_HANDLE,
02354 ACE_Reactor_Mask)
02355 {
02356 this->_remove_ref();
02357 return 0;
02358 }
02359
02360 bool
02361 DataWriterImpl::send_liveliness(const ACE_Time_Value& now)
02362 {
02363 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02364 !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02365 DDS::Time_t t = time_value_to_time(now);
02366 DataSampleHeader header;
02367 ACE_Message_Block* liveliness_msg =
02368 this->create_control_message(DATAWRITER_LIVELINESS, header, 0, t);
02369
02370 if (this->send_control(header, liveliness_msg) == SEND_CONTROL_ERROR) {
02371 ACE_ERROR_RETURN((LM_ERROR,
02372 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02373 ACE_TEXT(" send_control failed. \n")),
02374 false);
02375
02376 } else {
02377 last_liveliness_activity_time_ = now;
02378 return true;
02379 }
02380 } else {
02381 last_liveliness_activity_time_ = now;
02382 return true;
02383 }
02384 }
02385
02386 void
02387 DataWriterImpl::prepare_to_delete()
02388 {
02389 this->set_deleted(true);
02390 this->stop_associating();
02391 }
02392
02393 PublicationInstance*
02394 DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02395 {
02396 PublicationInstance* instance = 0;
02397
02398 if (0 != data_container_) {
02399 instance = data_container_->get_handle_instance(handle);
02400 }
02401
02402 return instance;
02403 }
02404
02405 void
02406 DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
02407 {
02408 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02409
02410 if (!is_bit_) {
02411
02412
02413 DataWriterListener_var the_listener =
02414 DataWriterListener::_narrow(this->listener_.in());
02415
02416 if (!CORBA::is_nil(the_listener.in())) {
02417 PublicationDisconnectedStatus status;
02418
02419
02420
02421 this->lookup_instance_handles(subids,
02422 status.subscription_handles);
02423 the_listener->on_publication_disconnected(this->dw_local_objref_.in(),
02424 status);
02425 }
02426 }
02427 }
02428
02429 void
02430 DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
02431 {
02432 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02433
02434 if (!is_bit_) {
02435
02436
02437
02438 DataWriterListener_var the_listener =
02439 DataWriterListener::_narrow(this->listener_.in());
02440
02441 if (!CORBA::is_nil(the_listener.in())) {
02442 PublicationDisconnectedStatus status;
02443
02444
02445
02446 if (this->lookup_instance_handles(subids,
02447 status.subscription_handles) == false) {
02448 ACE_ERROR((LM_ERROR,
02449 "(%P|%t) ERROR: DataWriterImpl::"
02450 "notify_publication_reconnected: "
02451 "lookup_instance_handles failed\n"));
02452 }
02453
02454 the_listener->on_publication_reconnected(this->dw_local_objref_.in(),
02455 status);
02456 }
02457 }
02458 }
02459
02460 void
02461 DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
02462 {
02463 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02464
02465 if (!is_bit_) {
02466
02467
02468
02469 DataWriterListener_var the_listener =
02470 DataWriterListener::_narrow(this->listener_.in());
02471
02472 if (!CORBA::is_nil(the_listener.in())) {
02473 PublicationLostStatus status;
02474
02475
02476
02477 this->lookup_instance_handles(subids,
02478 status.subscription_handles);
02479 the_listener->on_publication_lost(this->dw_local_objref_.in(),
02480 status);
02481 }
02482 }
02483 }
02484
02485 void
02486 DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
02487 {
02488 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02489
02490 if (!is_bit_) {
02491
02492
02493
02494 DataWriterListener_var the_listener =
02495 DataWriterListener::_narrow(this->listener_.in());
02496
02497 if (!CORBA::is_nil(the_listener.in())) {
02498 PublicationLostStatus status;
02499
02500 CORBA::ULong len = handles.length();
02501 status.subscription_handles.length(len);
02502
02503 for (CORBA::ULong i = 0; i < len; ++ i) {
02504 status.subscription_handles[i] = handles[i];
02505 }
02506
02507 the_listener->on_publication_lost(this->dw_local_objref_.in(),
02508 status);
02509 }
02510 }
02511 }
02512
02513 void
02514 DataWriterImpl::notify_connection_deleted(const RepoId& peerId)
02515 {
02516 DBG_ENTRY_LVL("DataWriterImpl","notify_connection_deleted",6);
02517 on_notification_of_connection_deletion(peerId);
02518
02519
02520 DataWriterListener_var the_listener =
02521 DataWriterListener::_narrow(this->listener_.in());
02522
02523 if (!CORBA::is_nil(the_listener.in()))
02524 the_listener->on_connection_deleted(this->dw_local_objref_.in());
02525 }
02526
02527 bool
02528 DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
02529 DDS::InstanceHandleSeq & hdls)
02530 {
02531 if (DCPS_debug_level > 9) {
02532 CORBA::ULong const size = ids.length();
02533 OPENDDS_STRING separator;
02534 OPENDDS_STRING buffer;
02535
02536 for (unsigned long i = 0; i < size; ++i) {
02537 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02538 separator = ", ";
02539 }
02540
02541 ACE_DEBUG((LM_DEBUG,
02542 ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02543 ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02544 buffer.c_str()));
02545 }
02546
02547 CORBA::ULong const num_rds = ids.length();
02548 hdls.length(num_rds);
02549
02550 for (CORBA::ULong i = 0; i < num_rds; ++i) {
02551 hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02552 }
02553
02554 return true;
02555 }
02556
02557 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
02558 bool
02559 DataWriterImpl::persist_data()
02560 {
02561 return this->data_container_->persist_data();
02562 }
02563 #endif
02564
02565 void
02566 DataWriterImpl::reschedule_deadline()
02567 {
02568 if (this->watchdog_ != 0) {
02569 this->data_container_->reschedule_deadline();
02570 }
02571 }
02572
02573 bool
02574 DataWriterImpl::pending_control()
02575 {
02576 return controlTracker.pending_messages();
02577 }
02578
02579 void
02580 DataWriterImpl::wait_control_pending()
02581 {
02582 OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02583 controlTracker.wait_messages_pending(caller_string);
02584 }
02585
02586 void
02587 DataWriterImpl::wait_pending()
02588 {
02589 this->data_container_->wait_pending();
02590 }
02591
02592 void
02593 DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02594 {
02595 this->data_container_->get_instance_handles(instance_handles);
02596 }
02597
02598 void
02599 DataWriterImpl::get_readers(RepoIdSet& readers)
02600 {
02601 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02602 readers = this->readers_;
02603 }
02604
02605 void
02606 DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
02607 {
02608 this->publisher_servant_->get_qos(qos_data.pub_qos);
02609 qos_data.dw_qos = this->qos_;
02610 qos_data.topic_name = this->topic_name_.in();
02611 }
02612
02613 bool
02614 DataWriterImpl::need_sequence_repair()
02615 {
02616 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02617 return need_sequence_repair_i();
02618 }
02619
02620 bool
02621 DataWriterImpl::need_sequence_repair_i() const
02622 {
02623 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02624 end = reader_info_.end(); it != end; ++it) {
02625 if (it->second.expected_sequence_ != sequence_number_) {
02626 return true;
02627 }
02628 }
02629
02630 return false;
02631 }
02632
02633 SendControlStatus
02634 DataWriterImpl::send_control(const DataSampleHeader& header,
02635 ACE_Message_Block* msg)
02636 {
02637 controlTracker.message_sent();
02638
02639 SendControlStatus status = TransportClient::send_control(header, msg);
02640
02641 if (status != SEND_CONTROL_OK) {
02642 controlTracker.message_dropped();
02643 }
02644
02645 return status;
02646 }
02647
02648 }
02649 }