DataWriterImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataWriterImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "Serializer.h"
00018 #include "Transient_Kludge.h"
00019 #include "DataDurabilityCache.h"
00020 #include "OfferedDeadlineWatchdog.h"
00021 #include "MonitorFactory.h"
00022 #include "TypeSupportImpl.h"
00023 #include "SendStateDataSampleList.h"
00024 #include "DataSampleElement.h"
00025 
00026 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00027 #include "CoherentChangeControl.h"
00028 #endif
00029 
00030 #include "AssociationData.h"
00031 #include "dds/DdsDcpsCoreC.h"
00032 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00033 
00034 #if !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "BuiltInTopicUtils.h"
00036 #include "dds/DdsDcpsCoreTypeSupportC.h"
00037 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00038 
00039 #include "Util.h"
00040 #include "dds/DCPS/transport/framework/EntryExit.h"
00041 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00042 
00043 #include "tao/ORB_Core.h"
00044 #include "ace/Reactor.h"
00045 #include "ace/Auto_Ptr.h"
00046 
00047 #include <stdexcept>
00048 
00049 namespace OpenDDS {
00050 namespace DCPS {
00051 
00052 //TBD - add check for enabled in most methods.
00053 //      currently this is not needed because auto_enable_created_entities
00054 //      cannot be false.
00055 
00056 DataWriterImpl::DataWriterImpl()
00057   : data_dropped_count_(0),
00058     data_delivered_count_(0),
00059     controlTracker("DataWriterImpl"),
00060     n_chunks_(TheServiceParticipant->n_chunks()),
00061     association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00062     qos_(TheServiceParticipant->initial_DataWriterQos()),
00063     participant_servant_(0),
00064     topic_id_(GUID_UNKNOWN),
00065     topic_servant_(0),
00066     listener_mask_(DEFAULT_STATUS_MASK),
00067     domain_id_(0),
00068     publisher_servant_(0),
00069     publication_id_(GUID_UNKNOWN),
00070     sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00071     coherent_(false),
00072     coherent_samples_(0),
00073     data_container_(0),
00074     liveliness_lost_(false),
00075     mb_allocator_(0),
00076     db_allocator_(0),
00077     header_allocator_(0),
00078     reactor_(0),
00079     liveliness_check_interval_(ACE_Time_Value::max_time),
00080     last_liveliness_activity_time_(ACE_Time_Value::zero),
00081     last_deadline_missed_total_count_(0),
00082     watchdog_(),
00083     cancel_timer_(false),
00084     is_bit_(false),
00085     initialized_(false),
00086     min_suspended_transaction_id_(0),
00087     max_suspended_transaction_id_(0),
00088     monitor_(0),
00089     periodic_monitor_(0),
00090     db_lock_pool_(0),
00091     liveliness_asserted_(false)
00092 {
00093   liveliness_lost_status_.total_count = 0;
00094   liveliness_lost_status_.total_count_change = 0;
00095 
00096   offered_deadline_missed_status_.total_count = 0;
00097   offered_deadline_missed_status_.total_count_change = 0;
00098   offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00099 
00100   offered_incompatible_qos_status_.total_count = 0;
00101   offered_incompatible_qos_status_.total_count_change = 0;
00102   offered_incompatible_qos_status_.last_policy_id = 0;
00103   offered_incompatible_qos_status_.policies.length(0);
00104 
00105   publication_match_status_.total_count = 0;
00106   publication_match_status_.total_count_change = 0;
00107   publication_match_status_.current_count = 0;
00108   publication_match_status_.current_count_change = 0;
00109   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00110 
00111   monitor_ =
00112     TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00113   periodic_monitor_ =
00114     TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00115 
00116   db_lock_pool_ = new DataBlockLockPool((unsigned long)n_chunks_);
00117 }
00118 
00119 // This method is called when there are no longer any reference to the
00120 // the servant.
00121 DataWriterImpl::~DataWriterImpl()
00122 {
00123   DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00124 
00125   if (initialized_) {
00126     delete data_container_;
00127     delete mb_allocator_;
00128     delete db_allocator_;
00129     delete header_allocator_;
00130   }
00131   delete db_lock_pool_;
00132 }
00133 
00134 // this method is called when delete_datawriter is called.
00135 void
00136 DataWriterImpl::cleanup()
00137 {
00138   if (cancel_timer_) {
00139     // The cancel_timer will call handle_close to
00140     // remove_ref.
00141     (void) reactor_->cancel_timer(this, 0);
00142     cancel_timer_ = false;
00143   }
00144 
00145   // release our Topic_var
00146   topic_objref_ = DDS::Topic::_nil();
00147   topic_servant_->remove_entity_ref();
00148   topic_servant_->_remove_ref();
00149   topic_servant_ = 0;
00150 
00151   dw_local_objref_ = DDS::DataWriter::_nil();
00152 }
00153 
00154 void
00155 DataWriterImpl::init(
00156   DDS::Topic_ptr                       topic,
00157   TopicImpl *                            topic_servant,
00158   const DDS::DataWriterQos &           qos,
00159   DDS::DataWriterListener_ptr          a_listener,
00160   const DDS::StatusMask &              mask,
00161   OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00162   OpenDDS::DCPS::PublisherImpl *         publisher_servant,
00163   DDS::DataWriter_ptr                  dw_local)
00164 {
00165   DBG_ENTRY_LVL("DataWriterImpl","init",6);
00166   topic_objref_ = DDS::Topic::_duplicate(topic);
00167   topic_servant_ = topic_servant;
00168   topic_servant_->_add_ref();
00169   topic_servant_->add_entity_ref();
00170   topic_name_    = topic_servant_->get_name();
00171   topic_id_      = topic_servant_->get_id();
00172   type_name_     = topic_servant_->get_type_name();
00173 
00174 #if !defined (DDS_HAS_MINIMUM_BIT)
00175   is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00176             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00177             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00178             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00179 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00180 
00181   qos_ = qos;
00182 
00183   //Note: OK to _duplicate(nil).
00184   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00185   listener_mask_ = mask;
00186 
00187   // Only store the participant pointer, since it is our "grand"
00188   // parent, we will exist as long as it does.
00189   participant_servant_ = participant_servant;
00190   domain_id_ = participant_servant_->get_domain_id();
00191 
00192   // Only store the publisher pointer, since it is our parent, we will
00193   // exist as long as it does.
00194   publisher_servant_ = publisher_servant;
00195   dw_local_objref_   = DDS::DataWriter::_duplicate(dw_local);
00196 
00197   this->reactor_ = TheServiceParticipant->timer();
00198 
00199   initialized_ = true;
00200 }
00201 
00202 DDS::InstanceHandle_t
00203 DataWriterImpl::get_instance_handle()
00204 {
00205   return this->participant_servant_->id_to_handle(publication_id_);
00206 }
00207 
00208 DDS::InstanceHandle_t
00209 DataWriterImpl::get_next_handle()
00210 {
00211   return this->participant_servant_->id_to_handle(GUID_UNKNOWN);
00212 }
00213 
00214 void
00215 DataWriterImpl::add_association(const RepoId& yourId,
00216                                 const ReaderAssociation& reader,
00217                                 bool active)
00218 {
00219   DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00220 
00221   if (DCPS_debug_level) {
00222     GuidConverter writer_converter(yourId);
00223     GuidConverter reader_converter(reader.readerId);
00224     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00225                ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00226                OPENDDS_STRING(writer_converter).c_str(),
00227                OPENDDS_STRING(reader_converter).c_str()));
00228   }
00229 
00230   if (entity_deleted_.value()) {
00231     if (DCPS_debug_level)
00232       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00233                  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00234 
00235     return;
00236   }
00237 
00238   if (GUID_UNKNOWN == publication_id_) {
00239     publication_id_ = yourId;
00240   }
00241 
00242   {
00243     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00244     reader_info_.insert(std::make_pair(reader.readerId,
00245                                        ReaderInfo(reader.filterClassName,
00246                                                   TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00247                                                   reader.exprParams, participant_servant_,
00248                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00249   }
00250 
00251   if (DCPS_debug_level > 4) {
00252     GuidConverter converter(get_publication_id());
00253     ACE_DEBUG((LM_DEBUG,
00254                ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00255                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00256                OPENDDS_STRING(converter).c_str(),
00257                qos_.transport_priority.value));
00258   }
00259 
00260   AssociationData data;
00261   data.remote_id_ = reader.readerId;
00262   data.remote_data_ = reader.readerTransInfo;
00263   data.remote_reliable_ =
00264     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00265   data.remote_durable_ =
00266     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00267 
00268   if (!associate(data, active)) {
00269     //FUTURE: inform inforepo and try again as passive peer
00270     if (DCPS_debug_level) {
00271       ACE_DEBUG((LM_ERROR,
00272                  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00273                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00274     }
00275   }
00276 }
00277 
00278 void
00279 DataWriterImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00280 {
00281   DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00282 
00283   if (!(flags & ASSOC_OK)) {
00284     if (DCPS_debug_level) {
00285       const GuidConverter conv(remote_id);
00286       ACE_DEBUG((LM_ERROR,
00287                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00288                  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00289                  OPENDDS_STRING(conv).c_str()));
00290     }
00291 
00292     return;
00293   }
00294   if (DCPS_debug_level) {
00295     const GuidConverter writer_conv(publication_id_);
00296     const GuidConverter conv(remote_id);
00297     ACE_DEBUG((LM_INFO,
00298                ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00299                ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00300                OPENDDS_STRING(writer_conv).c_str(),
00301                OPENDDS_STRING(conv).c_str()));
00302   }
00303   if (flags & ASSOC_ACTIVE) {
00304 
00305     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00306 
00307     // Have we already received an association_complete() callback?
00308     if (assoc_complete_readers_.count(remote_id)) {
00309       if (DCPS_debug_level) {
00310         const GuidConverter writer_conv(publication_id_);
00311         const GuidConverter converter(remote_id);
00312         ACE_DEBUG((LM_DEBUG,
00313                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00314                    ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00315                    OPENDDS_STRING(writer_conv).c_str(),
00316                    OPENDDS_STRING(converter).c_str()));
00317       }
00318       assoc_complete_readers_.erase(remote_id);
00319       association_complete_i(remote_id);
00320 
00321       // Add to pending_readers_ -> pending means we are waiting
00322       // for the association_complete() callback.
00323 
00324     } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00325       const GuidConverter converter(remote_id);
00326       ACE_ERROR((LM_ERROR,
00327                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00328                  ACE_TEXT("failed to mark %C as pending.\n"),
00329                  OPENDDS_STRING(converter).c_str()));
00330 
00331     } else {
00332       if (DCPS_debug_level) {
00333         const GuidConverter converter(remote_id);
00334         ACE_DEBUG((LM_DEBUG,
00335                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00336                    ACE_TEXT("marked %C as pending.\n"),
00337                    OPENDDS_STRING(converter).c_str()));
00338       }
00339     }
00340 
00341   } else {
00342     // In the current implementation, DataWriter is always active, so this
00343     // code will not be applicable.
00344     if (DCPS_debug_level) {
00345       const GuidConverter conv(publication_id_);
00346       ACE_DEBUG((LM_ERROR,
00347                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00348                  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00349                  OPENDDS_STRING(conv).c_str()));
00350     }
00351     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00352     disco->association_complete(domain_id_, participant_servant_->get_id(),
00353                                 publication_id_, remote_id);
00354   }
00355 }
00356 
00357 DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
00358                                        const char* filter,
00359                                        const DDS::StringSeq& params,
00360                                        DomainParticipantImpl* participant,
00361                                        bool durable)
00362 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00363   : participant_(participant)
00364   , filter_class_name_(filterClassName)
00365   , filter_(filter)
00366   , expression_params_(params)
00367   , eval_(*filter ? participant->get_filter_eval(filter) : 0)
00368   , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00369   , durable_(durable)
00370 {}
00371 #else
00372   : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00373   , durable_(durable)
00374 {
00375   ACE_UNUSED_ARG(filterClassName);
00376   ACE_UNUSED_ARG(filter);
00377   ACE_UNUSED_ARG(params);
00378   ACE_UNUSED_ARG(participant);
00379 }
00380 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00381 
00382 DataWriterImpl::ReaderInfo::~ReaderInfo()
00383 {
00384 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00385   eval_ = RcHandle<FilterEvaluator>();
00386 
00387   if (!filter_.empty()) {
00388     participant_->deref_filter_eval(filter_.c_str());
00389   }
00390 
00391 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00392 }
00393 
00394 void
00395 DataWriterImpl::association_complete(const RepoId& remote_id)
00396 {
00397   DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00398 
00399   if (DCPS_debug_level >= 1) {
00400     GuidConverter writer_converter(this->publication_id_);
00401     GuidConverter reader_converter(remote_id);
00402     ACE_DEBUG((LM_DEBUG,
00403                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00404                ACE_TEXT("bit %d local %C remote %C\n"),
00405                is_bit_,
00406                OPENDDS_STRING(writer_converter).c_str(),
00407                OPENDDS_STRING(reader_converter).c_str()));
00408   }
00409 
00410   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00411 
00412   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00413     if (DCPS_debug_level) {
00414       GuidConverter writer_converter(this->publication_id_);
00415       GuidConverter reader_converter(remote_id);
00416       ACE_DEBUG((LM_DEBUG,
00417                  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00418                  ACE_TEXT("bit %d local %C did not find pending reader: %C")
00419                  ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00420                  is_bit_,
00421                  OPENDDS_STRING(writer_converter).c_str(),
00422                  OPENDDS_STRING(reader_converter).c_str()));
00423     }
00424     // Not found in pending_readers_, defer calling association_complete_i()
00425     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00426     assoc_complete_readers_.insert(remote_id);
00427 
00428   } else {
00429     association_complete_i(remote_id);
00430   }
00431 }
00432 
00433 void
00434 DataWriterImpl::association_complete_i(const RepoId& remote_id)
00435 {
00436   DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00437 
00438   if (DCPS_debug_level >= 1) {
00439     GuidConverter writer_converter(this->publication_id_);
00440     GuidConverter reader_converter(remote_id);
00441     ACE_DEBUG((LM_DEBUG,
00442                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00443                ACE_TEXT("bit %d local %C remote %C\n"),
00444                is_bit_,
00445                OPENDDS_STRING(writer_converter).c_str(),
00446                OPENDDS_STRING(reader_converter).c_str()));
00447   }
00448 
00449   bool reader_durable = false;
00450 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00451   OPENDDS_STRING filterClassName;
00452   RcHandle<FilterEvaluator> eval;
00453   DDS::StringSeq expression_params;
00454 #endif
00455   {
00456     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00457 
00458     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00459       GuidConverter converter(remote_id);
00460       ACE_ERROR((LM_ERROR,
00461                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00462                  ACE_TEXT("insert %C from pending failed.\n"),
00463                  OPENDDS_STRING(converter).c_str()));
00464     }
00465   }
00466   {
00467     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00468     RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00469 
00470     if (it != reader_info_.end()) {
00471       reader_durable = it->second.durable_;
00472 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00473       filterClassName = it->second.filter_class_name_;
00474       eval = it->second.eval_;
00475       expression_params = it->second.expression_params_;
00476 #endif
00477     }
00478   }
00479 
00480   if (this->monitor_) {
00481     this->monitor_->report();
00482   }
00483 
00484   if (!is_bit_) {
00485 
00486     DDS::InstanceHandle_t handle =
00487       this->participant_servant_->id_to_handle(remote_id);
00488 
00489     {
00490       // protect publication_match_status_ and status changed flags.
00491       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00492 
00493       // update the publication_match_status_
00494       ++publication_match_status_.total_count;
00495       ++publication_match_status_.total_count_change;
00496       ++publication_match_status_.current_count;
00497       ++publication_match_status_.current_count_change;
00498 
00499       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00500         GuidConverter converter(remote_id);
00501         ACE_DEBUG((LM_WARNING,
00502                    ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00503                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00504                    OPENDDS_STRING(converter).c_str(),
00505                    handle));
00506         return;
00507 
00508       } else if (DCPS_debug_level > 4) {
00509         GuidConverter converter(remote_id);
00510         ACE_DEBUG((LM_DEBUG,
00511                    ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00512                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00513                    OPENDDS_STRING(converter).c_str(),
00514                    handle));
00515       }
00516 
00517       publication_match_status_.last_subscription_handle = handle;
00518 
00519       set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00520     }
00521 
00522     DDS::DataWriterListener_var listener =
00523       listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00524 
00525     if (!CORBA::is_nil(listener.in())) {
00526 
00527       listener->on_publication_matched(dw_local_objref_.in(),
00528                                        publication_match_status_);
00529 
00530       // TBD - why does the spec say to change this but not
00531       // change the ChangeFlagStatus after a listener call?
00532       publication_match_status_.total_count_change = 0;
00533       publication_match_status_.current_count_change = 0;
00534     }
00535 
00536     notify_status_condition();
00537   }
00538 
00539   // Support DURABILITY QoS
00540   if (reader_durable) {
00541     // Tell the WriteDataContainer to resend all sending/sent
00542     // samples.
00543     this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00544 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00545                                          , filterClassName, eval.in(), expression_params
00546 #endif
00547                                         );
00548 
00549     // Acquire the data writer container lock to avoid deadlock. The
00550     // thread calling association_complete() has to acquire lock in the
00551     // same order as the write()/register() operation.
00552 
00553     // Since the thread calling association_complete() is the ORB
00554     // thread, it may have some performance penalty. If the
00555     // performance is an issue, we may need a new thread to handle the
00556     // data_available() calls.
00557     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00558               guard,
00559               this->get_lock());
00560 
00561     SendStateDataSampleList list = this->get_resend_data();
00562     {
00563       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00564       // Update the reader's expected sequence
00565       SequenceNumber& seq =
00566         reader_info_.find(remote_id)->second.expected_sequence_;
00567 
00568       for (SendStateDataSampleList::iterator list_el = list.begin();
00569            list_el != list.end(); ++list_el) {
00570         list_el->get_header().historic_sample_ = true;
00571 
00572         if (list_el->get_header().sequence_ > seq) {
00573           seq = list_el->get_header().sequence_;
00574         }
00575       }
00576     }
00577 
00578     if (this->publisher_servant_->is_suspended()) {
00579       this->available_data_list_.enqueue_tail(list);
00580 
00581     } else {
00582       if (DCPS_debug_level >= 4) {
00583         ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00584       }
00585 
00586       size_t size = 0, padding = 0;
00587       gen_find_size(remote_id, size, padding);
00588       ACE_Message_Block* const data =
00589         new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00590                               get_db_lock());
00591       Serializer ser(data);
00592       ser << remote_id;
00593 
00594       const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00595       DataSampleHeader header;
00596       ACE_Message_Block* const end_historic_samples =
00597         create_control_message(END_HISTORIC_SAMPLES, header, data, timestamp);
00598 
00599       this->controlTracker.message_sent();
00600       guard.release();
00601       SendControlStatus ret = send_w_control(list, header, end_historic_samples, remote_id);
00602       if (ret == SEND_CONTROL_ERROR) {
00603         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00604                              ACE_TEXT("DataWriterImpl::association_complete_i: ")
00605                              ACE_TEXT("send_w_control failed.\n")));
00606         this->controlTracker.message_dropped();
00607       }
00608     }
00609   }
00610 }
00611 
00612 void
00613 DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
00614                                     CORBA::Boolean notify_lost)
00615 {
00616   if (readers.length() == 0) {
00617     return;
00618   }
00619 
00620   if (DCPS_debug_level >= 1) {
00621     GuidConverter writer_converter(publication_id_);
00622     GuidConverter reader_converter(readers[0]);
00623     ACE_DEBUG((LM_DEBUG,
00624                ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
00625                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00626                is_bit_,
00627                OPENDDS_STRING(writer_converter).c_str(),
00628                OPENDDS_STRING(reader_converter).c_str(),
00629                readers.length()));
00630   }
00631 
00632   // stop pending associations for these reader ids
00633   this->stop_associating(readers.get_buffer(), readers.length());
00634 
00635   ReaderIdSeq fully_associated_readers;
00636   CORBA::ULong fully_associated_len = 0;
00637   ReaderIdSeq rds;
00638   CORBA::ULong rds_len = 0;
00639   DDS::InstanceHandleSeq handles;
00640 
00641   ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
00642   {
00643     // Ensure the same acquisition order as in wait_for_acknowledgments().
00644     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00645     //Remove the readers from fully associated reader list.
00646     //If the supplied reader is not in the cached reader list then it is
00647     //already removed. We just need remove the readers in the list that have
00648     //not been removed.
00649 
00650     CORBA::ULong len = readers.length();
00651 
00652     for (CORBA::ULong i = 0; i < len; ++i) {
00653       //Remove the readers from fully associated reader list. If it's not
00654       //in there, the association_complete() is not called yet and remove it
00655       //from pending list.
00656 
00657       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00658         ++ fully_associated_len;
00659         fully_associated_readers.length(fully_associated_len);
00660         fully_associated_readers [fully_associated_len - 1] = readers[i];
00661 
00662         ++ rds_len;
00663         rds.length(rds_len);
00664         rds [rds_len - 1] = readers[i];
00665 
00666       } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00667         ++ rds_len;
00668         rds.length(rds_len);
00669         rds [rds_len - 1] = readers[i];
00670 
00671         GuidConverter converter(readers[i]);
00672         ACE_DEBUG((LM_WARNING,
00673                    ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_associations: ")
00674                    ACE_TEXT("removing reader %C before association_complete() call.\n"),
00675                    OPENDDS_STRING(converter).c_str()));
00676       }
00677 
00678       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00679       reader_info_.erase(readers[i]);
00680       //else reader is already removed which indicates remove_association()
00681       //is called multiple times.
00682     }
00683 
00684     if (fully_associated_len > 0 && !is_bit_) {
00685       // The reader should be in the id_to_handle map at this time so
00686       // log with error.
00687       if (this->lookup_instance_handles(fully_associated_readers, handles) == false) {
00688         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::remove_associations: "
00689                    "lookup_instance_handles failed, notify %d \n", notify_lost));
00690 
00691         return;
00692       }
00693 
00694       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00695         id_to_handle_map_.erase(fully_associated_readers[i]);
00696       }
00697     }
00698 
00699     // Mirror the PUBLICATION_MATCHED_STATUS processing from
00700     // association_complete() here.
00701     if (!this->is_bit_) {
00702 
00703       // Derive the change in the number of subscriptions reading this writer.
00704       int matchedSubscriptions =
00705         static_cast<int>(this->id_to_handle_map_.size());
00706       this->publication_match_status_.current_count_change =
00707         matchedSubscriptions - this->publication_match_status_.current_count;
00708 
00709       // Only process status if the number of subscriptions has changed.
00710       if (this->publication_match_status_.current_count_change != 0) {
00711         this->publication_match_status_.current_count = matchedSubscriptions;
00712 
00713         /// Section 7.1.4.1: total_count will not decrement.
00714 
00715         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00716         this->publication_match_status_.last_subscription_handle =
00717           handles[fully_associated_len - 1];
00718 
00719         set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00720 
00721         DDS::DataWriterListener_var listener =
00722           this->listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00723 
00724         if (!CORBA::is_nil(listener.in())) {
00725           listener->on_publication_matched(
00726             this->dw_local_objref_.in(),
00727             this->publication_match_status_);
00728 
00729           // Listener consumes the change.
00730           this->publication_match_status_.total_count_change = 0;
00731           this->publication_match_status_.current_count_change = 0;
00732         }
00733 
00734         this->notify_status_condition();
00735       }
00736     }
00737   }
00738 
00739   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00740     this->disassociate(rds[i]);
00741   }
00742 
00743   // If this remove_association is invoked when the InfoRepo
00744   // detects a lost reader then make a callback to notify
00745   // subscription lost.
00746   if (notify_lost && handles.length() > 0) {
00747     this->notify_publication_lost(handles);
00748   }
00749 }
00750 
00751 void DataWriterImpl::remove_all_associations()
00752 {
00753   DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00754   // stop pending associations
00755   this->stop_associating();
00756 
00757   OpenDDS::DCPS::ReaderIdSeq readers;
00758   CORBA::ULong size;
00759   CORBA::ULong num_pending_readers;
00760   {
00761     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00762 
00763     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00764     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00765     readers.length(size);
00766 
00767     RepoIdSet::iterator itEnd = readers_.end();
00768     int i = 0;
00769 
00770     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00771       readers[i ++] = *it;
00772     }
00773 
00774     itEnd = pending_readers_.end();
00775 
00776     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00777       readers[i ++] = *it;
00778     }
00779 
00780     if (num_pending_readers > 0) {
00781       ACE_DEBUG((LM_WARNING,
00782                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00783                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00784                  num_pending_readers));
00785     }
00786   }
00787 
00788   try {
00789     if (0 < size) {
00790       CORBA::Boolean dont_notify_lost = false;
00791 
00792       this->remove_associations(readers, dont_notify_lost);
00793     }
00794 
00795   } catch (const CORBA::Exception&) {
00796       ACE_DEBUG((LM_WARNING,
00797                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00798                  ACE_TEXT("caught exception from remove_associations.\n")));
00799   }
00800 }
00801 
00802 void
00803 DataWriterImpl::register_for_reader(const RepoId& participant,
00804                                     const RepoId& writerid,
00805                                     const RepoId& readerid,
00806                                     const TransportLocatorSeq& locators,
00807                                     DiscoveryListener* listener)
00808 {
00809   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00810 }
00811 
00812 void
00813 DataWriterImpl::unregister_for_reader(const RepoId& participant,
00814                                       const RepoId& writerid,
00815                                       const RepoId& readerid)
00816 {
00817   TransportClient::unregister_for_reader(participant, writerid, readerid);
00818 }
00819 
00820 void
00821 DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00822 {
00823   DDS::DataWriterListener_var listener =
00824     listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00825 
00826   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00827 
00828 #if 0
00829 
00830   if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00831     // This test should make the method idempotent.
00832     return;
00833   }
00834 
00835 #endif
00836 
00837   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00838 
00839   // copy status and increment change
00840   offered_incompatible_qos_status_.total_count = status.total_count;
00841   offered_incompatible_qos_status_.total_count_change +=
00842     status.count_since_last_send;
00843   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00844   offered_incompatible_qos_status_.policies = status.policies;
00845 
00846   if (!CORBA::is_nil(listener.in())) {
00847     listener->on_offered_incompatible_qos(dw_local_objref_.in(),
00848                                           offered_incompatible_qos_status_);
00849 
00850     // TBD - Why does the spec say to change this but not change the
00851     //       ChangeFlagStatus after a listener call?
00852     offered_incompatible_qos_status_.total_count_change = 0;
00853   }
00854 
00855   notify_status_condition();
00856 }
00857 
00858 void
00859 DataWriterImpl::update_subscription_params(const RepoId& readerId,
00860                                            const DDS::StringSeq& params)
00861 {
00862 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00863   ACE_UNUSED_ARG(readerId);
00864   ACE_UNUSED_ARG(params);
00865 #else
00866   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00867   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00868   RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00869 
00870   if (iter != reader_info_.end()) {
00871     iter->second.expression_params_ = params;
00872 
00873   } else if (DCPS_debug_level > 4 &&
00874              TheServiceParticipant->publisher_content_filter()) {
00875     GuidConverter pubConv(this->publication_id_), subConv(readerId);
00876     ACE_DEBUG((LM_WARNING,
00877                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00878                ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00879                OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00880   }
00881 
00882 #endif
00883 }
00884 
00885 void
00886 DataWriterImpl::inconsistent_topic()
00887 {
00888   topic_servant_->inconsistent_topic();
00889 }
00890 
00891 DDS::ReturnCode_t
00892 DataWriterImpl::set_qos(const DDS::DataWriterQos & qos)
00893 {
00894 
00895   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00896   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00897   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00898   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00899   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00900 
00901   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00902     if (qos_ == qos)
00903       return DDS::RETCODE_OK;
00904 
00905     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00906       return DDS::RETCODE_IMMUTABLE_POLICY;
00907 
00908     } else {
00909       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00910       DDS::PublisherQos publisherQos;
00911       this->publisher_servant_->get_qos(publisherQos);
00912       const bool status
00913         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00914                                         this->participant_servant_->get_id(),
00915                                         this->publication_id_,
00916                                         qos,
00917                                         publisherQos);
00918 
00919       if (!status) {
00920         ACE_ERROR_RETURN((LM_ERROR,
00921                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00922                           ACE_TEXT("qos not updated. \n")),
00923                          DDS::RETCODE_ERROR);
00924       }
00925     }
00926 
00927     if (!(qos_ == qos)) {
00928       // Reset the deadline timer if the period has changed.
00929       if (qos_.deadline.period.sec != qos.deadline.period.sec
00930           || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00931         if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00932             && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00933           this->watchdog_ =
00934                              new OfferedDeadlineWatchdog(
00935                                this->lock_,
00936                                qos.deadline,
00937                                this,
00938                                this->dw_local_objref_.in(),
00939                                this->offered_deadline_missed_status_,
00940                                this->last_deadline_missed_total_count_);
00941 
00942         } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00943                    && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00944           this->watchdog_->cancel_all();
00945           this->watchdog_->destroy();
00946           this->watchdog_ = 0;
00947 
00948         } else {
00949           this->watchdog_->reset_interval(
00950             duration_to_time_value(qos.deadline.period));
00951         }
00952       }
00953 
00954       qos_ = qos;
00955     }
00956 
00957     return DDS::RETCODE_OK;
00958 
00959   } else {
00960     return DDS::RETCODE_INCONSISTENT_POLICY;
00961   }
00962 }
00963 
00964 DDS::ReturnCode_t
00965 DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
00966 {
00967   qos = qos_;
00968   return DDS::RETCODE_OK;
00969 }
00970 
00971 DDS::ReturnCode_t
00972 DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
00973                              DDS::StatusMask mask)
00974 {
00975   listener_mask_ = mask;
00976   //note: OK to duplicate  a nil object ref
00977   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00978   return DDS::RETCODE_OK;
00979 }
00980 
00981 DDS::DataWriterListener_ptr
00982 DataWriterImpl::get_listener()
00983 {
00984   return DDS::DataWriterListener::_duplicate(listener_.in());
00985 }
00986 
00987 DDS::Topic_ptr
00988 DataWriterImpl::get_topic()
00989 {
00990   return DDS::Topic::_duplicate(topic_objref_.in());
00991 }
00992 
00993 bool
00994 DataWriterImpl::should_ack() const
00995 {
00996   // N.B. It may be worthwhile to investigate a more efficient
00997   // heuristic for determining if a writer should send SAMPLE_ACK
00998   // control samples. Perhaps based on a sequence number delta?
00999   return this->readers_.size() != 0;
01000 }
01001 
01002 DataWriterImpl::AckToken
01003 DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
01004 {
01005   if (DCPS_debug_level > 0) {
01006     ACE_DEBUG((LM_DEBUG,
01007                ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
01008                ACE_TEXT("for sequence %q \n"),
01009                this->sequence_number_.getValue()));
01010   }
01011   return AckToken(max_wait, this->sequence_number_);
01012 }
01013 
01014 DDS::ReturnCode_t
01015 DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
01016 {
01017   if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01018     return DDS::RETCODE_OK;
01019   DataWriterImpl::AckToken token = create_ack_token(max_wait);
01020   if (DCPS_debug_level) {
01021     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01022                           ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01023                           token.sequence_.getValue()));
01024   }
01025   return wait_for_specific_ack(token);
01026 }
01027 
01028 DDS::ReturnCode_t
01029 DataWriterImpl::wait_for_specific_ack(const AckToken& token)
01030 {
01031   return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01032 }
01033 
01034 DDS::Publisher_ptr
01035 DataWriterImpl::get_publisher()
01036 {
01037   return DDS::Publisher::_duplicate(publisher_servant_);
01038 }
01039 
01040 DDS::ReturnCode_t
01041 DataWriterImpl::get_liveliness_lost_status(
01042   DDS::LivelinessLostStatus & status)
01043 {
01044   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01045                    guard,
01046                    this->lock_,
01047                    DDS::RETCODE_ERROR);
01048   set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
01049   status = liveliness_lost_status_;
01050   liveliness_lost_status_.total_count_change = 0;
01051   return DDS::RETCODE_OK;
01052 }
01053 
01054 DDS::ReturnCode_t
01055 DataWriterImpl::get_offered_deadline_missed_status(
01056   DDS::OfferedDeadlineMissedStatus & status)
01057 {
01058   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01059                    guard,
01060                    this->lock_,
01061                    DDS::RETCODE_ERROR);
01062 
01063   set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
01064 
01065   this->offered_deadline_missed_status_.total_count_change =
01066     this->offered_deadline_missed_status_.total_count
01067     - this->last_deadline_missed_total_count_;
01068 
01069   // Update for next status check.
01070   this->last_deadline_missed_total_count_ =
01071     this->offered_deadline_missed_status_.total_count;
01072 
01073   status = offered_deadline_missed_status_;
01074 
01075   this->offered_deadline_missed_status_.total_count_change = 0;
01076 
01077   return DDS::RETCODE_OK;
01078 }
01079 
01080 DDS::ReturnCode_t
01081 DataWriterImpl::get_offered_incompatible_qos_status(
01082   DDS::OfferedIncompatibleQosStatus & status)
01083 {
01084   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01085                    guard,
01086                    this->lock_,
01087                    DDS::RETCODE_ERROR);
01088   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
01089   status = offered_incompatible_qos_status_;
01090   offered_incompatible_qos_status_.total_count_change = 0;
01091   return DDS::RETCODE_OK;
01092 }
01093 
01094 DDS::ReturnCode_t
01095 DataWriterImpl::get_publication_matched_status(
01096   DDS::PublicationMatchedStatus & status)
01097 {
01098   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01099                    guard,
01100                    this->lock_,
01101                    DDS::RETCODE_ERROR);
01102   set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
01103   status = publication_match_status_;
01104   publication_match_status_.total_count_change = 0;
01105   publication_match_status_.current_count_change = 0;
01106   return DDS::RETCODE_OK;
01107 }
01108 
01109 DDS::ReturnCode_t
01110 DataWriterImpl::assert_liveliness()
01111 {
01112   switch (this->qos_.liveliness.kind) {
01113   case DDS::AUTOMATIC_LIVELINESS_QOS:
01114     // Do nothing.
01115     break;
01116   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01117     return participant_servant_->assert_liveliness();
01118   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01119     if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01120       return DDS::RETCODE_ERROR;
01121     }
01122     break;
01123   }
01124 
01125   return DDS::RETCODE_OK;
01126 }
01127 
01128 DDS::ReturnCode_t
01129 DataWriterImpl::assert_liveliness_by_participant()
01130 {
01131   // This operation is called by participant.
01132 
01133   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01134     // Set a flag indicating that we should send a liveliness message on the timer if necessary.
01135     liveliness_asserted_ = true;
01136   }
01137 
01138   return DDS::RETCODE_OK;
01139 }
01140 
01141 ACE_Time_Value
01142 DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
01143 {
01144   if (this->qos_.liveliness.kind == kind) {
01145     return liveliness_check_interval_;
01146   } else {
01147     return ACE_Time_Value::max_time;
01148   }
01149 }
01150 
01151 bool
01152 DataWriterImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
01153 {
01154   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01155     return last_liveliness_activity_time_ > tv;
01156   } else {
01157     return false;
01158   }
01159 }
01160 
01161 DDS::ReturnCode_t
01162 DataWriterImpl::get_matched_subscriptions(
01163   DDS::InstanceHandleSeq & subscription_handles)
01164 {
01165   if (enabled_ == false) {
01166     ACE_ERROR_RETURN((LM_ERROR,
01167                       ACE_TEXT("(%P|%t) ERROR: ")
01168                       ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01169                       ACE_TEXT(" Entity is not enabled. \n")),
01170                      DDS::RETCODE_NOT_ENABLED);
01171   }
01172 
01173   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01174                    guard,
01175                    this->lock_,
01176                    DDS::RETCODE_ERROR);
01177 
01178   // Copy out the handles for the current set of subscriptions.
01179   int index = 0;
01180   subscription_handles.length(
01181     static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01182 
01183   for (RepoIdToHandleMap::iterator
01184        current = this->id_to_handle_map_.begin();
01185        current != this->id_to_handle_map_.end();
01186        ++current, ++index) {
01187     subscription_handles[index] = current->second;
01188   }
01189 
01190   return DDS::RETCODE_OK;
01191 }
01192 
01193 #if !defined (DDS_HAS_MINIMUM_BIT)
01194 DDS::ReturnCode_t
01195 DataWriterImpl::get_matched_subscription_data(
01196   DDS::SubscriptionBuiltinTopicData & subscription_data,
01197   DDS::InstanceHandle_t subscription_handle)
01198 {
01199   if (enabled_ == false) {
01200     ACE_ERROR_RETURN((LM_ERROR,
01201                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01202                       ACE_TEXT("get_matched_subscription_data: ")
01203                       ACE_TEXT("Entity is not enabled. \n")),
01204                      DDS::RETCODE_NOT_ENABLED);
01205   }
01206 
01207   BIT_Helper_1 < DDS::SubscriptionBuiltinTopicDataDataReader,
01208                DDS::SubscriptionBuiltinTopicDataDataReader_var,
01209                DDS::SubscriptionBuiltinTopicDataSeq > hh;
01210 
01211   DDS::SubscriptionBuiltinTopicDataSeq data;
01212 
01213   DDS::ReturnCode_t ret =
01214     hh.instance_handle_to_bit_data(participant_servant_,
01215                                    BUILT_IN_SUBSCRIPTION_TOPIC,
01216                                    subscription_handle,
01217                                    data);
01218 
01219   if (ret == DDS::RETCODE_OK) {
01220     subscription_data = data[0];
01221   }
01222 
01223   return ret;
01224 }
01225 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01226 
01227 DDS::ReturnCode_t
01228 DataWriterImpl::enable()
01229 {
01230   //According spec:
01231   // - Calling enable on an already enabled Entity returns OK and has no
01232   // effect.
01233   // - Calling enable on an Entity whose factory is not enabled will fail
01234   // and return PRECONDITION_NOT_MET.
01235 
01236   if (this->is_enabled()) {
01237     return DDS::RETCODE_OK;
01238   }
01239 
01240   if (this->publisher_servant_->is_enabled() == false) {
01241     return DDS::RETCODE_PRECONDITION_NOT_MET;
01242   }
01243 
01244   // Note: do configuration based on QoS in enable() because
01245   //       before enable is called the QoS can be changed -- even
01246   //       for Changeable=NO
01247 
01248   // Configure WriteDataContainer constructor parameters from qos.
01249 
01250   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01251 
01252   CORBA::Long const depth =
01253     get_instance_sample_list_depth(
01254       qos_.history.kind,
01255       qos_.history.depth,
01256       qos_.resource_limits.max_samples_per_instance);
01257 
01258   bool resource_blocking = false;
01259 
01260   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01261     n_chunks_ = qos_.resource_limits.max_samples;
01262 
01263     if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED) {
01264       resource_blocking = true;
01265 
01266     } else {
01267       resource_blocking =
01268         (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01269         || (qos_.resource_limits.max_samples <
01270             (qos_.resource_limits.max_instances * depth));
01271     }
01272   }
01273 
01274   //else using value from Service_Participant
01275 
01276   // enable the type specific part of this DataWriter
01277   this->enable_specific();
01278 
01279   CORBA::Long max_instances = 0;
01280 
01281   if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01282     max_instances = qos_.resource_limits.max_instances;
01283 
01284   CORBA::Long max_total_samples = 0;
01285 
01286   if (reliable && resource_blocking)
01287     max_total_samples = qos_.resource_limits.max_samples;
01288 
01289 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01290   // Get data durability cache if DataWriter QoS requires durable
01291   // samples.  Publisher servant retains ownership of the cache.
01292   DataDurabilityCache* const durability_cache =
01293     TheServiceParticipant->get_data_durability_cache(qos_.durability);
01294 #endif
01295 
01296   //Note: the QoS used to set n_chunks_ is Changable=No so
01297   // it is OK that we cannot change the size of our allocators.
01298   data_container_ = new WriteDataContainer(this,
01299                                            depth,
01300                                            qos_.reliability.max_blocking_time,
01301                                            n_chunks_,
01302                                            domain_id_,
01303                                            get_topic_name(),
01304                                            get_type_name(),
01305 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01306                                            durability_cache,
01307                                            qos_.durability_service,
01308 #endif
01309                                            max_instances,
01310                                            max_total_samples);
01311 
01312   // +1 because we might allocate one before releasing another
01313   // TBD - see if this +1 can be removed.
01314   mb_allocator_ = new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_);
01315   db_allocator_ = new DataBlockAllocator(n_chunks_+1);
01316   header_allocator_ = new DataSampleHeaderAllocator(n_chunks_+1);
01317 
01318   if (DCPS_debug_level >= 2) {
01319     ACE_DEBUG((LM_DEBUG,
01320                "(%P|%t) DataWriterImpl::enable-mb"
01321                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01322                mb_allocator_,
01323                n_chunks_));
01324 
01325     ACE_DEBUG((LM_DEBUG,
01326                "(%P|%t) DataWriterImpl::enable-db"
01327                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01328                db_allocator_,
01329                n_chunks_));
01330 
01331     ACE_DEBUG((LM_DEBUG,
01332                "(%P|%t) DataWriterImpl::enable-header"
01333                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01334                header_allocator_,
01335                n_chunks_));
01336   }
01337 
01338   if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01339       qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01340     liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01341     liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01342     // Must be at least 1 micro second.
01343     if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01344       liveliness_check_interval_ = ACE_Time_Value (0, 1);
01345     }
01346 
01347     last_liveliness_check_time_ = ACE_OS::gettimeofday();
01348 
01349     if (reactor_->schedule_timer(this,
01350                                  0,
01351                                  liveliness_check_interval_,
01352                                  liveliness_check_interval_) == -1) {
01353       ACE_ERROR((LM_ERROR,
01354                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01355                  ACE_TEXT("schedule_timer")));
01356 
01357     } else {
01358       cancel_timer_ = true;
01359       this->_add_ref();
01360     }
01361   }
01362 
01363   participant_servant_->add_adjust_liveliness_timers(this);
01364 
01365   // Setup the offered deadline watchdog if the configured deadline
01366   // period is not the default (infinite).
01367   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01368 
01369   if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01370       || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01371     this->watchdog_ = new OfferedDeadlineWatchdog(
01372                          this->lock_,
01373                          this->qos_.deadline,
01374                          this,
01375                          this->dw_local_objref_.in(),
01376                          this->offered_deadline_missed_status_,
01377                          this->last_deadline_missed_total_count_);
01378   }
01379 
01380   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01381   disco->pre_writer(this);
01382 
01383   this->set_enabled();
01384 
01385   try {
01386     this->enable_transport(reliable,
01387                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01388 
01389   } catch (const Transport::Exception&) {
01390     ACE_ERROR((LM_ERROR,
01391                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01392                ACE_TEXT("Transport Exception.\n")));
01393     return DDS::RETCODE_ERROR;
01394 
01395   }
01396 
01397   const TransportLocatorSeq& trans_conf_info = connection_info();
01398 
01399   DDS::PublisherQos pub_qos;
01400   this->publisher_servant_->get_qos(pub_qos);
01401 
01402   this->publication_id_ =
01403     disco->add_publication(this->domain_id_,
01404                            this->participant_servant_->get_id(),
01405                            this->topic_servant_->get_id(),
01406                            this,
01407                            this->qos_,
01408                            trans_conf_info,
01409                            pub_qos);
01410 
01411   if (this->publication_id_ == GUID_UNKNOWN) {
01412     ACE_ERROR((LM_ERROR,
01413                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01414                ACE_TEXT("add_publication returned invalid id. \n")));
01415     return DDS::RETCODE_ERROR;
01416   }
01417 
01418   this->data_container_->publication_id_ = this->publication_id_;
01419 
01420   const DDS::ReturnCode_t writer_enabled_result =
01421     publisher_servant_->writer_enabled(topic_name_.in(), this);
01422 
01423   if (this->monitor_) {
01424     this->monitor_->report();
01425   }
01426 
01427 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01428 
01429   // Move cached data from the durability cache to the unsent data
01430   // queue.
01431   if (durability_cache != 0) {
01432 
01433     if (!durability_cache->get_data(this->domain_id_,
01434                                     get_topic_name(),
01435                                     get_type_name(),
01436                                     this,
01437                                     this->mb_allocator_,
01438                                     this->db_allocator_,
01439                                     this->qos_.lifespan)) {
01440       ACE_ERROR((LM_ERROR,
01441                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01442                  ACE_TEXT("unable to retrieve durable data\n")));
01443     }
01444   }
01445 
01446 #endif
01447 
01448   return writer_enabled_result;
01449 }
01450 
01451 void
01452 DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
01453 {
01454   DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01455 
01456   SendStateDataSampleList list;
01457 
01458   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01459 
01460   controlTracker.message_sent();
01461 
01462   //need to release guard to call down to transport
01463   guard.release();
01464 
01465   this->send(list, transaction_id);
01466 }
01467 
01468 DDS::ReturnCode_t
01469 DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
01470                                     DataSample* data,
01471                                     const DDS::Time_t& source_timestamp)
01472 {
01473   DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01474 
01475   if (enabled_ == false) {
01476     ACE_ERROR_RETURN((LM_ERROR,
01477                       ACE_TEXT("(%P|%t) ERROR: ")
01478                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01479                       ACE_TEXT(" Entity is not enabled. \n")),
01480                      DDS::RETCODE_NOT_ENABLED);
01481   }
01482 
01483   DDS::ReturnCode_t ret =
01484     this->data_container_->register_instance(handle, data);
01485 
01486   if (ret != DDS::RETCODE_OK) {
01487     ACE_ERROR_RETURN((LM_ERROR,
01488                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01489                       ACE_TEXT("register instance with container failed.\n")),
01490                      ret);
01491   }
01492 
01493   if (this->monitor_) {
01494     this->monitor_->report();
01495   }
01496 
01497   DataSampleElement* element = 0;
01498   ret = this->data_container_->obtain_buffer_for_control(element);
01499 
01500   if (ret != DDS::RETCODE_OK) {
01501     ACE_ERROR_RETURN((LM_ERROR,
01502                       ACE_TEXT("(%P|%t) ERROR: ")
01503                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01504                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01505                       ret),
01506                      ret);
01507   }
01508 
01509   // Add header with the registration sample data.
01510   element->set_sample(create_control_message(INSTANCE_REGISTRATION,
01511                                              element->get_header(),
01512                                              data,
01513                                              source_timestamp));
01514 
01515   ret = this->data_container_->enqueue_control(element);
01516 
01517   if (ret != DDS::RETCODE_OK) {
01518     ACE_ERROR_RETURN((LM_ERROR,
01519                       ACE_TEXT("(%P|%t) ERROR: ")
01520                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01521                       ACE_TEXT("enqueue_control failed.\n")),
01522                      ret);
01523   }
01524 
01525   return ret;
01526 }
01527 
01528 DDS::ReturnCode_t
01529 DataWriterImpl::register_instance_from_durable_data(DDS::InstanceHandle_t& handle,
01530                                     DataSample* data,
01531                                     const DDS::Time_t & source_timestamp)
01532 {
01533   DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01534 
01535   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01536                    guard,
01537                    get_lock(),
01538                    ::DDS::RETCODE_ERROR);
01539 
01540   DDS::ReturnCode_t ret = register_instance_i(handle, data, source_timestamp);
01541   if (ret != DDS::RETCODE_OK) {
01542     ACE_ERROR_RETURN((LM_ERROR,
01543                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01544                       ACE_TEXT("register instance with container failed.\n")),
01545                       ret);
01546   }
01547 
01548   send_all_to_flush_control(guard);
01549 
01550   return ret;
01551 }
01552 
01553 DDS::ReturnCode_t
01554 DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
01555                                       const DDS::Time_t& source_timestamp)
01556 {
01557   DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01558 
01559   if (enabled_ == false) {
01560     ACE_ERROR_RETURN((LM_ERROR,
01561                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01562                       ACE_TEXT(" Entity is not enabled.\n")),
01563                      DDS::RETCODE_NOT_ENABLED);
01564   }
01565 
01566   // According to spec 1.2, autodispose_unregistered_instances true causes
01567   // dispose on the instance prior to calling unregister operation.
01568   if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01569     return this->dispose_and_unregister(handle, source_timestamp);
01570   }
01571 
01572   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01573   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01574   DataSample* unregistered_sample_data = 0;
01575   ret = this->data_container_->unregister(handle, unregistered_sample_data);
01576 
01577   if (ret != DDS::RETCODE_OK) {
01578     ACE_ERROR_RETURN((LM_ERROR,
01579                       ACE_TEXT("(%P|%t) ERROR: ")
01580                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01581                       ACE_TEXT(" unregister with container failed. \n")),
01582                      ret);
01583   }
01584 
01585   DataSampleElement* element = 0;
01586   ret = this->data_container_->obtain_buffer_for_control(element);
01587 
01588   if (ret != DDS::RETCODE_OK) {
01589     ACE_ERROR_RETURN((LM_ERROR,
01590                       ACE_TEXT("(%P|%t) ERROR: ")
01591                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01592                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01593                       ret),
01594                      ret);
01595   }
01596 
01597   element->set_sample(create_control_message(UNREGISTER_INSTANCE,
01598                                              element->get_header(),
01599                                              unregistered_sample_data,
01600                                              source_timestamp));
01601   ret = this->data_container_->enqueue_control(element);
01602 
01603   if (ret != DDS::RETCODE_OK) {
01604     ACE_ERROR_RETURN((LM_ERROR,
01605                       ACE_TEXT("(%P|%t) ERROR: ")
01606                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01607                       ACE_TEXT("enqueue_control failed.\n")),
01608                      ret);
01609   }
01610 
01611   send_all_to_flush_control(guard);
01612   return DDS::RETCODE_OK;
01613 }
01614 
01615 DDS::ReturnCode_t
01616 DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
01617                                        const DDS::Time_t& source_timestamp)
01618 {
01619   DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01620 
01621   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01622   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01623 
01624   DataSample* data_sample = 0;
01625   ret = this->data_container_->dispose(handle, data_sample);
01626 
01627   if (ret != DDS::RETCODE_OK) {
01628     ACE_ERROR_RETURN((LM_ERROR,
01629                       ACE_TEXT("(%P|%t) ERROR: ")
01630                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01631                       ACE_TEXT("dispose on container failed. \n")),
01632                      ret);
01633   }
01634 
01635   ret = this->data_container_->unregister(handle, data_sample, false);
01636 
01637   if (ret != DDS::RETCODE_OK) {
01638     ACE_ERROR_RETURN((LM_ERROR,
01639                       ACE_TEXT("(%P|%t) ERROR: ")
01640                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01641                       ACE_TEXT("unregister with container failed. \n")),
01642                      ret);
01643   }
01644 
01645   DataSampleElement* element = 0;
01646   ret = this->data_container_->obtain_buffer_for_control(element);
01647 
01648   if (ret != DDS::RETCODE_OK) {
01649     ACE_ERROR_RETURN((LM_ERROR,
01650                       ACE_TEXT("(%P|%t) ERROR: ")
01651                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01652                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01653                       ret),
01654                      ret);
01655   }
01656 
01657   element->set_sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01658                                              element->get_header(),
01659                                              data_sample,
01660                                              source_timestamp));
01661 
01662   ret = this->data_container_->enqueue_control(element);
01663 
01664   if (ret != DDS::RETCODE_OK) {
01665     ACE_ERROR_RETURN((LM_ERROR,
01666                       ACE_TEXT("(%P|%t) ERROR: ")
01667                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01668                       ACE_TEXT("enqueue_control failed.\n")),
01669                      ret);
01670   }
01671 
01672   send_all_to_flush_control(guard);
01673   return DDS::RETCODE_OK;
01674 }
01675 
01676 void
01677 DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
01678 {
01679   {
01680     ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01681 
01682     PublicationInstanceMapType::iterator it =
01683       this->data_container_->instances_.begin();
01684 
01685     while (it != this->data_container_->instances_.end()) {
01686       DDS::InstanceHandle_t handle = it->first;
01687       ++it; // avoid mangling the iterator
01688 
01689       this->unregister_instance_i(handle, source_timestamp);
01690     }
01691   }
01692 }
01693 
01694 DDS::ReturnCode_t
01695 DataWriterImpl::write(DataSample* data,
01696                       DDS::InstanceHandle_t handle,
01697                       const DDS::Time_t& source_timestamp,
01698                       GUIDSeq* filter_out)
01699 {
01700   DBG_ENTRY_LVL("DataWriterImpl","write",6);
01701 
01702   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01703                     guard,
01704                     get_lock (),
01705                     ::DDS::RETCODE_ERROR);
01706 
01707   // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
01708   GUIDSeq_var filter_out_var(filter_out);
01709 
01710   if (enabled_ == false) {
01711     ACE_ERROR_RETURN((LM_ERROR,
01712                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01713                       ACE_TEXT(" Entity is not enabled. \n")),
01714                      DDS::RETCODE_NOT_ENABLED);
01715   }
01716 
01717   DataSampleElement* element = 0;
01718   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01719 
01720   if (ret == DDS::RETCODE_TIMEOUT) {
01721     return ret; // silent for timeout
01722 
01723   } else if (ret != DDS::RETCODE_OK) {
01724     ACE_ERROR_RETURN((LM_ERROR,
01725                       ACE_TEXT("(%P|%t) ERROR: ")
01726                       ACE_TEXT("DataWriterImpl::write: ")
01727                       ACE_TEXT("obtain_buffer returned %d.\n"),
01728                       ret),
01729                      ret);
01730   }
01731 
01732   DataSample* temp;
01733   ret = create_sample_data_message(data,
01734                                    handle,
01735                                    element->get_header(),
01736                                    temp,
01737                                    source_timestamp,
01738                                    (filter_out != 0));
01739   element->set_sample(temp);
01740 
01741   if (ret != DDS::RETCODE_OK) {
01742     return ret;
01743   }
01744 
01745   element->set_filter_out(filter_out_var._retn()); // ownership passed to element
01746 
01747   ret = this->data_container_->enqueue(element, handle);
01748 
01749   if (ret != DDS::RETCODE_OK) {
01750     ACE_ERROR_RETURN((LM_ERROR,
01751                       ACE_TEXT("(%P|%t) ERROR: ")
01752                       ACE_TEXT("DataWriterImpl::write: ")
01753                       ACE_TEXT("enqueue failed.\n")),
01754                      ret);
01755   }
01756   this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01757 
01758   track_sequence_number(filter_out);
01759 
01760   if (this->coherent_) {
01761     ++this->coherent_samples_;
01762   }
01763   SendStateDataSampleList list;
01764 
01765   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01766 
01767   if (this->publisher_servant_->is_suspended()) {
01768     if (min_suspended_transaction_id_ == 0) {
01769       //provides transaction id for lower bound of suspended transactions
01770       //or transaction id for single suspended write transaction
01771       min_suspended_transaction_id_ = transaction_id;
01772     } else {
01773       //when multiple write transactions have suspended, provides the upper bound
01774       //for suspended transactions.
01775       max_suspended_transaction_id_ = transaction_id;
01776     }
01777     this->available_data_list_.enqueue_tail(list);
01778 
01779   } else {
01780     guard.release();
01781 
01782     this->send(list, transaction_id);
01783   }
01784 
01785   return DDS::RETCODE_OK;
01786 }
01787 
01788 void
01789 DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
01790 {
01791   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01792 
01793 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01794   // Track individual expected sequence numbers in ReaderInfo
01795   RepoIdSet excluded;
01796 
01797   if (filter_out && !reader_info_.empty()) {
01798     const GUID_t* buf = filter_out->get_buffer();
01799     excluded.insert(buf, buf + filter_out->length());
01800   }
01801 
01802   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01803        end = reader_info_.end(); iter != end; ++iter) {
01804     // If not excluding this reader, update expected sequence
01805     if (excluded.count(iter->first) == 0) {
01806       iter->second.expected_sequence_ = sequence_number_;
01807     }
01808   }
01809 
01810 #else
01811   ACE_UNUSED_ARG(filter_out);
01812   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01813        end = reader_info_.end(); iter != end; ++iter) {
01814     iter->second.expected_sequence_ = sequence_number_;
01815   }
01816 
01817 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01818 
01819 }
01820 
01821 void
01822 DataWriterImpl::send_suspended_data()
01823 {
01824   //this serves to get TransportClient's max_transaction_id_seen_
01825   //to the correct value for this list of transactions
01826   if (max_suspended_transaction_id_ != 0) {
01827     this->send(this->available_data_list_, max_suspended_transaction_id_);
01828     max_suspended_transaction_id_ = 0;
01829   }
01830 
01831   //this serves to actually have the send proceed in
01832   //sending the samples to the datalinks by passing it
01833   //the min_suspended_transaction_id_ which should be the
01834   //TransportClient's expected_transaction_id_
01835   this->send(this->available_data_list_, min_suspended_transaction_id_);
01836   min_suspended_transaction_id_ = 0;
01837   this->available_data_list_.reset();
01838 }
01839 
01840 DDS::ReturnCode_t
01841 DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
01842                         const DDS::Time_t & source_timestamp)
01843 {
01844   DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01845 
01846   if (enabled_ == false) {
01847     ACE_ERROR_RETURN((LM_ERROR,
01848                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01849                       ACE_TEXT(" Entity is not enabled. \n")),
01850                      DDS::RETCODE_NOT_ENABLED);
01851   }
01852 
01853   DDS::ReturnCode_t ret = ::DDS::RETCODE_ERROR;
01854 
01855   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01856 
01857   DataSample* registered_sample_data = 0;
01858   ret = this->data_container_->dispose(handle, registered_sample_data);
01859 
01860   if (ret != DDS::RETCODE_OK) {
01861     ACE_ERROR_RETURN((LM_ERROR,
01862                       ACE_TEXT("(%P|%t) ERROR: ")
01863                       ACE_TEXT("DataWriterImpl::dispose: ")
01864                       ACE_TEXT("dispose failed.\n")),
01865                      ret);
01866   }
01867 
01868   DataSampleElement* element = 0;
01869   ret = this->data_container_->obtain_buffer_for_control(element);
01870 
01871   if (ret != DDS::RETCODE_OK) {
01872     ACE_ERROR_RETURN((LM_ERROR,
01873                       ACE_TEXT("(%P|%t) ERROR: ")
01874                       ACE_TEXT("DataWriterImpl::dispose: ")
01875                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01876                       ret),
01877                      ret);
01878   }
01879 
01880   element->set_sample(create_control_message(DISPOSE_INSTANCE,
01881                                              element->get_header(),
01882                                              registered_sample_data,
01883                                              source_timestamp));
01884   ret = this->data_container_->enqueue_control(element);
01885 
01886   if (ret != DDS::RETCODE_OK) {
01887     ACE_ERROR_RETURN((LM_ERROR,
01888                       ACE_TEXT("(%P|%t) ERROR: ")
01889                       ACE_TEXT("DataWriterImpl::dispose: ")
01890                       ACE_TEXT("enqueue_control failed.\n")),
01891                      ret);
01892   }
01893 
01894   send_all_to_flush_control(guard);
01895 
01896   return DDS::RETCODE_OK;
01897 }
01898 
01899 DDS::ReturnCode_t
01900 DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
01901                             size_t&                 size)
01902 {
01903   return data_container_->num_samples(handle, size);
01904 }
01905 
01906 void
01907 DataWriterImpl::unregister_all()
01908 {
01909   if (cancel_timer_) {
01910     // The cancel_timer will call handle_close to remove_ref.
01911     (void) reactor_->cancel_timer(this, 0);
01912     cancel_timer_ = false;
01913   }
01914 
01915   data_container_->unregister_all();
01916 }
01917 
01918 RepoId
01919 DataWriterImpl::get_publication_id()
01920 {
01921   return publication_id_;
01922 }
01923 
01924 RepoId
01925 DataWriterImpl::get_dp_id()
01926 {
01927   return participant_servant_->get_id();
01928 }
01929 
01930 const char*
01931 DataWriterImpl::get_topic_name()
01932 {
01933   return topic_name_.in();
01934 }
01935 
01936 char const *
01937 DataWriterImpl::get_type_name() const
01938 {
01939   return type_name_.in();
01940 }
01941 
01942 ACE_Message_Block*
01943 DataWriterImpl::create_control_message(MessageId message_id,
01944                                        DataSampleHeader& header_data,
01945                                        ACE_Message_Block* data,
01946                                        const DDS::Time_t& source_timestamp)
01947 {
01948   header_data.message_id_ = message_id;
01949   header_data.byte_order_ =
01950     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01951   header_data.coherent_change_ = 0;
01952 
01953   if (data) {
01954     header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01955 
01956     if (header_data.message_length_ == 0) {
01957       data->release();
01958     }
01959   }
01960 
01961   header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01962   header_data.sequence_repair_ = false; // set below
01963   header_data.source_timestamp_sec_ = source_timestamp.sec;
01964   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01965   header_data.publication_id_ = publication_id_;
01966   header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01967 
01968   if (message_id == INSTANCE_REGISTRATION
01969       || message_id == DISPOSE_INSTANCE
01970       || message_id == UNREGISTER_INSTANCE
01971       || message_id == DISPOSE_UNREGISTER_INSTANCE) {
01972 
01973     header_data.sequence_repair_ = need_sequence_repair();
01974 
01975     // Use the sequence number here for the sake of RTPS (where these
01976     // control messages map onto the Data Submessage).
01977     if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01978       this->sequence_number_ = SequenceNumber();
01979 
01980     } else {
01981       ++this->sequence_number_;
01982     }
01983 
01984     header_data.sequence_ = this->sequence_number_;
01985     header_data.key_fields_only_ = true;
01986   }
01987 
01988   ACE_Message_Block* message = 0;
01989   ACE_NEW_MALLOC_RETURN(message,
01990                         static_cast<ACE_Message_Block*>(
01991                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01992                         ACE_Message_Block(
01993                           DataSampleHeader::max_marshaled_size(),
01994                           ACE_Message_Block::MB_DATA,
01995                           header_data.message_length_ ? data : 0, //cont
01996                           0, //data
01997                           0, //allocator_strategy
01998                           get_db_lock(), //locking_strategy
01999                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02000                           ACE_Time_Value::zero,
02001                           ACE_Time_Value::max_time,
02002                           db_allocator_,
02003                           mb_allocator_),
02004                         0);
02005 
02006   *message << header_data;
02007 
02008   // If we incremented sequence number for this control message
02009   if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02010     ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02011     // Update the expected sequence number for all readers
02012     RepoIdToReaderInfoMap::iterator reader;
02013 
02014     for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02015       reader->second.expected_sequence_ = sequence_number_;
02016     }
02017   }
02018   if (DCPS_debug_level >= 4) {
02019     const GuidConverter converter(publication_id_);
02020     ACE_DEBUG((LM_DEBUG,
02021                ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
02022                ACE_TEXT("from publication %C sending control sample: %C .\n"),
02023                OPENDDS_STRING(converter).c_str(),
02024                to_string(header_data).c_str()));
02025   }
02026   return message;
02027 }
02028 
02029 DDS::ReturnCode_t
02030 DataWriterImpl::create_sample_data_message(DataSample* data,
02031                                            DDS::InstanceHandle_t instance_handle,
02032                                            DataSampleHeader& header_data,
02033                                            ACE_Message_Block*& message,
02034                                            const DDS::Time_t& source_timestamp,
02035                                            bool content_filter)
02036 {
02037   PublicationInstance* const instance =
02038     data_container_->get_handle_instance(instance_handle);
02039 
02040   if (0 == instance) {
02041     ACE_ERROR_RETURN((LM_ERROR,
02042                       ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02043                       ACE_TEXT("failed to find instance for handle %d\n"),
02044                       instance_handle),
02045                      DDS::RETCODE_ERROR);
02046   }
02047 
02048   header_data.message_id_ = SAMPLE_DATA;
02049   header_data.byte_order_ =
02050     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02051   header_data.coherent_change_ = this->coherent_;
02052 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02053   header_data.group_coherent_ =
02054     this->publisher_servant_->qos_.presentation.access_scope
02055     == DDS::GROUP_PRESENTATION_QOS;
02056 #endif
02057   header_data.content_filter_ = content_filter;
02058   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02059   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02060   header_data.sequence_repair_ = need_sequence_repair();
02061 
02062   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02063     this->sequence_number_ = SequenceNumber();
02064 
02065   } else {
02066     ++this->sequence_number_;
02067   }
02068 
02069   header_data.sequence_ = this->sequence_number_;
02070   header_data.source_timestamp_sec_ = source_timestamp.sec;
02071   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02072 
02073   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02074       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02075     header_data.lifespan_duration_ = true;
02076     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02077     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02078   }
02079 
02080   header_data.publication_id_ = publication_id_;
02081   header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
02082   size_t max_marshaled_size = header_data.max_marshaled_size();
02083 
02084   ACE_NEW_MALLOC_RETURN(message,
02085                         static_cast<ACE_Message_Block*>(
02086                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02087                         ACE_Message_Block(max_marshaled_size,
02088                                           ACE_Message_Block::MB_DATA,
02089                                           data, //cont
02090                                           0, //data
02091                                           header_allocator_, //alloc_strategy
02092                                           get_db_lock(), //locking_strategy
02093                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02094                                           ACE_Time_Value::zero,
02095                                           ACE_Time_Value::max_time,
02096                                           db_allocator_,
02097                                           mb_allocator_),
02098                         DDS::RETCODE_ERROR);
02099 
02100   *message << header_data;
02101   if (DCPS_debug_level >= 4) {
02102     const GuidConverter converter(publication_id_);
02103     ACE_DEBUG((LM_DEBUG,
02104                ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
02105                ACE_TEXT("from publication %C sending data sample: %C .\n"),
02106                OPENDDS_STRING(converter).c_str(),
02107                to_string(header_data).c_str()));
02108   }
02109   return DDS::RETCODE_OK;
02110 }
02111 
02112 void
02113 DataWriterImpl::data_delivered(const DataSampleElement* sample)
02114 {
02115   DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02116 
02117   if (!(sample->get_pub_id() == this->publication_id_)) {
02118     GuidConverter sample_converter(sample->get_pub_id());
02119     GuidConverter writer_converter(publication_id_);
02120     ACE_ERROR((LM_ERROR,
02121                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02122                ACE_TEXT(" The publication id %C from delivered element ")
02123                ACE_TEXT("does not match the datawriter's id %C\n"),
02124                OPENDDS_STRING(sample_converter).c_str(),
02125                OPENDDS_STRING(writer_converter).c_str()));
02126     return;
02127   }
02128   //provided for statistics tracking in tests
02129   ++data_delivered_count_;
02130 
02131   this->data_container_->data_delivered(sample);
02132 }
02133 
02134 void
02135 DataWriterImpl::control_delivered(ACE_Message_Block* sample)
02136 {
02137   DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02138   sample->release();
02139   controlTracker.message_delivered();
02140 }
02141 
02142 EntityImpl*
02143 DataWriterImpl::parent() const
02144 {
02145   return this->publisher_servant_;
02146 }
02147 
02148 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
02149 bool
02150 DataWriterImpl::filter_out(const DataSampleElement& elt,
02151                            const OPENDDS_STRING& filterClassName,
02152                            const FilterEvaluator& evaluator,
02153                            const DDS::StringSeq& expression_params) const
02154 {
02155   TypeSupportImpl* const typesupport =
02156     dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02157 
02158   if (!typesupport) {
02159     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02160     return false;
02161   }
02162 
02163   if (filterClassName == "DDSSQL" ||
02164       filterClassName == "OPENDDSSQL") {
02165     return !evaluator.eval(elt.get_sample()->cont(),
02166                            elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02167                            elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02168                            expression_params);
02169   }
02170   else {
02171     return false;
02172   }
02173 }
02174 #endif
02175 
02176 bool
02177 DataWriterImpl::check_transport_qos(const TransportInst&)
02178 {
02179   // DataWriter does not impose any constraints on which transports
02180   // may be used based on QoS.
02181   return true;
02182 }
02183 
02184 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02185 
02186 bool
02187 DataWriterImpl::coherent_changes_pending()
02188 {
02189   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02190                    guard,
02191                    get_lock(),
02192                    false);
02193 
02194   return this->coherent_;
02195 }
02196 
02197 void
02198 DataWriterImpl::begin_coherent_changes()
02199 {
02200   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02201             guard,
02202             get_lock());
02203 
02204   this->coherent_ = true;
02205 }
02206 
02207 void
02208 DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
02209 {
02210   // PublisherImpl::pi_lock_ should be held.
02211   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02212             guard,
02213             get_lock());
02214 
02215   CoherentChangeControl end_msg;
02216   end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02217   end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02218   end_msg.group_coherent_
02219     = this->publisher_servant_->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02220 
02221   if (end_msg.group_coherent_) {
02222     end_msg.publisher_id_ = this->publisher_servant_->publisher_id_;
02223     end_msg.group_coherent_samples_ = group_samples;
02224   }
02225 
02226   ACE_Message_Block* data = 0;
02227   size_t max_marshaled_size = end_msg.max_marshaled_size();
02228 
02229   ACE_NEW(data, ACE_Message_Block(max_marshaled_size,
02230                                   ACE_Message_Block::MB_DATA,
02231                                   0, //cont
02232                                   0, //data
02233                                   0, //alloc_strategy
02234                                   get_db_lock()));
02235 
02236   Serializer serializer(
02237     data,
02238     this->swap_bytes());
02239 
02240   serializer << end_msg;
02241 
02242   DDS::Time_t source_timestamp =
02243     time_value_to_time(ACE_OS::gettimeofday());
02244 
02245   DataSampleHeader header;
02246   ACE_Message_Block* control =
02247     create_control_message(END_COHERENT_CHANGES, header, data, source_timestamp);
02248 
02249 
02250   this->coherent_ = false;
02251   this->coherent_samples_ = 0;
02252 
02253   guard.release();
02254   if (this->send_control(header, control) == SEND_CONTROL_ERROR) {
02255     ACE_ERROR((LM_ERROR,
02256                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02257                ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02258   }
02259 }
02260 
02261 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
02262 
02263 void
02264 DataWriterImpl::data_dropped(const DataSampleElement* element,
02265                              bool dropped_by_transport)
02266 {
02267   DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02268 
02269   //provided for statistics tracking in tests
02270   ++data_dropped_count_;
02271 
02272   this->data_container_->data_dropped(element, dropped_by_transport);
02273 }
02274 
02275 void
02276 DataWriterImpl::control_dropped(ACE_Message_Block* sample,
02277                                 bool /* dropped_by_transport */)
02278 {
02279   DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02280   sample->release();
02281   controlTracker.message_dropped();
02282 }
02283 
02284 DDS::DataWriterListener_ptr
02285 DataWriterImpl::listener_for(DDS::StatusKind kind)
02286 {
02287   // per 2.1.4.3.1 Listener Access to Plain Communication Status
02288   // use this entities factory if listener is mask not enabled
02289   // for this kind.
02290   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02291     return publisher_servant_->listener_for(kind);
02292 
02293   } else {
02294     return DDS::DataWriterListener::_duplicate(listener_.in());
02295   }
02296 }
02297 
02298 int
02299 DataWriterImpl::handle_timeout(const ACE_Time_Value &tv,
02300                                const void * /* arg */)
02301 {
02302   const ACE_Time_Value delta = tv - last_liveliness_check_time_;
02303   if (delta < liveliness_check_interval_) {
02304     // Too early.  Reschedule.
02305     if (reactor_->cancel_timer(this) == -1) {
02306       ACE_ERROR((LM_ERROR,
02307                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02308                  ACE_TEXT("cancel_timer")));
02309     }
02310     if (reactor_->schedule_timer(this, 0, liveliness_check_interval_ - delta, liveliness_check_interval_) == -1) {
02311       ACE_ERROR((LM_ERROR,
02312                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02313                  ACE_TEXT("schedule_timer")));
02314     }
02315     return 0;
02316   }
02317 
02318   bool liveliness_lost = false;
02319 
02320   ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02321 
02322   // Do we need to send a liveliness message?
02323   if (elapsed >= liveliness_check_interval_) {
02324     switch (this->qos_.liveliness.kind) {
02325     case DDS::AUTOMATIC_LIVELINESS_QOS:
02326       if (this->send_liveliness(tv) == false) {
02327         liveliness_lost = true;
02328       }
02329       break;
02330 
02331     case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02332       if (liveliness_asserted_) {
02333         if (this->send_liveliness(tv) == false) {
02334           liveliness_lost = true;
02335         }
02336       }
02337       break;
02338 
02339     case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02340       // Do nothing.
02341       break;
02342     }
02343   }
02344 
02345   liveliness_asserted_ = false;
02346   last_liveliness_check_time_ = tv;
02347   elapsed = tv - last_liveliness_activity_time_;
02348 
02349   // Have we lost liveliness?
02350   if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02351     liveliness_lost = true;
02352   }
02353 
02354   if (!this->liveliness_lost_ && liveliness_lost) {
02355     ++ this->liveliness_lost_status_.total_count;
02356     ++ this->liveliness_lost_status_.total_count_change;
02357 
02358     DDS::DataWriterListener_var listener =
02359       listener_for(DDS::LIVELINESS_LOST_STATUS);
02360 
02361     if (!CORBA::is_nil(listener.in())) {
02362       listener->on_liveliness_lost(this->dw_local_objref_.in(),
02363                                    this->liveliness_lost_status_);
02364     }
02365   }
02366 
02367   this->liveliness_lost_ = liveliness_lost;
02368   return 0;
02369 }
02370 
02371 int
02372 DataWriterImpl::handle_close(ACE_HANDLE,
02373                              ACE_Reactor_Mask)
02374 {
02375   this->_remove_ref();
02376   return 0;
02377 }
02378 
02379 bool
02380 DataWriterImpl::send_liveliness(const ACE_Time_Value& now)
02381 {
02382   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02383       !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02384     DDS::Time_t t = time_value_to_time(now);
02385     DataSampleHeader header;
02386     ACE_Message_Block* liveliness_msg =
02387       this->create_control_message(DATAWRITER_LIVELINESS, header, 0, t);
02388 
02389     if (this->send_control(header, liveliness_msg) == SEND_CONTROL_ERROR) {
02390       ACE_ERROR_RETURN((LM_ERROR,
02391                         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02392                         ACE_TEXT(" send_control failed. \n")),
02393                        false);
02394 
02395     } else {
02396       last_liveliness_activity_time_ = now;
02397       return true;
02398     }
02399   } else {
02400     last_liveliness_activity_time_ = now;
02401     return true;
02402   }
02403 }
02404 
02405 void
02406 DataWriterImpl::prepare_to_delete()
02407 {
02408   this->set_deleted(true);
02409   this->stop_associating();
02410 }
02411 
02412 PublicationInstance*
02413 DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02414 {
02415   PublicationInstance* instance = 0;
02416 
02417   if (0 != data_container_) {
02418     instance = data_container_->get_handle_instance(handle);
02419   }
02420 
02421   return instance;
02422 }
02423 
02424 void
02425 DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
02426 {
02427   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02428 
02429   if (!is_bit_) {
02430     // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02431     // is given to this DataWriter then narrow() fails.
02432     DataWriterListener_var the_listener =
02433       DataWriterListener::_narrow(this->listener_.in());
02434 
02435     if (!CORBA::is_nil(the_listener.in())) {
02436       PublicationDisconnectedStatus status;
02437       // Since this callback may come after remove_association which
02438       // removes the reader from id_to_handle map, we can ignore this
02439       // error.
02440       this->lookup_instance_handles(subids,
02441                                     status.subscription_handles);
02442       the_listener->on_publication_disconnected(this->dw_local_objref_.in(),
02443                                                 status);
02444     }
02445   }
02446 }
02447 
02448 void
02449 DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
02450 {
02451   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02452 
02453   if (!is_bit_) {
02454     // Narrow to DDS::DCPS::DataWriterListener. If a
02455     // DDS::DataWriterListener is given to this DataWriter then
02456     // narrow() fails.
02457     DataWriterListener_var the_listener =
02458       DataWriterListener::_narrow(this->listener_.in());
02459 
02460     if (!CORBA::is_nil(the_listener.in())) {
02461       PublicationDisconnectedStatus status;
02462 
02463       // If it's reconnected then the reader should be in id_to_handle
02464       // map otherwise log with an error.
02465       if (this->lookup_instance_handles(subids,
02466                                         status.subscription_handles) == false) {
02467         ACE_ERROR((LM_ERROR,
02468                    "(%P|%t) ERROR: DataWriterImpl::"
02469                    "notify_publication_reconnected: "
02470                    "lookup_instance_handles failed\n"));
02471       }
02472 
02473       the_listener->on_publication_reconnected(this->dw_local_objref_.in(),
02474                                                status);
02475     }
02476   }
02477 }
02478 
02479 void
02480 DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
02481 {
02482   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02483 
02484   if (!is_bit_) {
02485     // Narrow to DDS::DCPS::DataWriterListener. If a
02486     // DDS::DataWriterListener is given to this DataWriter then
02487     // narrow() fails.
02488     DataWriterListener_var the_listener =
02489       DataWriterListener::_narrow(this->listener_.in());
02490 
02491     if (!CORBA::is_nil(the_listener.in())) {
02492       PublicationLostStatus status;
02493 
02494       // Since this callback may come after remove_association which removes
02495       // the reader from id_to_handle map, we can ignore this error.
02496       this->lookup_instance_handles(subids,
02497                                     status.subscription_handles);
02498       the_listener->on_publication_lost(this->dw_local_objref_.in(),
02499                                         status);
02500     }
02501   }
02502 }
02503 
02504 void
02505 DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
02506 {
02507   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02508 
02509   if (!is_bit_) {
02510     // Narrow to DDS::DCPS::DataWriterListener. If a
02511     // DDS::DataWriterListener is given to this DataWriter then
02512     // narrow() fails.
02513     DataWriterListener_var the_listener =
02514       DataWriterListener::_narrow(this->listener_.in());
02515 
02516     if (!CORBA::is_nil(the_listener.in())) {
02517       PublicationLostStatus status;
02518 
02519       CORBA::ULong len = handles.length();
02520       status.subscription_handles.length(len);
02521 
02522       for (CORBA::ULong i = 0; i < len; ++ i) {
02523         status.subscription_handles[i] = handles[i];
02524       }
02525 
02526       the_listener->on_publication_lost(this->dw_local_objref_.in(),
02527                                         status);
02528     }
02529   }
02530 }
02531 
02532 void
02533 DataWriterImpl::notify_connection_deleted(const RepoId& peerId)
02534 {
02535   DBG_ENTRY_LVL("DataWriterImpl","notify_connection_deleted",6);
02536   on_notification_of_connection_deletion(peerId);
02537   // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02538   // is given to this DataWriter then narrow() fails.
02539   DataWriterListener_var the_listener =
02540     DataWriterListener::_narrow(this->listener_.in());
02541 
02542   if (!CORBA::is_nil(the_listener.in()))
02543     the_listener->on_connection_deleted(this->dw_local_objref_.in());
02544 }
02545 
02546 bool
02547 DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
02548                                         DDS::InstanceHandleSeq & hdls)
02549 {
02550   if (DCPS_debug_level > 9) {
02551     CORBA::ULong const size = ids.length();
02552     OPENDDS_STRING separator;
02553     OPENDDS_STRING buffer;
02554 
02555     for (unsigned long i = 0; i < size; ++i) {
02556       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02557       separator = ", ";
02558     }
02559 
02560     ACE_DEBUG((LM_DEBUG,
02561                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02562                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02563                buffer.c_str()));
02564   }
02565 
02566   CORBA::ULong const num_rds = ids.length();
02567   hdls.length(num_rds);
02568 
02569   for (CORBA::ULong i = 0; i < num_rds; ++i) {
02570     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02571   }
02572 
02573   return true;
02574 }
02575 
02576 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
02577 bool
02578 DataWriterImpl::persist_data()
02579 {
02580   return this->data_container_->persist_data();
02581 }
02582 #endif
02583 
02584 void
02585 DataWriterImpl::reschedule_deadline()
02586 {
02587   if (this->watchdog_ != 0) {
02588     this->data_container_->reschedule_deadline();
02589   }
02590 }
02591 
02592 bool
02593 DataWriterImpl::pending_control()
02594 {
02595   return controlTracker.pending_messages();
02596 }
02597 
02598 void
02599 DataWriterImpl::wait_control_pending()
02600 {
02601   OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02602   controlTracker.wait_messages_pending(caller_string);
02603 }
02604 
02605 void
02606 DataWriterImpl::wait_pending()
02607 {
02608   this->data_container_->wait_pending();
02609 }
02610 
02611 void
02612 DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02613 {
02614   this->data_container_->get_instance_handles(instance_handles);
02615 }
02616 
02617 void
02618 DataWriterImpl::get_readers(RepoIdSet& readers)
02619 {
02620   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02621   readers = this->readers_;
02622 }
02623 
02624 void
02625 DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
02626 {
02627   this->publisher_servant_->get_qos(qos_data.pub_qos);
02628   qos_data.dw_qos = this->qos_;
02629   qos_data.topic_name = this->topic_name_.in();
02630 }
02631 
02632 bool
02633 DataWriterImpl::need_sequence_repair()
02634 {
02635   ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02636   return need_sequence_repair_i();
02637 }
02638 
02639 bool
02640 DataWriterImpl::need_sequence_repair_i() const
02641 {
02642   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02643        end = reader_info_.end(); it != end; ++it) {
02644     if (it->second.expected_sequence_ != sequence_number_) {
02645       return true;
02646     }
02647   }
02648 
02649   return false;
02650 }
02651 
02652 SendControlStatus
02653 DataWriterImpl::send_control(const DataSampleHeader& header,
02654                              ACE_Message_Block* msg)
02655 {
02656   controlTracker.message_sent();
02657 
02658   SendControlStatus status = TransportClient::send_control(header, msg);
02659 
02660   if (status != SEND_CONTROL_OK) {
02661     controlTracker.message_dropped();
02662   }
02663 
02664   return status;
02665 }
02666 
02667 } // namespace DCPS
02668 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7