DataWriterImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataWriterImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "Serializer.h"
00018 #include "Transient_Kludge.h"
00019 #include "DataDurabilityCache.h"
00020 #include "OfferedDeadlineWatchdog.h"
00021 #include "MonitorFactory.h"
00022 #include "TypeSupportImpl.h"
00023 #include "SendStateDataSampleList.h"
00024 #include "DataSampleElement.h"
00025 
00026 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00027 #include "CoherentChangeControl.h"
00028 #endif
00029 
00030 #include "AssociationData.h"
00031 #include "dds/DdsDcpsCoreC.h"
00032 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00033 
00034 #if !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "BuiltInTopicUtils.h"
00036 #include "dds/DdsDcpsCoreTypeSupportC.h"
00037 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00038 
00039 #include "Util.h"
00040 #include "dds/DCPS/transport/framework/EntryExit.h"
00041 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00042 
00043 #include "tao/ORB_Core.h"
00044 #include "ace/Reactor.h"
00045 #include "ace/Auto_Ptr.h"
00046 
00047 #include <stdexcept>
00048 
00049 namespace OpenDDS {
00050 namespace DCPS {
00051 
00052 //TBD - add check for enabled in most methods.
00053 //      currently this is not needed because auto_enable_created_entities
00054 //      cannot be false.
00055 
00056 DataWriterImpl::DataWriterImpl()
00057   : data_dropped_count_(0),
00058     data_delivered_count_(0),
00059     controlTracker("DataWriterImpl"),
00060     n_chunks_(TheServiceParticipant->n_chunks()),
00061     association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00062     qos_(TheServiceParticipant->initial_DataWriterQos()),
00063     participant_servant_(0),
00064     topic_id_(GUID_UNKNOWN),
00065     topic_servant_(0),
00066     listener_mask_(DEFAULT_STATUS_MASK),
00067     domain_id_(0),
00068     publisher_servant_(0),
00069     publication_id_(GUID_UNKNOWN),
00070     sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00071     coherent_(false),
00072     coherent_samples_(0),
00073     data_container_(0),
00074     liveliness_lost_(false),
00075     mb_allocator_(0),
00076     db_allocator_(0),
00077     header_allocator_(0),
00078     reactor_(0),
00079     liveliness_check_interval_(ACE_Time_Value::max_time),
00080     last_liveliness_activity_time_(ACE_Time_Value::zero),
00081     last_deadline_missed_total_count_(0),
00082     watchdog_(),
00083     cancel_timer_(false),
00084     is_bit_(false),
00085     initialized_(false),
00086     min_suspended_transaction_id_(0),
00087     max_suspended_transaction_id_(0),
00088     monitor_(0),
00089     periodic_monitor_(0),
00090     db_lock_pool_(0),
00091     liveliness_asserted_(false)
00092 {
00093   liveliness_lost_status_.total_count = 0;
00094   liveliness_lost_status_.total_count_change = 0;
00095 
00096   offered_deadline_missed_status_.total_count = 0;
00097   offered_deadline_missed_status_.total_count_change = 0;
00098   offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00099 
00100   offered_incompatible_qos_status_.total_count = 0;
00101   offered_incompatible_qos_status_.total_count_change = 0;
00102   offered_incompatible_qos_status_.last_policy_id = 0;
00103   offered_incompatible_qos_status_.policies.length(0);
00104 
00105   publication_match_status_.total_count = 0;
00106   publication_match_status_.total_count_change = 0;
00107   publication_match_status_.current_count = 0;
00108   publication_match_status_.current_count_change = 0;
00109   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00110 
00111   monitor_ =
00112     TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00113   periodic_monitor_ =
00114     TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00115 
00116   db_lock_pool_ = new DataBlockLockPool((unsigned long)n_chunks_);
00117 }
00118 
00119 // This method is called when there are no longer any reference to the
00120 // the servant.
00121 DataWriterImpl::~DataWriterImpl()
00122 {
00123   DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00124 
00125   if (initialized_) {
00126     delete data_container_;
00127     delete mb_allocator_;
00128     delete db_allocator_;
00129     delete header_allocator_;
00130   }
00131   delete db_lock_pool_;
00132 }
00133 
00134 // this method is called when delete_datawriter is called.
00135 void
00136 DataWriterImpl::cleanup()
00137 {
00138   if (cancel_timer_) {
00139     // The cancel_timer will call handle_close to
00140     // remove_ref.
00141     (void) reactor_->cancel_timer(this, 0);
00142     cancel_timer_ = false;
00143   }
00144 
00145   // release our Topic_var
00146   topic_objref_ = DDS::Topic::_nil();
00147   topic_servant_->remove_entity_ref();
00148   topic_servant_->_remove_ref();
00149   topic_servant_ = 0;
00150 
00151   dw_local_objref_ = DDS::DataWriter::_nil();
00152 }
00153 
00154 void
00155 DataWriterImpl::init(
00156   DDS::Topic_ptr                       topic,
00157   TopicImpl *                            topic_servant,
00158   const DDS::DataWriterQos &           qos,
00159   DDS::DataWriterListener_ptr          a_listener,
00160   const DDS::StatusMask &              mask,
00161   OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00162   OpenDDS::DCPS::PublisherImpl *         publisher_servant,
00163   DDS::DataWriter_ptr                  dw_local)
00164 {
00165   DBG_ENTRY_LVL("DataWriterImpl","init",6);
00166   topic_objref_ = DDS::Topic::_duplicate(topic);
00167   topic_servant_ = topic_servant;
00168   topic_servant_->_add_ref();
00169   topic_servant_->add_entity_ref();
00170   topic_name_    = topic_servant_->get_name();
00171   topic_id_      = topic_servant_->get_id();
00172   type_name_     = topic_servant_->get_type_name();
00173 
00174 #if !defined (DDS_HAS_MINIMUM_BIT)
00175   is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00176             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00177             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00178             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00179 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00180 
00181   qos_ = qos;
00182 
00183   //Note: OK to _duplicate(nil).
00184   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00185   listener_mask_ = mask;
00186 
00187   // Only store the participant pointer, since it is our "grand"
00188   // parent, we will exist as long as it does.
00189   participant_servant_ = participant_servant;
00190   domain_id_ = participant_servant_->get_domain_id();
00191 
00192   // Only store the publisher pointer, since it is our parent, we will
00193   // exist as long as it does.
00194   publisher_servant_ = publisher_servant;
00195   dw_local_objref_   = DDS::DataWriter::_duplicate(dw_local);
00196 
00197   this->reactor_ = TheServiceParticipant->timer();
00198 
00199   initialized_ = true;
00200 }
00201 
00202 DDS::InstanceHandle_t
00203 DataWriterImpl::get_instance_handle()
00204 {
00205   return this->participant_servant_->id_to_handle(publication_id_);
00206 }
00207 
00208 DDS::InstanceHandle_t
00209 DataWriterImpl::get_next_handle()
00210 {
00211   return this->participant_servant_->id_to_handle(GUID_UNKNOWN);
00212 }
00213 
00214 void
00215 DataWriterImpl::add_association(const RepoId& yourId,
00216                                 const ReaderAssociation& reader,
00217                                 bool active)
00218 {
00219   DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00220 
00221   if (DCPS_debug_level) {
00222     GuidConverter writer_converter(yourId);
00223     GuidConverter reader_converter(reader.readerId);
00224     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00225                ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00226                OPENDDS_STRING(writer_converter).c_str(),
00227                OPENDDS_STRING(reader_converter).c_str()));
00228   }
00229 
00230   if (entity_deleted_.value()) {
00231     if (DCPS_debug_level)
00232       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00233                  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00234 
00235     return;
00236   }
00237 
00238   if (GUID_UNKNOWN == publication_id_) {
00239     publication_id_ = yourId;
00240   }
00241 
00242   {
00243     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00244     reader_info_.insert(std::make_pair(reader.readerId,
00245                                        ReaderInfo(reader.filterClassName,
00246                                                   TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00247                                                   reader.exprParams, participant_servant_,
00248                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00249   }
00250 
00251   if (DCPS_debug_level > 4) {
00252     GuidConverter converter(get_publication_id());
00253     ACE_DEBUG((LM_DEBUG,
00254                ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00255                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00256                OPENDDS_STRING(converter).c_str(),
00257                qos_.transport_priority.value));
00258   }
00259 
00260   AssociationData data;
00261   data.remote_id_ = reader.readerId;
00262   data.remote_data_ = reader.readerTransInfo;
00263   data.remote_reliable_ =
00264     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00265   data.remote_durable_ =
00266     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00267 
00268   if (!associate(data, active)) {
00269     //FUTURE: inform inforepo and try again as passive peer
00270     if (DCPS_debug_level) {
00271       ACE_DEBUG((LM_ERROR,
00272                  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00273                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00274     }
00275   }
00276 }
00277 
00278 void
00279 DataWriterImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00280 {
00281   DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00282 
00283   if (!(flags & ASSOC_OK)) {
00284     if (DCPS_debug_level) {
00285       const GuidConverter conv(remote_id);
00286       ACE_DEBUG((LM_ERROR,
00287                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00288                  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00289                  OPENDDS_STRING(conv).c_str()));
00290     }
00291 
00292     return;
00293   }
00294   if (DCPS_debug_level) {
00295     const GuidConverter writer_conv(publication_id_);
00296     const GuidConverter conv(remote_id);
00297     ACE_DEBUG((LM_INFO,
00298                ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00299                ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00300                OPENDDS_STRING(writer_conv).c_str(),
00301                OPENDDS_STRING(conv).c_str()));
00302   }
00303   if (flags & ASSOC_ACTIVE) {
00304 
00305     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00306 
00307     // Have we already received an association_complete() callback?
00308     if (assoc_complete_readers_.count(remote_id)) {
00309       if (DCPS_debug_level) {
00310         const GuidConverter writer_conv(publication_id_);
00311         const GuidConverter converter(remote_id);
00312         ACE_DEBUG((LM_DEBUG,
00313                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00314                    ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00315                    OPENDDS_STRING(writer_conv).c_str(),
00316                    OPENDDS_STRING(converter).c_str()));
00317       }
00318       assoc_complete_readers_.erase(remote_id);
00319       association_complete_i(remote_id);
00320 
00321       // Add to pending_readers_ -> pending means we are waiting
00322       // for the association_complete() callback.
00323 
00324     } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00325       const GuidConverter converter(remote_id);
00326       ACE_ERROR((LM_ERROR,
00327                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00328                  ACE_TEXT("failed to mark %C as pending.\n"),
00329                  OPENDDS_STRING(converter).c_str()));
00330 
00331     } else {
00332       if (DCPS_debug_level) {
00333         const GuidConverter converter(remote_id);
00334         ACE_DEBUG((LM_DEBUG,
00335                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00336                    ACE_TEXT("marked %C as pending.\n"),
00337                    OPENDDS_STRING(converter).c_str()));
00338       }
00339     }
00340 
00341   } else {
00342     // In the current implementation, DataWriter is always active, so this
00343     // code will not be applicable.
00344     if (DCPS_debug_level) {
00345       const GuidConverter conv(publication_id_);
00346       ACE_DEBUG((LM_ERROR,
00347                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00348                  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00349                  OPENDDS_STRING(conv).c_str()));
00350     }
00351     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00352     disco->association_complete(domain_id_, participant_servant_->get_id(),
00353                                 publication_id_, remote_id);
00354   }
00355 }
00356 
00357 DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
00358                                        const char* filter,
00359                                        const DDS::StringSeq& params,
00360                                        DomainParticipantImpl* participant,
00361                                        bool durable)
00362 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00363   : participant_(participant)
00364   , filter_class_name_(filterClassName)
00365   , filter_(filter)
00366   , expression_params_(params)
00367   , eval_(*filter ? participant->get_filter_eval(filter) : 0)
00368   , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00369   , durable_(durable)
00370 {}
00371 #else
00372   : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00373   , durable_(durable)
00374 {
00375   ACE_UNUSED_ARG(filterClassName);
00376   ACE_UNUSED_ARG(filter);
00377   ACE_UNUSED_ARG(params);
00378   ACE_UNUSED_ARG(participant);
00379 }
00380 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00381 
00382 DataWriterImpl::ReaderInfo::~ReaderInfo()
00383 {
00384 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00385   eval_ = RcHandle<FilterEvaluator>();
00386 
00387   if (!filter_.empty()) {
00388     participant_->deref_filter_eval(filter_.c_str());
00389   }
00390 
00391 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00392 }
00393 
00394 void
00395 DataWriterImpl::association_complete(const RepoId& remote_id)
00396 {
00397   DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00398 
00399   if (DCPS_debug_level >= 1) {
00400     GuidConverter writer_converter(this->publication_id_);
00401     GuidConverter reader_converter(remote_id);
00402     ACE_DEBUG((LM_DEBUG,
00403                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00404                ACE_TEXT("bit %d local %C remote %C\n"),
00405                is_bit_,
00406                OPENDDS_STRING(writer_converter).c_str(),
00407                OPENDDS_STRING(reader_converter).c_str()));
00408   }
00409 
00410   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00411 
00412   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00413     if (DCPS_debug_level) {
00414       GuidConverter writer_converter(this->publication_id_);
00415       GuidConverter reader_converter(remote_id);
00416       ACE_DEBUG((LM_DEBUG,
00417                  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00418                  ACE_TEXT("bit %d local %C did not find pending reader: %C")
00419                  ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00420                  is_bit_,
00421                  OPENDDS_STRING(writer_converter).c_str(),
00422                  OPENDDS_STRING(reader_converter).c_str()));
00423     }
00424     // Not found in pending_readers_, defer calling association_complete_i()
00425     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00426     assoc_complete_readers_.insert(remote_id);
00427 
00428   } else {
00429     association_complete_i(remote_id);
00430   }
00431 }
00432 
00433 void
00434 DataWriterImpl::association_complete_i(const RepoId& remote_id)
00435 {
00436   DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00437 
00438   if (DCPS_debug_level >= 1) {
00439     GuidConverter writer_converter(this->publication_id_);
00440     GuidConverter reader_converter(remote_id);
00441     ACE_DEBUG((LM_DEBUG,
00442                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00443                ACE_TEXT("bit %d local %C remote %C\n"),
00444                is_bit_,
00445                OPENDDS_STRING(writer_converter).c_str(),
00446                OPENDDS_STRING(reader_converter).c_str()));
00447   }
00448 
00449   bool reader_durable = false;
00450 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00451   OPENDDS_STRING filterClassName;
00452   RcHandle<FilterEvaluator> eval;
00453   DDS::StringSeq expression_params;
00454 #endif
00455   {
00456     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00457 
00458     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00459       GuidConverter converter(remote_id);
00460       ACE_ERROR((LM_ERROR,
00461                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00462                  ACE_TEXT("insert %C from pending failed.\n"),
00463                  OPENDDS_STRING(converter).c_str()));
00464     }
00465   }
00466   {
00467     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00468     RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00469 
00470     if (it != reader_info_.end()) {
00471       reader_durable = it->second.durable_;
00472 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00473       filterClassName = it->second.filter_class_name_;
00474       eval = it->second.eval_;
00475       expression_params = it->second.expression_params_;
00476 #endif
00477     }
00478   }
00479 
00480   if (this->monitor_) {
00481     this->monitor_->report();
00482   }
00483 
00484   if (!is_bit_) {
00485 
00486     DDS::InstanceHandle_t handle =
00487       this->participant_servant_->id_to_handle(remote_id);
00488 
00489     {
00490       // protect publication_match_status_ and status changed flags.
00491       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00492 
00493       // update the publication_match_status_
00494       ++publication_match_status_.total_count;
00495       ++publication_match_status_.total_count_change;
00496       ++publication_match_status_.current_count;
00497       ++publication_match_status_.current_count_change;
00498 
00499       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00500         GuidConverter converter(remote_id);
00501         ACE_DEBUG((LM_WARNING,
00502                    ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00503                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00504                    OPENDDS_STRING(converter).c_str(),
00505                    handle));
00506         return;
00507 
00508       } else if (DCPS_debug_level > 4) {
00509         GuidConverter converter(remote_id);
00510         ACE_DEBUG((LM_DEBUG,
00511                    ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00512                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00513                    OPENDDS_STRING(converter).c_str(),
00514                    handle));
00515       }
00516 
00517       publication_match_status_.last_subscription_handle = handle;
00518 
00519       set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00520     }
00521 
00522     DDS::DataWriterListener_var listener =
00523       listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00524 
00525     if (!CORBA::is_nil(listener.in())) {
00526 
00527       listener->on_publication_matched(dw_local_objref_.in(),
00528                                        publication_match_status_);
00529 
00530       // TBD - why does the spec say to change this but not
00531       // change the ChangeFlagStatus after a listener call?
00532       publication_match_status_.total_count_change = 0;
00533       publication_match_status_.current_count_change = 0;
00534     }
00535 
00536     notify_status_condition();
00537   }
00538 
00539   // Support DURABILITY QoS
00540   if (reader_durable) {
00541     // Tell the WriteDataContainer to resend all sending/sent
00542     // samples.
00543     this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00544 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00545                                          , filterClassName, eval.in(), expression_params
00546 #endif
00547                                         );
00548 
00549     // Acquire the data writer container lock to avoid deadlock. The
00550     // thread calling association_complete() has to acquire lock in the
00551     // same order as the write()/register() operation.
00552 
00553     // Since the thread calling association_complete() is the ORB
00554     // thread, it may have some performance penalty. If the
00555     // performance is an issue, we may need a new thread to handle the
00556     // data_available() calls.
00557     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00558               guard,
00559               this->get_lock());
00560 
00561     SendStateDataSampleList list = this->get_resend_data();
00562     {
00563       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00564       // Update the reader's expected sequence
00565       SequenceNumber& seq =
00566         reader_info_.find(remote_id)->second.expected_sequence_;
00567 
00568       for (SendStateDataSampleList::iterator list_el = list.begin();
00569            list_el != list.end(); ++list_el) {
00570         list_el->get_header().historic_sample_ = true;
00571 
00572         if (list_el->get_header().sequence_ > seq) {
00573           seq = list_el->get_header().sequence_;
00574         }
00575       }
00576     }
00577 
00578     if (this->publisher_servant_->is_suspended()) {
00579       this->available_data_list_.enqueue_tail(list);
00580 
00581     } else {
00582       if (DCPS_debug_level >= 4) {
00583         ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00584       }
00585 
00586       size_t size = 0, padding = 0;
00587       gen_find_size(remote_id, size, padding);
00588       ACE_Message_Block* const data =
00589         new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00590                               get_db_lock());
00591       Serializer ser(data);
00592       ser << remote_id;
00593 
00594       const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00595       DataSampleHeader header;
00596       ACE_Message_Block* const end_historic_samples =
00597         create_control_message(END_HISTORIC_SAMPLES, header, data, timestamp);
00598 
00599       this->controlTracker.message_sent();
00600       guard.release();
00601       SendControlStatus ret = send_w_control(list, header, end_historic_samples, remote_id);
00602       if (ret == SEND_CONTROL_ERROR) {
00603         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00604                              ACE_TEXT("DataWriterImpl::association_complete_i: ")
00605                              ACE_TEXT("send_w_control failed.\n")));
00606         this->controlTracker.message_dropped();
00607       }
00608     }
00609   }
00610 }
00611 
00612 void
00613 DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
00614                                     CORBA::Boolean notify_lost)
00615 {
00616   if (readers.length() == 0) {
00617     return;
00618   }
00619 
00620   if (DCPS_debug_level >= 1) {
00621     GuidConverter writer_converter(publication_id_);
00622     GuidConverter reader_converter(readers[0]);
00623     ACE_DEBUG((LM_DEBUG,
00624                ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
00625                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00626                is_bit_,
00627                OPENDDS_STRING(writer_converter).c_str(),
00628                OPENDDS_STRING(reader_converter).c_str(),
00629                readers.length()));
00630   }
00631 
00632   // stop pending associations for these reader ids
00633   this->stop_associating(readers.get_buffer(), readers.length());
00634 
00635   ReaderIdSeq fully_associated_readers;
00636   CORBA::ULong fully_associated_len = 0;
00637   ReaderIdSeq rds;
00638   CORBA::ULong rds_len = 0;
00639   DDS::InstanceHandleSeq handles;
00640 
00641   ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
00642   {
00643     // Ensure the same acquisition order as in wait_for_acknowledgments().
00644     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00645     //Remove the readers from fully associated reader list.
00646     //If the supplied reader is not in the cached reader list then it is
00647     //already removed. We just need remove the readers in the list that have
00648     //not been removed.
00649 
00650     CORBA::ULong len = readers.length();
00651 
00652     for (CORBA::ULong i = 0; i < len; ++i) {
00653       //Remove the readers from fully associated reader list. If it's not
00654       //in there, the association_complete() is not called yet and remove it
00655       //from pending list.
00656 
00657       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00658         ++ fully_associated_len;
00659         fully_associated_readers.length(fully_associated_len);
00660         fully_associated_readers [fully_associated_len - 1] = readers[i];
00661 
00662         ++ rds_len;
00663         rds.length(rds_len);
00664         rds [rds_len - 1] = readers[i];
00665 
00666       } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00667         ++ rds_len;
00668         rds.length(rds_len);
00669         rds [rds_len - 1] = readers[i];
00670 
00671         GuidConverter converter(readers[i]);
00672         ACE_DEBUG((LM_WARNING,
00673                    ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_associations: ")
00674                    ACE_TEXT("removing reader %C before association_complete() call.\n"),
00675                    OPENDDS_STRING(converter).c_str()));
00676       }
00677 
00678       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00679       reader_info_.erase(readers[i]);
00680       //else reader is already removed which indicates remove_association()
00681       //is called multiple times.
00682     }
00683 
00684     if (fully_associated_len > 0 && !is_bit_) {
00685       // The reader should be in the id_to_handle map at this time so
00686       // log with error.
00687       if (this->lookup_instance_handles(fully_associated_readers, handles) == false) {
00688         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::remove_associations: "
00689                    "lookup_instance_handles failed, notify %d \n", notify_lost));
00690 
00691         return;
00692       }
00693 
00694       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00695         id_to_handle_map_.erase(fully_associated_readers[i]);
00696       }
00697     }
00698 
00699     // Mirror the PUBLICATION_MATCHED_STATUS processing from
00700     // association_complete() here.
00701     if (!this->is_bit_) {
00702 
00703       // Derive the change in the number of subscriptions reading this writer.
00704       int matchedSubscriptions =
00705         static_cast<int>(this->id_to_handle_map_.size());
00706       this->publication_match_status_.current_count_change =
00707         matchedSubscriptions - this->publication_match_status_.current_count;
00708 
00709       // Only process status if the number of subscriptions has changed.
00710       if (this->publication_match_status_.current_count_change != 0) {
00711         this->publication_match_status_.current_count = matchedSubscriptions;
00712 
00713         /// Section 7.1.4.1: total_count will not decrement.
00714 
00715         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00716         this->publication_match_status_.last_subscription_handle =
00717           handles[fully_associated_len - 1];
00718 
00719         set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00720 
00721         DDS::DataWriterListener_var listener =
00722           this->listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00723 
00724         if (!CORBA::is_nil(listener.in())) {
00725           listener->on_publication_matched(
00726             this->dw_local_objref_.in(),
00727             this->publication_match_status_);
00728 
00729           // Listener consumes the change.
00730           this->publication_match_status_.total_count_change = 0;
00731           this->publication_match_status_.current_count_change = 0;
00732         }
00733 
00734         this->notify_status_condition();
00735       }
00736     }
00737   }
00738 
00739   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00740     this->disassociate(rds[i]);
00741   }
00742 
00743   // If this remove_association is invoked when the InfoRepo
00744   // detects a lost reader then make a callback to notify
00745   // subscription lost.
00746   if (notify_lost && handles.length() > 0) {
00747     this->notify_publication_lost(handles);
00748   }
00749 }
00750 
00751 void DataWriterImpl::remove_all_associations()
00752 {
00753   DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00754   // stop pending associations
00755   this->stop_associating();
00756 
00757   OpenDDS::DCPS::ReaderIdSeq readers;
00758   CORBA::ULong size;
00759   CORBA::ULong num_pending_readers;
00760   {
00761     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00762 
00763     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00764     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00765     readers.length(size);
00766 
00767     RepoIdSet::iterator itEnd = readers_.end();
00768     int i = 0;
00769 
00770     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00771       readers[i ++] = *it;
00772     }
00773 
00774     itEnd = pending_readers_.end();
00775 
00776     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00777       readers[i ++] = *it;
00778     }
00779 
00780     if (num_pending_readers > 0) {
00781       ACE_DEBUG((LM_WARNING,
00782                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00783                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00784                  num_pending_readers));
00785     }
00786   }
00787 
00788   try {
00789     if (0 < size) {
00790       CORBA::Boolean dont_notify_lost = false;
00791 
00792       this->remove_associations(readers, dont_notify_lost);
00793     }
00794 
00795   } catch (const CORBA::Exception&) {
00796       ACE_DEBUG((LM_WARNING,
00797                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00798                  ACE_TEXT("caught exception from remove_associations.\n")));
00799   }
00800 }
00801 
00802 void
00803 DataWriterImpl::register_for_reader(const RepoId& participant,
00804                                     const RepoId& writerid,
00805                                     const RepoId& readerid,
00806                                     const TransportLocatorSeq& locators,
00807                                     DiscoveryListener* listener)
00808 {
00809   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00810 }
00811 
00812 void
00813 DataWriterImpl::unregister_for_reader(const RepoId& participant,
00814                                       const RepoId& writerid,
00815                                       const RepoId& readerid)
00816 {
00817   TransportClient::unregister_for_reader(participant, writerid, readerid);
00818 }
00819 
00820 void
00821 DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00822 {
00823   DDS::DataWriterListener_var listener =
00824     listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00825 
00826   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00827 
00828 #if 0
00829 
00830   if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00831     // This test should make the method idempotent.
00832     return;
00833   }
00834 
00835 #endif
00836 
00837   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00838 
00839   // copy status and increment change
00840   offered_incompatible_qos_status_.total_count = status.total_count;
00841   offered_incompatible_qos_status_.total_count_change +=
00842     status.count_since_last_send;
00843   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00844   offered_incompatible_qos_status_.policies = status.policies;
00845 
00846   if (!CORBA::is_nil(listener.in())) {
00847     listener->on_offered_incompatible_qos(dw_local_objref_.in(),
00848                                           offered_incompatible_qos_status_);
00849 
00850     // TBD - Why does the spec say to change this but not change the
00851     //       ChangeFlagStatus after a listener call?
00852     offered_incompatible_qos_status_.total_count_change = 0;
00853   }
00854 
00855   notify_status_condition();
00856 }
00857 
00858 void
00859 DataWriterImpl::update_subscription_params(const RepoId& readerId,
00860                                            const DDS::StringSeq& params)
00861 {
00862 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00863   ACE_UNUSED_ARG(readerId);
00864   ACE_UNUSED_ARG(params);
00865 #else
00866   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00867   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00868   RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00869 
00870   if (iter != reader_info_.end()) {
00871     iter->second.expression_params_ = params;
00872 
00873   } else if (DCPS_debug_level > 4 &&
00874              TheServiceParticipant->publisher_content_filter()) {
00875     GuidConverter pubConv(this->publication_id_), subConv(readerId);
00876     ACE_DEBUG((LM_WARNING,
00877                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00878                ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00879                OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00880   }
00881 
00882 #endif
00883 }
00884 
00885 void
00886 DataWriterImpl::inconsistent_topic()
00887 {
00888   topic_servant_->inconsistent_topic();
00889 }
00890 
00891 DDS::ReturnCode_t
00892 DataWriterImpl::set_qos(const DDS::DataWriterQos & qos)
00893 {
00894 
00895   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00896   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00897   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00898   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00899   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00900 
00901   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00902     if (qos_ == qos)
00903       return DDS::RETCODE_OK;
00904 
00905     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00906       return DDS::RETCODE_IMMUTABLE_POLICY;
00907 
00908     } else {
00909       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00910       DDS::PublisherQos publisherQos;
00911       this->publisher_servant_->get_qos(publisherQos);
00912       const bool status
00913         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00914                                         this->participant_servant_->get_id(),
00915                                         this->publication_id_,
00916                                         qos,
00917                                         publisherQos);
00918 
00919       if (!status) {
00920         ACE_ERROR_RETURN((LM_ERROR,
00921                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00922                           ACE_TEXT("qos not updated. \n")),
00923                          DDS::RETCODE_ERROR);
00924       }
00925     }
00926 
00927     if (!(qos_ == qos)) {
00928       // Reset the deadline timer if the period has changed.
00929       if (qos_.deadline.period.sec != qos.deadline.period.sec
00930           || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00931         if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00932             && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00933           this->watchdog_ =
00934                              new OfferedDeadlineWatchdog(
00935                                this->lock_,
00936                                qos.deadline,
00937                                this,
00938                                this->dw_local_objref_.in(),
00939                                this->offered_deadline_missed_status_,
00940                                this->last_deadline_missed_total_count_);
00941 
00942         } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00943                    && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00944           this->watchdog_->cancel_all();
00945           this->watchdog_->destroy();
00946           this->watchdog_ = 0;
00947 
00948         } else {
00949           this->watchdog_->reset_interval(
00950             duration_to_time_value(qos.deadline.period));
00951         }
00952       }
00953 
00954       qos_ = qos;
00955     }
00956 
00957     return DDS::RETCODE_OK;
00958 
00959   } else {
00960     return DDS::RETCODE_INCONSISTENT_POLICY;
00961   }
00962 }
00963 
00964 DDS::ReturnCode_t
00965 DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
00966 {
00967   qos = qos_;
00968   return DDS::RETCODE_OK;
00969 }
00970 
00971 DDS::ReturnCode_t
00972 DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
00973                              DDS::StatusMask mask)
00974 {
00975   listener_mask_ = mask;
00976   //note: OK to duplicate  a nil object ref
00977   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00978   return DDS::RETCODE_OK;
00979 }
00980 
00981 DDS::DataWriterListener_ptr
00982 DataWriterImpl::get_listener()
00983 {
00984   return DDS::DataWriterListener::_duplicate(listener_.in());
00985 }
00986 
00987 DDS::Topic_ptr
00988 DataWriterImpl::get_topic()
00989 {
00990   return DDS::Topic::_duplicate(topic_objref_.in());
00991 }
00992 
00993 bool
00994 DataWriterImpl::should_ack() const
00995 {
00996   // N.B. It may be worthwhile to investigate a more efficient
00997   // heuristic for determining if a writer should send SAMPLE_ACK
00998   // control samples. Perhaps based on a sequence number delta?
00999   return this->readers_.size() != 0;
01000 }
01001 
01002 DataWriterImpl::AckToken
01003 DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
01004 {
01005   if (DCPS_debug_level > 0) {
01006     ACE_DEBUG((LM_DEBUG,
01007                ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
01008                ACE_TEXT("for sequence %q \n"),
01009                this->sequence_number_.getValue()));
01010   }
01011   return AckToken(max_wait, this->sequence_number_);
01012 }
01013 
01014 DDS::ReturnCode_t
01015 DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
01016 {
01017   if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01018     return DDS::RETCODE_OK;
01019   DataWriterImpl::AckToken token = create_ack_token(max_wait);
01020   if (DCPS_debug_level) {
01021     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01022                           ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01023                           token.sequence_.getValue()));
01024   }
01025   return wait_for_specific_ack(token);
01026 }
01027 
01028 DDS::ReturnCode_t
01029 DataWriterImpl::wait_for_specific_ack(const AckToken& token)
01030 {
01031   return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01032 }
01033 
01034 DDS::Publisher_ptr
01035 DataWriterImpl::get_publisher()
01036 {
01037   return DDS::Publisher::_duplicate(publisher_servant_);
01038 }
01039 
01040 DDS::ReturnCode_t
01041 DataWriterImpl::get_liveliness_lost_status(
01042   DDS::LivelinessLostStatus & status)
01043 {
01044   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01045                    guard,
01046                    this->lock_,
01047                    DDS::RETCODE_ERROR);
01048   set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
01049   status = liveliness_lost_status_;
01050   liveliness_lost_status_.total_count_change = 0;
01051   return DDS::RETCODE_OK;
01052 }
01053 
01054 DDS::ReturnCode_t
01055 DataWriterImpl::get_offered_deadline_missed_status(
01056   DDS::OfferedDeadlineMissedStatus & status)
01057 {
01058   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01059                    guard,
01060                    this->lock_,
01061                    DDS::RETCODE_ERROR);
01062 
01063   set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
01064 
01065   this->offered_deadline_missed_status_.total_count_change =
01066     this->offered_deadline_missed_status_.total_count
01067     - this->last_deadline_missed_total_count_;
01068 
01069   // Update for next status check.
01070   this->last_deadline_missed_total_count_ =
01071     this->offered_deadline_missed_status_.total_count;
01072 
01073   status = offered_deadline_missed_status_;
01074 
01075   this->offered_deadline_missed_status_.total_count_change = 0;
01076 
01077   return DDS::RETCODE_OK;
01078 }
01079 
01080 DDS::ReturnCode_t
01081 DataWriterImpl::get_offered_incompatible_qos_status(
01082   DDS::OfferedIncompatibleQosStatus & status)
01083 {
01084   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01085                    guard,
01086                    this->lock_,
01087                    DDS::RETCODE_ERROR);
01088   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
01089   status = offered_incompatible_qos_status_;
01090   offered_incompatible_qos_status_.total_count_change = 0;
01091   return DDS::RETCODE_OK;
01092 }
01093 
01094 DDS::ReturnCode_t
01095 DataWriterImpl::get_publication_matched_status(
01096   DDS::PublicationMatchedStatus & status)
01097 {
01098   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01099                    guard,
01100                    this->lock_,
01101                    DDS::RETCODE_ERROR);
01102   set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
01103   status = publication_match_status_;
01104   publication_match_status_.total_count_change = 0;
01105   publication_match_status_.current_count_change = 0;
01106   return DDS::RETCODE_OK;
01107 }
01108 
01109 DDS::ReturnCode_t
01110 DataWriterImpl::assert_liveliness()
01111 {
01112   switch (this->qos_.liveliness.kind) {
01113   case DDS::AUTOMATIC_LIVELINESS_QOS:
01114     // Do nothing.
01115     break;
01116   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01117     return participant_servant_->assert_liveliness();
01118   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01119     if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01120       return DDS::RETCODE_ERROR;
01121     }
01122     break;
01123   }
01124 
01125   return DDS::RETCODE_OK;
01126 }
01127 
01128 DDS::ReturnCode_t
01129 DataWriterImpl::assert_liveliness_by_participant()
01130 {
01131   // This operation is called by participant.
01132 
01133   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01134     // Set a flag indicating that we should send a liveliness message on the timer if necessary.
01135     liveliness_asserted_ = true;
01136   }
01137 
01138   return DDS::RETCODE_OK;
01139 }
01140 
01141 ACE_Time_Value
01142 DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
01143 {
01144   if (this->qos_.liveliness.kind == kind) {
01145     return liveliness_check_interval_;
01146   } else {
01147     return ACE_Time_Value::max_time;
01148   }
01149 }
01150 
01151 bool
01152 DataWriterImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
01153 {
01154   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01155     return last_liveliness_activity_time_ > tv;
01156   } else {
01157     return false;
01158   }
01159 }
01160 
01161 DDS::ReturnCode_t
01162 DataWriterImpl::get_matched_subscriptions(
01163   DDS::InstanceHandleSeq & subscription_handles)
01164 {
01165   if (enabled_ == false) {
01166     ACE_ERROR_RETURN((LM_ERROR,
01167                       ACE_TEXT("(%P|%t) ERROR: ")
01168                       ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01169                       ACE_TEXT(" Entity is not enabled. \n")),
01170                      DDS::RETCODE_NOT_ENABLED);
01171   }
01172 
01173   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01174                    guard,
01175                    this->lock_,
01176                    DDS::RETCODE_ERROR);
01177 
01178   // Copy out the handles for the current set of subscriptions.
01179   int index = 0;
01180   subscription_handles.length(
01181     static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01182 
01183   for (RepoIdToHandleMap::iterator
01184        current = this->id_to_handle_map_.begin();
01185        current != this->id_to_handle_map_.end();
01186        ++current, ++index) {
01187     subscription_handles[index] = current->second;
01188   }
01189 
01190   return DDS::RETCODE_OK;
01191 }
01192 
01193 #if !defined (DDS_HAS_MINIMUM_BIT)
01194 DDS::ReturnCode_t
01195 DataWriterImpl::get_matched_subscription_data(
01196   DDS::SubscriptionBuiltinTopicData & subscription_data,
01197   DDS::InstanceHandle_t subscription_handle)
01198 {
01199   if (enabled_ == false) {
01200     ACE_ERROR_RETURN((LM_ERROR,
01201                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01202                       ACE_TEXT("get_matched_subscription_data: ")
01203                       ACE_TEXT("Entity is not enabled. \n")),
01204                      DDS::RETCODE_NOT_ENABLED);
01205   }
01206 
01207   BIT_Helper_1 < DDS::SubscriptionBuiltinTopicDataDataReader,
01208                DDS::SubscriptionBuiltinTopicDataDataReader_var,
01209                DDS::SubscriptionBuiltinTopicDataSeq > hh;
01210 
01211   DDS::SubscriptionBuiltinTopicDataSeq data;
01212 
01213   DDS::ReturnCode_t ret =
01214     hh.instance_handle_to_bit_data(participant_servant_,
01215                                    BUILT_IN_SUBSCRIPTION_TOPIC,
01216                                    subscription_handle,
01217                                    data);
01218 
01219   if (ret == DDS::RETCODE_OK) {
01220     subscription_data = data[0];
01221   }
01222 
01223   return ret;
01224 }
01225 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01226 
01227 DDS::ReturnCode_t
01228 DataWriterImpl::enable()
01229 {
01230   //According spec:
01231   // - Calling enable on an already enabled Entity returns OK and has no
01232   // effect.
01233   // - Calling enable on an Entity whose factory is not enabled will fail
01234   // and return PRECONDITION_NOT_MET.
01235 
01236   if (this->is_enabled()) {
01237     return DDS::RETCODE_OK;
01238   }
01239 
01240   if (this->publisher_servant_->is_enabled() == false) {
01241     return DDS::RETCODE_PRECONDITION_NOT_MET;
01242   }
01243 
01244   // Note: do configuration based on QoS in enable() because
01245   //       before enable is called the QoS can be changed -- even
01246   //       for Changeable=NO
01247 
01248   // Configure WriteDataContainer constructor parameters from qos.
01249 
01250   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01251 
01252   CORBA::Long const depth =
01253     get_instance_sample_list_depth(
01254       qos_.history.kind,
01255       qos_.history.depth,
01256       qos_.resource_limits.max_samples_per_instance);
01257 
01258   bool resource_blocking = false;
01259 
01260   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01261     n_chunks_ = qos_.resource_limits.max_samples;
01262 
01263     if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED) {
01264       resource_blocking = true;
01265 
01266     } else {
01267       resource_blocking =
01268         (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01269         || (qos_.resource_limits.max_samples <
01270             (qos_.resource_limits.max_instances * depth));
01271     }
01272   }
01273 
01274   //else using value from Service_Participant
01275 
01276   // enable the type specific part of this DataWriter
01277   this->enable_specific();
01278 
01279   CORBA::Long max_instances = 0;
01280 
01281   if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01282     max_instances = qos_.resource_limits.max_instances;
01283 
01284   CORBA::Long max_total_samples = 0;
01285 
01286   if (reliable && resource_blocking)
01287     max_total_samples = qos_.resource_limits.max_samples;
01288 
01289 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01290   // Get data durability cache if DataWriter QoS requires durable
01291   // samples.  Publisher servant retains ownership of the cache.
01292   DataDurabilityCache* const durability_cache =
01293     TheServiceParticipant->get_data_durability_cache(qos_.durability);
01294 #endif
01295 
01296   //Note: the QoS used to set n_chunks_ is Changable=No so
01297   // it is OK that we cannot change the size of our allocators.
01298   data_container_ = new WriteDataContainer(this,
01299                                            depth,
01300                                            qos_.reliability.max_blocking_time,
01301                                            n_chunks_,
01302                                            domain_id_,
01303                                            get_topic_name(),
01304                                            get_type_name(),
01305 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01306                                            durability_cache,
01307                                            qos_.durability_service,
01308 #endif
01309                                            max_instances,
01310                                            max_total_samples);
01311 
01312   // +1 because we might allocate one before releasing another
01313   // TBD - see if this +1 can be removed.
01314   mb_allocator_ = new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_);
01315   db_allocator_ = new DataBlockAllocator(n_chunks_+1);
01316   header_allocator_ = new DataSampleHeaderAllocator(n_chunks_+1);
01317 
01318   if (DCPS_debug_level >= 2) {
01319     ACE_DEBUG((LM_DEBUG,
01320                "(%P|%t) DataWriterImpl::enable-mb"
01321                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01322                mb_allocator_,
01323                n_chunks_));
01324 
01325     ACE_DEBUG((LM_DEBUG,
01326                "(%P|%t) DataWriterImpl::enable-db"
01327                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01328                db_allocator_,
01329                n_chunks_));
01330 
01331     ACE_DEBUG((LM_DEBUG,
01332                "(%P|%t) DataWriterImpl::enable-header"
01333                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01334                header_allocator_,
01335                n_chunks_));
01336   }
01337 
01338   if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01339       qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01340     liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01341     liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01342     // Must be at least 1 micro second.
01343     if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01344       liveliness_check_interval_ = ACE_Time_Value (0, 1);
01345     }
01346 
01347     if (reactor_->schedule_timer(this,
01348                                  0,
01349                                  liveliness_check_interval_,
01350                                  liveliness_check_interval_) == -1) {
01351       ACE_ERROR((LM_ERROR,
01352                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01353                  ACE_TEXT("schedule_timer")));
01354 
01355     } else {
01356       cancel_timer_ = true;
01357       this->_add_ref();
01358     }
01359   }
01360 
01361   participant_servant_->add_adjust_liveliness_timers(this);
01362 
01363   // Setup the offered deadline watchdog if the configured deadline
01364   // period is not the default (infinite).
01365   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01366 
01367   if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01368       || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01369     this->watchdog_ = new OfferedDeadlineWatchdog(
01370                          this->lock_,
01371                          this->qos_.deadline,
01372                          this,
01373                          this->dw_local_objref_.in(),
01374                          this->offered_deadline_missed_status_,
01375                          this->last_deadline_missed_total_count_);
01376   }
01377 
01378   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01379   disco->pre_writer(this);
01380 
01381   this->set_enabled();
01382 
01383   try {
01384     this->enable_transport(reliable,
01385                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01386 
01387   } catch (const Transport::Exception&) {
01388     ACE_ERROR((LM_ERROR,
01389                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01390                ACE_TEXT("Transport Exception.\n")));
01391     return DDS::RETCODE_ERROR;
01392 
01393   }
01394 
01395   const TransportLocatorSeq& trans_conf_info = connection_info();
01396 
01397   DDS::PublisherQos pub_qos;
01398   this->publisher_servant_->get_qos(pub_qos);
01399 
01400   this->publication_id_ =
01401     disco->add_publication(this->domain_id_,
01402                            this->participant_servant_->get_id(),
01403                            this->topic_servant_->get_id(),
01404                            this,
01405                            this->qos_,
01406                            trans_conf_info,
01407                            pub_qos);
01408 
01409   if (this->publication_id_ == GUID_UNKNOWN) {
01410     ACE_ERROR((LM_ERROR,
01411                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01412                ACE_TEXT("add_publication returned invalid id. \n")));
01413     return DDS::RETCODE_ERROR;
01414   }
01415 
01416   this->data_container_->publication_id_ = this->publication_id_;
01417 
01418   const DDS::ReturnCode_t writer_enabled_result =
01419     publisher_servant_->writer_enabled(topic_name_.in(), this);
01420 
01421   if (this->monitor_) {
01422     this->monitor_->report();
01423   }
01424 
01425 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01426 
01427   // Move cached data from the durability cache to the unsent data
01428   // queue.
01429   if (durability_cache != 0) {
01430 
01431     if (!durability_cache->get_data(this->domain_id_,
01432                                     get_topic_name(),
01433                                     get_type_name(),
01434                                     this,
01435                                     this->mb_allocator_,
01436                                     this->db_allocator_,
01437                                     this->qos_.lifespan)) {
01438       ACE_ERROR((LM_ERROR,
01439                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01440                  ACE_TEXT("unable to retrieve durable data\n")));
01441     }
01442   }
01443 
01444 #endif
01445 
01446   return writer_enabled_result;
01447 }
01448 
01449 void
01450 DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
01451 {
01452   DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01453 
01454   SendStateDataSampleList list;
01455 
01456   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01457 
01458   controlTracker.message_sent();
01459 
01460   //need to release guard to call down to transport
01461   guard.release();
01462 
01463   this->send(list, transaction_id);
01464 }
01465 
01466 DDS::ReturnCode_t
01467 DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
01468                                     DataSample* data,
01469                                     const DDS::Time_t& source_timestamp)
01470 {
01471   DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01472 
01473   if (enabled_ == false) {
01474     ACE_ERROR_RETURN((LM_ERROR,
01475                       ACE_TEXT("(%P|%t) ERROR: ")
01476                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01477                       ACE_TEXT(" Entity is not enabled. \n")),
01478                      DDS::RETCODE_NOT_ENABLED);
01479   }
01480 
01481   DDS::ReturnCode_t ret =
01482     this->data_container_->register_instance(handle, data);
01483 
01484   if (ret != DDS::RETCODE_OK) {
01485     ACE_ERROR_RETURN((LM_ERROR,
01486                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01487                       ACE_TEXT("register instance with container failed.\n")),
01488                      ret);
01489   }
01490 
01491   if (this->monitor_) {
01492     this->monitor_->report();
01493   }
01494 
01495   DataSampleElement* element = 0;
01496   ret = this->data_container_->obtain_buffer_for_control(element);
01497 
01498   if (ret != DDS::RETCODE_OK) {
01499     ACE_ERROR_RETURN((LM_ERROR,
01500                       ACE_TEXT("(%P|%t) ERROR: ")
01501                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01502                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01503                       ret),
01504                      ret);
01505   }
01506 
01507   // Add header with the registration sample data.
01508   element->set_sample(create_control_message(INSTANCE_REGISTRATION,
01509                                              element->get_header(),
01510                                              data,
01511                                              source_timestamp));
01512 
01513   ret = this->data_container_->enqueue_control(element);
01514 
01515   if (ret != DDS::RETCODE_OK) {
01516     ACE_ERROR_RETURN((LM_ERROR,
01517                       ACE_TEXT("(%P|%t) ERROR: ")
01518                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01519                       ACE_TEXT("enqueue_control failed.\n")),
01520                      ret);
01521   }
01522 
01523   return ret;
01524 }
01525 
01526 DDS::ReturnCode_t
01527 DataWriterImpl::register_instance_from_durable_data(DDS::InstanceHandle_t& handle,
01528                                     DataSample* data,
01529                                     const DDS::Time_t & source_timestamp)
01530 {
01531   DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01532 
01533   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01534                    guard,
01535                    get_lock(),
01536                    ::DDS::RETCODE_ERROR);
01537 
01538   DDS::ReturnCode_t ret = register_instance_i(handle, data, source_timestamp);
01539   if (ret != DDS::RETCODE_OK) {
01540     ACE_ERROR_RETURN((LM_ERROR,
01541                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01542                       ACE_TEXT("register instance with container failed.\n")),
01543                       ret);
01544   }
01545 
01546   send_all_to_flush_control(guard);
01547 
01548   return ret;
01549 }
01550 
01551 DDS::ReturnCode_t
01552 DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
01553                                       const DDS::Time_t& source_timestamp)
01554 {
01555   DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01556 
01557   if (enabled_ == false) {
01558     ACE_ERROR_RETURN((LM_ERROR,
01559                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01560                       ACE_TEXT(" Entity is not enabled.\n")),
01561                      DDS::RETCODE_NOT_ENABLED);
01562   }
01563 
01564   // According to spec 1.2, autodispose_unregistered_instances true causes
01565   // dispose on the instance prior to calling unregister operation.
01566   if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01567     return this->dispose_and_unregister(handle, source_timestamp);
01568   }
01569 
01570   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01571   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01572   DataSample* unregistered_sample_data = 0;
01573   ret = this->data_container_->unregister(handle, unregistered_sample_data);
01574 
01575   if (ret != DDS::RETCODE_OK) {
01576     ACE_ERROR_RETURN((LM_ERROR,
01577                       ACE_TEXT("(%P|%t) ERROR: ")
01578                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01579                       ACE_TEXT(" unregister with container failed. \n")),
01580                      ret);
01581   }
01582 
01583   DataSampleElement* element = 0;
01584   ret = this->data_container_->obtain_buffer_for_control(element);
01585 
01586   if (ret != DDS::RETCODE_OK) {
01587     ACE_ERROR_RETURN((LM_ERROR,
01588                       ACE_TEXT("(%P|%t) ERROR: ")
01589                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01590                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01591                       ret),
01592                      ret);
01593   }
01594 
01595   element->set_sample(create_control_message(UNREGISTER_INSTANCE,
01596                                              element->get_header(),
01597                                              unregistered_sample_data,
01598                                              source_timestamp));
01599   ret = this->data_container_->enqueue_control(element);
01600 
01601   if (ret != DDS::RETCODE_OK) {
01602     ACE_ERROR_RETURN((LM_ERROR,
01603                       ACE_TEXT("(%P|%t) ERROR: ")
01604                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01605                       ACE_TEXT("enqueue_control failed.\n")),
01606                      ret);
01607   }
01608 
01609   send_all_to_flush_control(guard);
01610   return DDS::RETCODE_OK;
01611 }
01612 
01613 DDS::ReturnCode_t
01614 DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
01615                                        const DDS::Time_t& source_timestamp)
01616 {
01617   DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01618 
01619   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01620   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01621 
01622   DataSample* data_sample = 0;
01623   ret = this->data_container_->dispose(handle, data_sample);
01624 
01625   if (ret != DDS::RETCODE_OK) {
01626     ACE_ERROR_RETURN((LM_ERROR,
01627                       ACE_TEXT("(%P|%t) ERROR: ")
01628                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01629                       ACE_TEXT("dispose on container failed. \n")),
01630                      ret);
01631   }
01632 
01633   ret = this->data_container_->unregister(handle, data_sample, false);
01634 
01635   if (ret != DDS::RETCODE_OK) {
01636     ACE_ERROR_RETURN((LM_ERROR,
01637                       ACE_TEXT("(%P|%t) ERROR: ")
01638                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01639                       ACE_TEXT("unregister with container failed. \n")),
01640                      ret);
01641   }
01642 
01643   DataSampleElement* element = 0;
01644   ret = this->data_container_->obtain_buffer_for_control(element);
01645 
01646   if (ret != DDS::RETCODE_OK) {
01647     ACE_ERROR_RETURN((LM_ERROR,
01648                       ACE_TEXT("(%P|%t) ERROR: ")
01649                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01650                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01651                       ret),
01652                      ret);
01653   }
01654 
01655   element->set_sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01656                                              element->get_header(),
01657                                              data_sample,
01658                                              source_timestamp));
01659 
01660   ret = this->data_container_->enqueue_control(element);
01661 
01662   if (ret != DDS::RETCODE_OK) {
01663     ACE_ERROR_RETURN((LM_ERROR,
01664                       ACE_TEXT("(%P|%t) ERROR: ")
01665                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01666                       ACE_TEXT("enqueue_control failed.\n")),
01667                      ret);
01668   }
01669 
01670   send_all_to_flush_control(guard);
01671   return DDS::RETCODE_OK;
01672 }
01673 
01674 void
01675 DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
01676 {
01677   {
01678     ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01679 
01680     PublicationInstanceMapType::iterator it =
01681       this->data_container_->instances_.begin();
01682 
01683     while (it != this->data_container_->instances_.end()) {
01684       DDS::InstanceHandle_t handle = it->first;
01685       ++it; // avoid mangling the iterator
01686 
01687       this->unregister_instance_i(handle, source_timestamp);
01688     }
01689   }
01690 }
01691 
01692 DDS::ReturnCode_t
01693 DataWriterImpl::write(DataSample* data,
01694                       DDS::InstanceHandle_t handle,
01695                       const DDS::Time_t& source_timestamp,
01696                       GUIDSeq* filter_out)
01697 {
01698   DBG_ENTRY_LVL("DataWriterImpl","write",6);
01699 
01700   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01701                     guard,
01702                     get_lock (),
01703                     ::DDS::RETCODE_ERROR);
01704 
01705   // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
01706   GUIDSeq_var filter_out_var(filter_out);
01707 
01708   if (enabled_ == false) {
01709     ACE_ERROR_RETURN((LM_ERROR,
01710                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01711                       ACE_TEXT(" Entity is not enabled. \n")),
01712                      DDS::RETCODE_NOT_ENABLED);
01713   }
01714 
01715   DataSampleElement* element = 0;
01716   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01717 
01718   if (ret == DDS::RETCODE_TIMEOUT) {
01719     return ret; // silent for timeout
01720 
01721   } else if (ret != DDS::RETCODE_OK) {
01722     ACE_ERROR_RETURN((LM_ERROR,
01723                       ACE_TEXT("(%P|%t) ERROR: ")
01724                       ACE_TEXT("DataWriterImpl::write: ")
01725                       ACE_TEXT("obtain_buffer returned %d.\n"),
01726                       ret),
01727                      ret);
01728   }
01729 
01730   DataSample* temp;
01731   ret = create_sample_data_message(data,
01732                                    handle,
01733                                    element->get_header(),
01734                                    temp,
01735                                    source_timestamp,
01736                                    (filter_out != 0));
01737   element->set_sample(temp);
01738 
01739   if (ret != DDS::RETCODE_OK) {
01740     return ret;
01741   }
01742 
01743   element->set_filter_out(filter_out_var._retn()); // ownership passed to element
01744 
01745   ret = this->data_container_->enqueue(element, handle);
01746 
01747   if (ret != DDS::RETCODE_OK) {
01748     ACE_ERROR_RETURN((LM_ERROR,
01749                       ACE_TEXT("(%P|%t) ERROR: ")
01750                       ACE_TEXT("DataWriterImpl::write: ")
01751                       ACE_TEXT("enqueue failed.\n")),
01752                      ret);
01753   }
01754   this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01755 
01756   track_sequence_number(filter_out);
01757 
01758   if (this->coherent_) {
01759     ++this->coherent_samples_;
01760   }
01761   SendStateDataSampleList list;
01762 
01763   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01764 
01765   if (this->publisher_servant_->is_suspended()) {
01766     if (min_suspended_transaction_id_ == 0) {
01767       //provides transaction id for lower bound of suspended transactions
01768       //or transaction id for single suspended write transaction
01769       min_suspended_transaction_id_ = transaction_id;
01770     } else {
01771       //when multiple write transactions have suspended, provides the upper bound
01772       //for suspended transactions.
01773       max_suspended_transaction_id_ = transaction_id;
01774     }
01775     this->available_data_list_.enqueue_tail(list);
01776 
01777   } else {
01778     guard.release();
01779 
01780     this->send(list, transaction_id);
01781   }
01782 
01783   return DDS::RETCODE_OK;
01784 }
01785 
01786 void
01787 DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
01788 {
01789   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01790 
01791 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01792   // Track individual expected sequence numbers in ReaderInfo
01793   RepoIdSet excluded;
01794 
01795   if (filter_out && !reader_info_.empty()) {
01796     const GUID_t* buf = filter_out->get_buffer();
01797     excluded.insert(buf, buf + filter_out->length());
01798   }
01799 
01800   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01801        end = reader_info_.end(); iter != end; ++iter) {
01802     // If not excluding this reader, update expected sequence
01803     if (excluded.count(iter->first) == 0) {
01804       iter->second.expected_sequence_ = sequence_number_;
01805     }
01806   }
01807 
01808 #else
01809   ACE_UNUSED_ARG(filter_out);
01810   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01811        end = reader_info_.end(); iter != end; ++iter) {
01812     iter->second.expected_sequence_ = sequence_number_;
01813   }
01814 
01815 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01816 
01817 }
01818 
01819 void
01820 DataWriterImpl::send_suspended_data()
01821 {
01822   //this serves to get TransportClient's max_transaction_id_seen_
01823   //to the correct value for this list of transactions
01824   if (max_suspended_transaction_id_ != 0) {
01825     this->send(this->available_data_list_, max_suspended_transaction_id_);
01826     max_suspended_transaction_id_ = 0;
01827   }
01828 
01829   //this serves to actually have the send proceed in
01830   //sending the samples to the datalinks by passing it
01831   //the min_suspended_transaction_id_ which should be the
01832   //TransportClient's expected_transaction_id_
01833   this->send(this->available_data_list_, min_suspended_transaction_id_);
01834   min_suspended_transaction_id_ = 0;
01835   this->available_data_list_.reset();
01836 }
01837 
01838 DDS::ReturnCode_t
01839 DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
01840                         const DDS::Time_t & source_timestamp)
01841 {
01842   DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01843 
01844   if (enabled_ == false) {
01845     ACE_ERROR_RETURN((LM_ERROR,
01846                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01847                       ACE_TEXT(" Entity is not enabled. \n")),
01848                      DDS::RETCODE_NOT_ENABLED);
01849   }
01850 
01851   DDS::ReturnCode_t ret = ::DDS::RETCODE_ERROR;
01852 
01853   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01854 
01855   DataSample* registered_sample_data = 0;
01856   ret = this->data_container_->dispose(handle, registered_sample_data);
01857 
01858   if (ret != DDS::RETCODE_OK) {
01859     ACE_ERROR_RETURN((LM_ERROR,
01860                       ACE_TEXT("(%P|%t) ERROR: ")
01861                       ACE_TEXT("DataWriterImpl::dispose: ")
01862                       ACE_TEXT("dispose failed.\n")),
01863                      ret);
01864   }
01865 
01866   DataSampleElement* element = 0;
01867   ret = this->data_container_->obtain_buffer_for_control(element);
01868 
01869   if (ret != DDS::RETCODE_OK) {
01870     ACE_ERROR_RETURN((LM_ERROR,
01871                       ACE_TEXT("(%P|%t) ERROR: ")
01872                       ACE_TEXT("DataWriterImpl::dispose: ")
01873                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01874                       ret),
01875                      ret);
01876   }
01877 
01878   element->set_sample(create_control_message(DISPOSE_INSTANCE,
01879                                              element->get_header(),
01880                                              registered_sample_data,
01881                                              source_timestamp));
01882   ret = this->data_container_->enqueue_control(element);
01883 
01884   if (ret != DDS::RETCODE_OK) {
01885     ACE_ERROR_RETURN((LM_ERROR,
01886                       ACE_TEXT("(%P|%t) ERROR: ")
01887                       ACE_TEXT("DataWriterImpl::dispose: ")
01888                       ACE_TEXT("enqueue_control failed.\n")),
01889                      ret);
01890   }
01891 
01892   send_all_to_flush_control(guard);
01893 
01894   return DDS::RETCODE_OK;
01895 }
01896 
01897 DDS::ReturnCode_t
01898 DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
01899                             size_t&                 size)
01900 {
01901   return data_container_->num_samples(handle, size);
01902 }
01903 
01904 void
01905 DataWriterImpl::unregister_all()
01906 {
01907   if (cancel_timer_) {
01908     // The cancel_timer will call handle_close to remove_ref.
01909     (void) reactor_->cancel_timer(this, 0);
01910     cancel_timer_ = false;
01911   }
01912 
01913   data_container_->unregister_all();
01914 }
01915 
01916 RepoId
01917 DataWriterImpl::get_publication_id()
01918 {
01919   return publication_id_;
01920 }
01921 
01922 RepoId
01923 DataWriterImpl::get_dp_id()
01924 {
01925   return participant_servant_->get_id();
01926 }
01927 
01928 const char*
01929 DataWriterImpl::get_topic_name()
01930 {
01931   return topic_name_.in();
01932 }
01933 
01934 char const *
01935 DataWriterImpl::get_type_name() const
01936 {
01937   return type_name_.in();
01938 }
01939 
01940 ACE_Message_Block*
01941 DataWriterImpl::create_control_message(MessageId message_id,
01942                                        DataSampleHeader& header_data,
01943                                        ACE_Message_Block* data,
01944                                        const DDS::Time_t& source_timestamp)
01945 {
01946   header_data.message_id_ = message_id;
01947   header_data.byte_order_ =
01948     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01949   header_data.coherent_change_ = 0;
01950 
01951   if (data) {
01952     header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01953 
01954     if (header_data.message_length_ == 0) {
01955       data->release();
01956     }
01957   }
01958 
01959   header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01960   header_data.sequence_repair_ = false; // set below
01961   header_data.source_timestamp_sec_ = source_timestamp.sec;
01962   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01963   header_data.publication_id_ = publication_id_;
01964   header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01965 
01966   if (message_id == INSTANCE_REGISTRATION
01967       || message_id == DISPOSE_INSTANCE
01968       || message_id == UNREGISTER_INSTANCE
01969       || message_id == DISPOSE_UNREGISTER_INSTANCE) {
01970 
01971     header_data.sequence_repair_ = need_sequence_repair();
01972 
01973     // Use the sequence number here for the sake of RTPS (where these
01974     // control messages map onto the Data Submessage).
01975     if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01976       this->sequence_number_ = SequenceNumber();
01977 
01978     } else {
01979       ++this->sequence_number_;
01980     }
01981 
01982     header_data.sequence_ = this->sequence_number_;
01983     header_data.key_fields_only_ = true;
01984   }
01985 
01986   ACE_Message_Block* message = 0;
01987   ACE_NEW_MALLOC_RETURN(message,
01988                         static_cast<ACE_Message_Block*>(
01989                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01990                         ACE_Message_Block(
01991                           DataSampleHeader::max_marshaled_size(),
01992                           ACE_Message_Block::MB_DATA,
01993                           header_data.message_length_ ? data : 0, //cont
01994                           0, //data
01995                           0, //allocator_strategy
01996                           get_db_lock(), //locking_strategy
01997                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01998                           ACE_Time_Value::zero,
01999                           ACE_Time_Value::max_time,
02000                           db_allocator_,
02001                           mb_allocator_),
02002                         0);
02003 
02004   *message << header_data;
02005 
02006   // If we incremented sequence number for this control message
02007   if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02008     ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02009     // Update the expected sequence number for all readers
02010     RepoIdToReaderInfoMap::iterator reader;
02011 
02012     for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02013       reader->second.expected_sequence_ = sequence_number_;
02014     }
02015   }
02016 
02017   return message;
02018 }
02019 
02020 DDS::ReturnCode_t
02021 DataWriterImpl::create_sample_data_message(DataSample* data,
02022                                            DDS::InstanceHandle_t instance_handle,
02023                                            DataSampleHeader& header_data,
02024                                            ACE_Message_Block*& message,
02025                                            const DDS::Time_t& source_timestamp,
02026                                            bool content_filter)
02027 {
02028   PublicationInstance* const instance =
02029     data_container_->get_handle_instance(instance_handle);
02030 
02031   if (0 == instance) {
02032     ACE_ERROR_RETURN((LM_ERROR,
02033                       ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02034                       ACE_TEXT("failed to find instance for handle %d\n"),
02035                       instance_handle),
02036                      DDS::RETCODE_ERROR);
02037   }
02038 
02039   header_data.message_id_ = SAMPLE_DATA;
02040   header_data.byte_order_ =
02041     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02042   header_data.coherent_change_ = this->coherent_;
02043 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02044   header_data.group_coherent_ =
02045     this->publisher_servant_->qos_.presentation.access_scope
02046     == DDS::GROUP_PRESENTATION_QOS;
02047 #endif
02048   header_data.content_filter_ = content_filter;
02049   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02050   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02051   header_data.sequence_repair_ = need_sequence_repair();
02052 
02053   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02054     this->sequence_number_ = SequenceNumber();
02055 
02056   } else {
02057     ++this->sequence_number_;
02058   }
02059 
02060   header_data.sequence_ = this->sequence_number_;
02061   header_data.source_timestamp_sec_ = source_timestamp.sec;
02062   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02063 
02064   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02065       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02066     header_data.lifespan_duration_ = true;
02067     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02068     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02069   }
02070 
02071   header_data.publication_id_ = publication_id_;
02072   header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
02073   size_t max_marshaled_size = header_data.max_marshaled_size();
02074 
02075   ACE_NEW_MALLOC_RETURN(message,
02076                         static_cast<ACE_Message_Block*>(
02077                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02078                         ACE_Message_Block(max_marshaled_size,
02079                                           ACE_Message_Block::MB_DATA,
02080                                           data, //cont
02081                                           0, //data
02082                                           header_allocator_, //alloc_strategy
02083                                           get_db_lock(), //locking_strategy
02084                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02085                                           ACE_Time_Value::zero,
02086                                           ACE_Time_Value::max_time,
02087                                           db_allocator_,
02088                                           mb_allocator_),
02089                         DDS::RETCODE_ERROR);
02090 
02091   *message << header_data;
02092   return DDS::RETCODE_OK;
02093 }
02094 
02095 void
02096 DataWriterImpl::data_delivered(const DataSampleElement* sample)
02097 {
02098   DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02099 
02100   if (!(sample->get_pub_id() == this->publication_id_)) {
02101     GuidConverter sample_converter(sample->get_pub_id());
02102     GuidConverter writer_converter(publication_id_);
02103     ACE_ERROR((LM_ERROR,
02104                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02105                ACE_TEXT(" The publication id %C from delivered element ")
02106                ACE_TEXT("does not match the datawriter's id %C\n"),
02107                OPENDDS_STRING(sample_converter).c_str(),
02108                OPENDDS_STRING(writer_converter).c_str()));
02109     return;
02110   }
02111   this->data_container_->data_delivered(sample);
02112 
02113   ++data_delivered_count_;
02114 }
02115 
02116 void
02117 DataWriterImpl::control_delivered(ACE_Message_Block* sample)
02118 {
02119   DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02120   sample->release();
02121   controlTracker.message_delivered();
02122 }
02123 
02124 EntityImpl*
02125 DataWriterImpl::parent() const
02126 {
02127   return this->publisher_servant_;
02128 }
02129 
02130 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
02131 bool
02132 DataWriterImpl::filter_out(const DataSampleElement& elt,
02133                            const OPENDDS_STRING& filterClassName,
02134                            const FilterEvaluator& evaluator,
02135                            const DDS::StringSeq& expression_params) const
02136 {
02137   TypeSupportImpl* const typesupport =
02138     dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02139 
02140   if (!typesupport) {
02141     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02142     return false;
02143   }
02144 
02145   if (filterClassName == "DDSSQL" ||
02146       filterClassName == "OPENDDSSQL") {
02147     return !evaluator.eval(elt.get_sample()->cont(),
02148                            elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02149                            elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02150                            expression_params);
02151   }
02152   else {
02153     return false;
02154   }
02155 }
02156 #endif
02157 
02158 bool
02159 DataWriterImpl::check_transport_qos(const TransportInst&)
02160 {
02161   // DataWriter does not impose any constraints on which transports
02162   // may be used based on QoS.
02163   return true;
02164 }
02165 
02166 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02167 
02168 bool
02169 DataWriterImpl::coherent_changes_pending()
02170 {
02171   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02172                    guard,
02173                    get_lock(),
02174                    false);
02175 
02176   return this->coherent_;
02177 }
02178 
02179 void
02180 DataWriterImpl::begin_coherent_changes()
02181 {
02182   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02183             guard,
02184             get_lock());
02185 
02186   this->coherent_ = true;
02187 }
02188 
02189 void
02190 DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
02191 {
02192   // PublisherImpl::pi_lock_ should be held.
02193   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02194             guard,
02195             get_lock());
02196 
02197   CoherentChangeControl end_msg;
02198   end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02199   end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02200   end_msg.group_coherent_
02201     = this->publisher_servant_->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02202 
02203   if (end_msg.group_coherent_) {
02204     end_msg.publisher_id_ = this->publisher_servant_->publisher_id_;
02205     end_msg.group_coherent_samples_ = group_samples;
02206   }
02207 
02208   ACE_Message_Block* data = 0;
02209   size_t max_marshaled_size = end_msg.max_marshaled_size();
02210 
02211   ACE_NEW(data, ACE_Message_Block(max_marshaled_size,
02212                                   ACE_Message_Block::MB_DATA,
02213                                   0, //cont
02214                                   0, //data
02215                                   0, //alloc_strategy
02216                                   get_db_lock()));
02217 
02218   Serializer serializer(
02219     data,
02220     this->swap_bytes());
02221 
02222   serializer << end_msg;
02223 
02224   DDS::Time_t source_timestamp =
02225     time_value_to_time(ACE_OS::gettimeofday());
02226 
02227   DataSampleHeader header;
02228   ACE_Message_Block* control =
02229     create_control_message(END_COHERENT_CHANGES, header, data, source_timestamp);
02230 
02231 
02232   this->coherent_ = false;
02233   this->coherent_samples_ = 0;
02234 
02235   guard.release();
02236   if (this->send_control(header, control) == SEND_CONTROL_ERROR) {
02237     ACE_ERROR((LM_ERROR,
02238                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02239                ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02240   }
02241 }
02242 
02243 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
02244 
02245 void
02246 DataWriterImpl::data_dropped(const DataSampleElement* element,
02247                              bool dropped_by_transport)
02248 {
02249   DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02250 
02251   this->data_container_->data_dropped(element, dropped_by_transport);
02252 
02253   ++data_dropped_count_;
02254 }
02255 
02256 void
02257 DataWriterImpl::control_dropped(ACE_Message_Block* sample,
02258                                 bool /* dropped_by_transport */)
02259 {
02260   DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02261   sample->release();
02262   controlTracker.message_dropped();
02263 }
02264 
02265 DDS::DataWriterListener_ptr
02266 DataWriterImpl::listener_for(DDS::StatusKind kind)
02267 {
02268   // per 2.1.4.3.1 Listener Access to Plain Communication Status
02269   // use this entities factory if listener is mask not enabled
02270   // for this kind.
02271   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02272     return publisher_servant_->listener_for(kind);
02273 
02274   } else {
02275     return DDS::DataWriterListener::_duplicate(listener_.in());
02276   }
02277 }
02278 
02279 int
02280 DataWriterImpl::handle_timeout(const ACE_Time_Value &tv,
02281                                const void * /* arg */)
02282 {
02283   const ACE_Time_Value delta = tv - last_liveliness_check_time_;
02284   if (delta < liveliness_check_interval_) {
02285     // Too early.  Reschedule.
02286     if (reactor_->cancel_timer(this) == -1) {
02287       ACE_ERROR((LM_ERROR,
02288                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02289                  ACE_TEXT("cancel_timer")));
02290     }
02291     if (reactor_->schedule_timer(this, 0, liveliness_check_interval_ - delta, liveliness_check_interval_) == -1) {
02292       ACE_ERROR((LM_ERROR,
02293                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02294                  ACE_TEXT("schedule_timer")));
02295     }
02296     return 0;
02297   }
02298 
02299   bool liveliness_lost = false;
02300 
02301   ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02302 
02303   // Do we need to send a liveliness message?
02304   if (elapsed >= liveliness_check_interval_) {
02305     switch (this->qos_.liveliness.kind) {
02306     case DDS::AUTOMATIC_LIVELINESS_QOS:
02307       if (this->send_liveliness(tv) == false) {
02308         liveliness_lost = true;
02309       }
02310       break;
02311 
02312     case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02313       if (liveliness_asserted_) {
02314         if (this->send_liveliness(tv) == false) {
02315           liveliness_lost = true;
02316         }
02317       }
02318       break;
02319 
02320     case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02321       // Do nothing.
02322       break;
02323     }
02324   }
02325 
02326   liveliness_asserted_ = false;
02327   last_liveliness_check_time_ = tv;
02328   elapsed = tv - last_liveliness_activity_time_;
02329 
02330   // Have we lost liveliness?
02331   if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02332     liveliness_lost = true;
02333   }
02334 
02335   if (!this->liveliness_lost_ && liveliness_lost) {
02336     ++ this->liveliness_lost_status_.total_count;
02337     ++ this->liveliness_lost_status_.total_count_change;
02338 
02339     DDS::DataWriterListener_var listener =
02340       listener_for(DDS::LIVELINESS_LOST_STATUS);
02341 
02342     if (!CORBA::is_nil(listener.in())) {
02343       listener->on_liveliness_lost(this->dw_local_objref_.in(),
02344                                    this->liveliness_lost_status_);
02345     }
02346   }
02347 
02348   this->liveliness_lost_ = liveliness_lost;
02349   return 0;
02350 }
02351 
02352 int
02353 DataWriterImpl::handle_close(ACE_HANDLE,
02354                              ACE_Reactor_Mask)
02355 {
02356   this->_remove_ref();
02357   return 0;
02358 }
02359 
02360 bool
02361 DataWriterImpl::send_liveliness(const ACE_Time_Value& now)
02362 {
02363   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02364       !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02365     DDS::Time_t t = time_value_to_time(now);
02366     DataSampleHeader header;
02367     ACE_Message_Block* liveliness_msg =
02368       this->create_control_message(DATAWRITER_LIVELINESS, header, 0, t);
02369 
02370     if (this->send_control(header, liveliness_msg) == SEND_CONTROL_ERROR) {
02371       ACE_ERROR_RETURN((LM_ERROR,
02372                         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02373                         ACE_TEXT(" send_control failed. \n")),
02374                        false);
02375 
02376     } else {
02377       last_liveliness_activity_time_ = now;
02378       return true;
02379     }
02380   } else {
02381     last_liveliness_activity_time_ = now;
02382     return true;
02383   }
02384 }
02385 
02386 void
02387 DataWriterImpl::prepare_to_delete()
02388 {
02389   this->set_deleted(true);
02390   this->stop_associating();
02391 }
02392 
02393 PublicationInstance*
02394 DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02395 {
02396   PublicationInstance* instance = 0;
02397 
02398   if (0 != data_container_) {
02399     instance = data_container_->get_handle_instance(handle);
02400   }
02401 
02402   return instance;
02403 }
02404 
02405 void
02406 DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
02407 {
02408   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02409 
02410   if (!is_bit_) {
02411     // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02412     // is given to this DataWriter then narrow() fails.
02413     DataWriterListener_var the_listener =
02414       DataWriterListener::_narrow(this->listener_.in());
02415 
02416     if (!CORBA::is_nil(the_listener.in())) {
02417       PublicationDisconnectedStatus status;
02418       // Since this callback may come after remove_association which
02419       // removes the reader from id_to_handle map, we can ignore this
02420       // error.
02421       this->lookup_instance_handles(subids,
02422                                     status.subscription_handles);
02423       the_listener->on_publication_disconnected(this->dw_local_objref_.in(),
02424                                                 status);
02425     }
02426   }
02427 }
02428 
02429 void
02430 DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
02431 {
02432   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02433 
02434   if (!is_bit_) {
02435     // Narrow to DDS::DCPS::DataWriterListener. If a
02436     // DDS::DataWriterListener is given to this DataWriter then
02437     // narrow() fails.
02438     DataWriterListener_var the_listener =
02439       DataWriterListener::_narrow(this->listener_.in());
02440 
02441     if (!CORBA::is_nil(the_listener.in())) {
02442       PublicationDisconnectedStatus status;
02443 
02444       // If it's reconnected then the reader should be in id_to_handle
02445       // map otherwise log with an error.
02446       if (this->lookup_instance_handles(subids,
02447                                         status.subscription_handles) == false) {
02448         ACE_ERROR((LM_ERROR,
02449                    "(%P|%t) ERROR: DataWriterImpl::"
02450                    "notify_publication_reconnected: "
02451                    "lookup_instance_handles failed\n"));
02452       }
02453 
02454       the_listener->on_publication_reconnected(this->dw_local_objref_.in(),
02455                                                status);
02456     }
02457   }
02458 }
02459 
02460 void
02461 DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
02462 {
02463   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02464 
02465   if (!is_bit_) {
02466     // Narrow to DDS::DCPS::DataWriterListener. If a
02467     // DDS::DataWriterListener is given to this DataWriter then
02468     // narrow() fails.
02469     DataWriterListener_var the_listener =
02470       DataWriterListener::_narrow(this->listener_.in());
02471 
02472     if (!CORBA::is_nil(the_listener.in())) {
02473       PublicationLostStatus status;
02474 
02475       // Since this callback may come after remove_association which removes
02476       // the reader from id_to_handle map, we can ignore this error.
02477       this->lookup_instance_handles(subids,
02478                                     status.subscription_handles);
02479       the_listener->on_publication_lost(this->dw_local_objref_.in(),
02480                                         status);
02481     }
02482   }
02483 }
02484 
02485 void
02486 DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
02487 {
02488   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02489 
02490   if (!is_bit_) {
02491     // Narrow to DDS::DCPS::DataWriterListener. If a
02492     // DDS::DataWriterListener is given to this DataWriter then
02493     // narrow() fails.
02494     DataWriterListener_var the_listener =
02495       DataWriterListener::_narrow(this->listener_.in());
02496 
02497     if (!CORBA::is_nil(the_listener.in())) {
02498       PublicationLostStatus status;
02499 
02500       CORBA::ULong len = handles.length();
02501       status.subscription_handles.length(len);
02502 
02503       for (CORBA::ULong i = 0; i < len; ++ i) {
02504         status.subscription_handles[i] = handles[i];
02505       }
02506 
02507       the_listener->on_publication_lost(this->dw_local_objref_.in(),
02508                                         status);
02509     }
02510   }
02511 }
02512 
02513 void
02514 DataWriterImpl::notify_connection_deleted(const RepoId& peerId)
02515 {
02516   DBG_ENTRY_LVL("DataWriterImpl","notify_connection_deleted",6);
02517   on_notification_of_connection_deletion(peerId);
02518   // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02519   // is given to this DataWriter then narrow() fails.
02520   DataWriterListener_var the_listener =
02521     DataWriterListener::_narrow(this->listener_.in());
02522 
02523   if (!CORBA::is_nil(the_listener.in()))
02524     the_listener->on_connection_deleted(this->dw_local_objref_.in());
02525 }
02526 
02527 bool
02528 DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
02529                                         DDS::InstanceHandleSeq & hdls)
02530 {
02531   if (DCPS_debug_level > 9) {
02532     CORBA::ULong const size = ids.length();
02533     OPENDDS_STRING separator;
02534     OPENDDS_STRING buffer;
02535 
02536     for (unsigned long i = 0; i < size; ++i) {
02537       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02538       separator = ", ";
02539     }
02540 
02541     ACE_DEBUG((LM_DEBUG,
02542                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02543                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02544                buffer.c_str()));
02545   }
02546 
02547   CORBA::ULong const num_rds = ids.length();
02548   hdls.length(num_rds);
02549 
02550   for (CORBA::ULong i = 0; i < num_rds; ++i) {
02551     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02552   }
02553 
02554   return true;
02555 }
02556 
02557 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
02558 bool
02559 DataWriterImpl::persist_data()
02560 {
02561   return this->data_container_->persist_data();
02562 }
02563 #endif
02564 
02565 void
02566 DataWriterImpl::reschedule_deadline()
02567 {
02568   if (this->watchdog_ != 0) {
02569     this->data_container_->reschedule_deadline();
02570   }
02571 }
02572 
02573 bool
02574 DataWriterImpl::pending_control()
02575 {
02576   return controlTracker.pending_messages();
02577 }
02578 
02579 void
02580 DataWriterImpl::wait_control_pending()
02581 {
02582   OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02583   controlTracker.wait_messages_pending(caller_string);
02584 }
02585 
02586 void
02587 DataWriterImpl::wait_pending()
02588 {
02589   this->data_container_->wait_pending();
02590 }
02591 
02592 void
02593 DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02594 {
02595   this->data_container_->get_instance_handles(instance_handles);
02596 }
02597 
02598 void
02599 DataWriterImpl::get_readers(RepoIdSet& readers)
02600 {
02601   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02602   readers = this->readers_;
02603 }
02604 
02605 void
02606 DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
02607 {
02608   this->publisher_servant_->get_qos(qos_data.pub_qos);
02609   qos_data.dw_qos = this->qos_;
02610   qos_data.topic_name = this->topic_name_.in();
02611 }
02612 
02613 bool
02614 DataWriterImpl::need_sequence_repair()
02615 {
02616   ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02617   return need_sequence_repair_i();
02618 }
02619 
02620 bool
02621 DataWriterImpl::need_sequence_repair_i() const
02622 {
02623   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02624        end = reader_info_.end(); it != end; ++it) {
02625     if (it->second.expected_sequence_ != sequence_number_) {
02626       return true;
02627     }
02628   }
02629 
02630   return false;
02631 }
02632 
02633 SendControlStatus
02634 DataWriterImpl::send_control(const DataSampleHeader& header,
02635                              ACE_Message_Block* msg)
02636 {
02637   controlTracker.message_sent();
02638 
02639   SendControlStatus status = TransportClient::send_control(header, msg);
02640 
02641   if (status != SEND_CONTROL_OK) {
02642     controlTracker.message_dropped();
02643   }
02644 
02645   return status;
02646 }
02647 
02648 } // namespace DCPS
02649 } // namespace OpenDDS

Generated on Mon Sep 14 18:13:49 2015 for OpenDDS by  doxygen 1.4.7