DataWriterImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataWriterImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "Serializer.h"
00018 #include "Transient_Kludge.h"
00019 #include "DataDurabilityCache.h"
00020 #include "OfferedDeadlineWatchdog.h"
00021 #include "MonitorFactory.h"
00022 #include "TypeSupportImpl.h"
00023 #include "SendStateDataSampleList.h"
00024 #include "DataSampleElement.h"
00025 
00026 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00027 #include "CoherentChangeControl.h"
00028 #endif
00029 
00030 #include "AssociationData.h"
00031 #include "dds/DdsDcpsCoreC.h"
00032 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00033 
00034 #if !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "BuiltInTopicUtils.h"
00036 #include "dds/DdsDcpsCoreTypeSupportC.h"
00037 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00038 
00039 #include "Util.h"
00040 #include "dds/DCPS/transport/framework/EntryExit.h"
00041 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00042 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00043 
00044 #include "ace/Reactor.h"
00045 #include "ace/Auto_Ptr.h"
00046 
00047 #include <stdexcept>
00048 
00049 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00050 
00051 namespace OpenDDS {
00052 namespace DCPS {
00053 
00054 //TBD - add check for enabled in most methods.
00055 //      currently this is not needed because auto_enable_created_entities
00056 //      cannot be false.
00057 
00058 DataWriterImpl::DataWriterImpl()
00059   : data_dropped_count_(0),
00060     data_delivered_count_(0),
00061     controlTracker("DataWriterImpl"),
00062     n_chunks_(TheServiceParticipant->n_chunks()),
00063     association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00064     qos_(TheServiceParticipant->initial_DataWriterQos()),
00065     db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks())),
00066     topic_id_(GUID_UNKNOWN),
00067     topic_servant_(0),
00068     listener_mask_(DEFAULT_STATUS_MASK),
00069     domain_id_(0),
00070     publication_id_(GUID_UNKNOWN),
00071     sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00072     coherent_(false),
00073     coherent_samples_(0),
00074     liveliness_lost_(false),
00075     reactor_(0),
00076     liveliness_check_interval_(ACE_Time_Value::max_time),
00077     last_liveliness_activity_time_(ACE_Time_Value::zero),
00078     last_deadline_missed_total_count_(0),
00079     watchdog_(),
00080     is_bit_(false),
00081     min_suspended_transaction_id_(0),
00082     max_suspended_transaction_id_(0),
00083     monitor_(0),
00084     periodic_monitor_(0),
00085     liveliness_asserted_(false),
00086     liveness_timer_(make_rch<LivenessTimer>(ref(*this)))
00087 {
00088   liveliness_lost_status_.total_count = 0;
00089   liveliness_lost_status_.total_count_change = 0;
00090 
00091   offered_deadline_missed_status_.total_count = 0;
00092   offered_deadline_missed_status_.total_count_change = 0;
00093   offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00094 
00095   offered_incompatible_qos_status_.total_count = 0;
00096   offered_incompatible_qos_status_.total_count_change = 0;
00097   offered_incompatible_qos_status_.last_policy_id = 0;
00098   offered_incompatible_qos_status_.policies.length(0);
00099 
00100   publication_match_status_.total_count = 0;
00101   publication_match_status_.total_count_change = 0;
00102   publication_match_status_.current_count = 0;
00103   publication_match_status_.current_count_change = 0;
00104   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00105 
00106   monitor_ =
00107     TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00108   periodic_monitor_ =
00109     TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00110 }
00111 
00112 // This method is called when there are no longer any reference to the
00113 // the servant.
00114 DataWriterImpl::~DataWriterImpl()
00115 {
00116   DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00117 }
00118 
00119 // this method is called when delete_datawriter is called.
00120 void
00121 DataWriterImpl::cleanup()
00122 {
00123   // As first step set our listener to nill which will prevent us from calling
00124   // back onto the listener at the moment the related DDS entity has been
00125   // deleted
00126   set_listener(0, NO_STATUS_MASK);
00127   topic_servant_ = 0;
00128 }
00129 
00130 void
00131 DataWriterImpl::init(
00132   TopicImpl *                          topic_servant,
00133   const DDS::DataWriterQos &           qos,
00134   DDS::DataWriterListener_ptr          a_listener,
00135   const DDS::StatusMask &              mask,
00136   OpenDDS::DCPS::WeakRcHandle<OpenDDS::DCPS::DomainParticipantImpl> participant_servant,
00137   OpenDDS::DCPS::PublisherImpl *         publisher_servant)
00138 {
00139   DBG_ENTRY_LVL("DataWriterImpl","init",6);
00140   topic_servant_ = topic_servant;
00141   topic_name_    = topic_servant_->get_name();
00142   topic_id_      = topic_servant_->get_id();
00143   type_name_     = topic_servant_->get_type_name();
00144 
00145 #if !defined (DDS_HAS_MINIMUM_BIT)
00146   is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
00147 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00148 
00149   qos_ = qos;
00150 
00151   //Note: OK to _duplicate(nil).
00152   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00153   listener_mask_ = mask;
00154 
00155   // Only store the participant pointer, since it is our "grand"
00156   // parent, we will exist as long as it does.
00157   participant_servant_ = participant_servant;
00158 
00159   RcHandle<DomainParticipantImpl> participant = participant_servant.lock();
00160   domain_id_ = participant->get_domain_id();
00161 
00162   // Only store the publisher pointer, since it is our parent, we will
00163   // exist as long as it does.
00164   publisher_servant_ = *publisher_servant;
00165 
00166   this->reactor_ = TheServiceParticipant->timer();
00167 }
00168 
00169 DDS::InstanceHandle_t
00170 DataWriterImpl::get_instance_handle()
00171 {
00172   using namespace OpenDDS::DCPS;
00173   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00174   if (participant)
00175     return participant->id_to_handle(publication_id_);
00176   return 0;
00177 }
00178 
00179 DDS::InstanceHandle_t
00180 DataWriterImpl::get_next_handle()
00181 {
00182   using namespace OpenDDS::DCPS;
00183   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00184   if (participant)
00185     return participant->id_to_handle(GUID_UNKNOWN);
00186   return 0;
00187 }
00188 
00189 void
00190 DataWriterImpl::add_association(const RepoId& yourId,
00191                                 const ReaderAssociation& reader,
00192                                 bool active)
00193 {
00194   DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00195 
00196   if (DCPS_debug_level) {
00197     GuidConverter writer_converter(yourId);
00198     GuidConverter reader_converter(reader.readerId);
00199     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00200                ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00201                OPENDDS_STRING(writer_converter).c_str(),
00202                OPENDDS_STRING(reader_converter).c_str()));
00203   }
00204 
00205   if (entity_deleted_.value()) {
00206     if (DCPS_debug_level)
00207       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00208                  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00209 
00210     return;
00211   }
00212 
00213   if (GUID_UNKNOWN == publication_id_) {
00214     publication_id_ = yourId;
00215   }
00216 
00217   {
00218     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00219     reader_info_.insert(std::make_pair(reader.readerId,
00220                                        ReaderInfo(reader.filterClassName,
00221                                                   TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00222                                                   reader.exprParams, participant_servant_,
00223                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00224   }
00225 
00226   if (DCPS_debug_level > 4) {
00227     GuidConverter converter(get_publication_id());
00228     ACE_DEBUG((LM_DEBUG,
00229                ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00230                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00231                OPENDDS_STRING(converter).c_str(),
00232                qos_.transport_priority.value));
00233   }
00234 
00235   AssociationData data;
00236   data.remote_id_ = reader.readerId;
00237   data.remote_data_ = reader.readerTransInfo;
00238   data.remote_reliable_ =
00239     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00240   data.remote_durable_ =
00241     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00242 
00243   if (!associate(data, active)) {
00244     //FUTURE: inform inforepo and try again as passive peer
00245     if (DCPS_debug_level) {
00246       ACE_ERROR((LM_ERROR,
00247                  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00248                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00249     }
00250   }
00251 }
00252 
00253 void
00254 DataWriterImpl::transport_assoc_done(int flags, const RepoId& remote_id)
00255 {
00256   DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00257 
00258   if (!(flags & ASSOC_OK)) {
00259     if (DCPS_debug_level) {
00260       const GuidConverter conv(remote_id);
00261       ACE_ERROR((LM_ERROR,
00262                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00263                  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00264                  OPENDDS_STRING(conv).c_str()));
00265     }
00266 
00267     return;
00268   }
00269   if (DCPS_debug_level) {
00270     const GuidConverter writer_conv(publication_id_);
00271     const GuidConverter conv(remote_id);
00272     ACE_DEBUG((LM_INFO,
00273                ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00274                ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00275                OPENDDS_STRING(writer_conv).c_str(),
00276                OPENDDS_STRING(conv).c_str()));
00277   }
00278   if (flags & ASSOC_ACTIVE) {
00279 
00280     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00281 
00282     // Have we already received an association_complete() callback?
00283     if (assoc_complete_readers_.count(remote_id)) {
00284       if (DCPS_debug_level) {
00285         const GuidConverter writer_conv(publication_id_);
00286         const GuidConverter converter(remote_id);
00287         ACE_DEBUG((LM_DEBUG,
00288                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00289                    ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00290                    OPENDDS_STRING(writer_conv).c_str(),
00291                    OPENDDS_STRING(converter).c_str()));
00292       }
00293       assoc_complete_readers_.erase(remote_id);
00294       association_complete_i(remote_id);
00295 
00296       // Add to pending_readers_ -> pending means we are waiting
00297       // for the association_complete() callback.
00298 
00299     } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00300       const GuidConverter converter(remote_id);
00301       ACE_ERROR((LM_ERROR,
00302                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00303                  ACE_TEXT("failed to mark %C as pending.\n"),
00304                  OPENDDS_STRING(converter).c_str()));
00305 
00306     } else {
00307       if (DCPS_debug_level) {
00308         const GuidConverter converter(remote_id);
00309         ACE_DEBUG((LM_DEBUG,
00310                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00311                    ACE_TEXT("marked %C as pending.\n"),
00312                    OPENDDS_STRING(converter).c_str()));
00313       }
00314     }
00315 
00316   } else {
00317     // In the current implementation, DataWriter is always active, so this
00318     // code will not be applicable.
00319     if (DCPS_debug_level) {
00320       const GuidConverter conv(publication_id_);
00321       ACE_ERROR((LM_ERROR,
00322                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00323                  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00324                  OPENDDS_STRING(conv).c_str()));
00325     }
00326     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00327     disco->association_complete(domain_id_, dp_id_,
00328                                 publication_id_, remote_id);
00329   }
00330 }
00331 
00332 DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
00333                                        const char* filter,
00334                                        const DDS::StringSeq& params,
00335                                        WeakRcHandle<DomainParticipantImpl> participant,
00336                                        bool durable)
00337 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00338   : participant_(participant)
00339   , filter_class_name_(filterClassName)
00340   , filter_(filter)
00341   , expression_params_(params)
00342   , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00343   , durable_(durable)
00344 {
00345   RcHandle<DomainParticipantImpl> part = participant_.lock();
00346   if (part && *filter) {
00347     eval_ = part->get_filter_eval(filter);
00348   }
00349 }
00350 #else
00351   : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00352   , durable_(durable)
00353 {
00354   ACE_UNUSED_ARG(filterClassName);
00355   ACE_UNUSED_ARG(filter);
00356   ACE_UNUSED_ARG(params);
00357   ACE_UNUSED_ARG(participant);
00358 }
00359 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00360 
00361 DataWriterImpl::ReaderInfo::~ReaderInfo()
00362 {
00363 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00364   eval_ = RcHandle<FilterEvaluator>();
00365   RcHandle<DomainParticipantImpl> participant = participant_.lock();
00366   if (participant && !filter_.empty()) {
00367     participant->deref_filter_eval(filter_.c_str());
00368   }
00369 
00370 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00371 }
00372 
00373 void
00374 DataWriterImpl::association_complete(const RepoId& remote_id)
00375 {
00376   DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00377 
00378   if (DCPS_debug_level >= 1) {
00379     GuidConverter writer_converter(this->publication_id_);
00380     GuidConverter reader_converter(remote_id);
00381     ACE_DEBUG((LM_DEBUG,
00382                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00383                ACE_TEXT("bit %d local %C remote %C\n"),
00384                is_bit_,
00385                OPENDDS_STRING(writer_converter).c_str(),
00386                OPENDDS_STRING(reader_converter).c_str()));
00387   }
00388 
00389   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00390 
00391   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00392     if (DCPS_debug_level) {
00393       GuidConverter writer_converter(this->publication_id_);
00394       GuidConverter reader_converter(remote_id);
00395       ACE_DEBUG((LM_DEBUG,
00396                  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00397                  ACE_TEXT("bit %d local %C did not find pending reader: %C")
00398                  ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00399                  is_bit_,
00400                  OPENDDS_STRING(writer_converter).c_str(),
00401                  OPENDDS_STRING(reader_converter).c_str()));
00402     }
00403     // Not found in pending_readers_, defer calling association_complete_i()
00404     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00405     assoc_complete_readers_.insert(remote_id);
00406 
00407   } else {
00408     association_complete_i(remote_id);
00409   }
00410 }
00411 
00412 void
00413 DataWriterImpl::association_complete_i(const RepoId& remote_id)
00414 {
00415   DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00416 
00417   if (DCPS_debug_level >= 1) {
00418     GuidConverter writer_converter(this->publication_id_);
00419     GuidConverter reader_converter(remote_id);
00420     ACE_DEBUG((LM_DEBUG,
00421                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00422                ACE_TEXT("bit %d local %C remote %C\n"),
00423                is_bit_,
00424                OPENDDS_STRING(writer_converter).c_str(),
00425                OPENDDS_STRING(reader_converter).c_str()));
00426   }
00427 
00428   bool reader_durable = false;
00429 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00430   OPENDDS_STRING filterClassName;
00431   RcHandle<FilterEvaluator> eval;
00432   DDS::StringSeq expression_params;
00433 #endif
00434   {
00435     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00436 
00437     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00438       GuidConverter converter(remote_id);
00439       ACE_ERROR((LM_ERROR,
00440                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00441                  ACE_TEXT("insert %C from pending failed.\n"),
00442                  OPENDDS_STRING(converter).c_str()));
00443     }
00444   }
00445   {
00446     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00447     RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00448 
00449     if (it != reader_info_.end()) {
00450       reader_durable = it->second.durable_;
00451 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00452       filterClassName = it->second.filter_class_name_;
00453       eval = it->second.eval_;
00454       expression_params = it->second.expression_params_;
00455 #endif
00456     }
00457   }
00458 
00459   if (this->monitor_) {
00460     this->monitor_->report();
00461   }
00462 
00463   if (!is_bit_) {
00464 
00465     RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00466 
00467     if (!participant)
00468       return;
00469 
00470     DDS::InstanceHandle_t handle =
00471       participant->id_to_handle(remote_id);
00472 
00473     {
00474       // protect publication_match_status_ and status changed flags.
00475       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00476 
00477       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00478         GuidConverter converter(remote_id);
00479         ACE_DEBUG((LM_WARNING,
00480                    ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
00481                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00482                    OPENDDS_STRING(converter).c_str(),
00483                    handle));
00484         return;
00485 
00486       } else if (DCPS_debug_level > 4) {
00487         GuidConverter converter(remote_id);
00488         ACE_DEBUG((LM_DEBUG,
00489                    ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00490                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00491                    OPENDDS_STRING(converter).c_str(),
00492                    handle));
00493       }
00494 
00495       ++publication_match_status_.total_count;
00496       ++publication_match_status_.total_count_change;
00497       ++publication_match_status_.current_count;
00498       ++publication_match_status_.current_count_change;
00499       publication_match_status_.last_subscription_handle = handle;
00500       set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00501     }
00502 
00503     DDS::DataWriterListener_var listener =
00504       listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00505 
00506     if (!CORBA::is_nil(listener.in())) {
00507 
00508       listener->on_publication_matched(this, publication_match_status_);
00509 
00510       // TBD - why does the spec say to change this but not
00511       // change the ChangeFlagStatus after a listener call?
00512       publication_match_status_.total_count_change = 0;
00513       publication_match_status_.current_count_change = 0;
00514     }
00515 
00516     notify_status_condition();
00517   }
00518 
00519   // Support DURABILITY QoS
00520   if (reader_durable) {
00521     // Tell the WriteDataContainer to resend all sending/sent
00522     // samples.
00523     this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00524 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00525                                          , filterClassName, eval.in(), expression_params
00526 #endif
00527                                         );
00528 
00529     // Acquire the data writer container lock to avoid deadlock. The
00530     // thread calling association_complete() has to acquire lock in the
00531     // same order as the write()/register() operation.
00532 
00533     // Since the thread calling association_complete() is the ORB
00534     // thread, it may have some performance penalty. If the
00535     // performance is an issue, we may need a new thread to handle the
00536     // data_available() calls.
00537     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00538               guard,
00539               this->get_lock());
00540 
00541     SendStateDataSampleList list = this->get_resend_data();
00542     {
00543       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00544       // Update the reader's expected sequence
00545       SequenceNumber& seq =
00546         reader_info_.find(remote_id)->second.expected_sequence_;
00547 
00548       for (SendStateDataSampleList::iterator list_el = list.begin();
00549            list_el != list.end(); ++list_el) {
00550         list_el->get_header().historic_sample_ = true;
00551 
00552         if (list_el->get_header().sequence_ > seq) {
00553           seq = list_el->get_header().sequence_;
00554         }
00555       }
00556     }
00557 
00558     RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
00559     if (!publisher || publisher->is_suspended()) {
00560       this->available_data_list_.enqueue_tail(list);
00561 
00562     } else {
00563       if (DCPS_debug_level >= 4) {
00564         ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00565       }
00566 
00567       size_t size = 0, padding = 0;
00568       gen_find_size(remote_id, size, padding);
00569       Message_Block_Ptr data(
00570         new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00571                               get_db_lock()));
00572       Serializer ser(data.get());
00573       ser << remote_id;
00574 
00575       const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00576       DataSampleHeader header;
00577       Message_Block_Ptr end_historic_samples(
00578         create_control_message(END_HISTORIC_SAMPLES, header, move(data), timestamp));
00579 
00580       this->controlTracker.message_sent();
00581       guard.release();
00582       SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
00583       if (ret == SEND_CONTROL_ERROR) {
00584         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00585                              ACE_TEXT("DataWriterImpl::association_complete_i: ")
00586                              ACE_TEXT("send_w_control failed.\n")));
00587         this->controlTracker.message_dropped();
00588       }
00589     }
00590   }
00591 }
00592 
00593 void
00594 DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
00595                                     CORBA::Boolean notify_lost)
00596 {
00597   if (readers.length() == 0) {
00598     return;
00599   }
00600 
00601   if (DCPS_debug_level >= 1) {
00602     GuidConverter writer_converter(publication_id_);
00603     GuidConverter reader_converter(readers[0]);
00604     ACE_DEBUG((LM_DEBUG,
00605                ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
00606                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00607                is_bit_,
00608                OPENDDS_STRING(writer_converter).c_str(),
00609                OPENDDS_STRING(reader_converter).c_str(),
00610                readers.length()));
00611   }
00612 
00613   // stop pending associations for these reader ids
00614   this->stop_associating(readers.get_buffer(), readers.length());
00615 
00616   ReaderIdSeq fully_associated_readers;
00617   CORBA::ULong fully_associated_len = 0;
00618   ReaderIdSeq rds;
00619   CORBA::ULong rds_len = 0;
00620   DDS::InstanceHandleSeq handles;
00621 
00622   ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
00623   {
00624     // Ensure the same acquisition order as in wait_for_acknowledgments().
00625     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00626     //Remove the readers from fully associated reader list.
00627     //If the supplied reader is not in the cached reader list then it is
00628     //already removed. We just need remove the readers in the list that have
00629     //not been removed.
00630 
00631     CORBA::ULong len = readers.length();
00632 
00633     for (CORBA::ULong i = 0; i < len; ++i) {
00634       //Remove the readers from fully associated reader list. If it's not
00635       //in there, the association_complete() is not called yet and remove it
00636       //from pending list.
00637 
00638       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00639         ++ fully_associated_len;
00640         fully_associated_readers.length(fully_associated_len);
00641         fully_associated_readers [fully_associated_len - 1] = readers[i];
00642 
00643         ++ rds_len;
00644         rds.length(rds_len);
00645         rds [rds_len - 1] = readers[i];
00646 
00647       } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00648         ++ rds_len;
00649         rds.length(rds_len);
00650         rds [rds_len - 1] = readers[i];
00651 
00652         GuidConverter converter(readers[i]);
00653         ACE_DEBUG((LM_WARNING,
00654                    ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_associations: ")
00655                    ACE_TEXT("removing reader %C before association_complete() call.\n"),
00656                    OPENDDS_STRING(converter).c_str()));
00657       }
00658 
00659       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00660       reader_info_.erase(readers[i]);
00661       //else reader is already removed which indicates remove_association()
00662       //is called multiple times.
00663     }
00664 
00665     if (fully_associated_len > 0 && !is_bit_) {
00666       // The reader should be in the id_to_handle map at this time
00667       this->lookup_instance_handles(fully_associated_readers, handles);
00668 
00669       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00670         id_to_handle_map_.erase(fully_associated_readers[i]);
00671       }
00672     }
00673 
00674     // Mirror the PUBLICATION_MATCHED_STATUS processing from
00675     // association_complete() here.
00676     if (!this->is_bit_) {
00677 
00678       // Derive the change in the number of subscriptions reading this writer.
00679       int matchedSubscriptions =
00680         static_cast<int>(this->id_to_handle_map_.size());
00681       this->publication_match_status_.current_count_change =
00682         matchedSubscriptions - this->publication_match_status_.current_count;
00683 
00684       // Only process status if the number of subscriptions has changed.
00685       if (this->publication_match_status_.current_count_change != 0) {
00686         this->publication_match_status_.current_count = matchedSubscriptions;
00687 
00688         /// Section 7.1.4.1: total_count will not decrement.
00689 
00690         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00691         this->publication_match_status_.last_subscription_handle =
00692           handles[fully_associated_len - 1];
00693 
00694         set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00695 
00696         DDS::DataWriterListener_var listener =
00697           this->listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00698 
00699         if (!CORBA::is_nil(listener.in())) {
00700           listener->on_publication_matched(this, this->publication_match_status_);
00701 
00702           // Listener consumes the change.
00703           this->publication_match_status_.total_count_change = 0;
00704           this->publication_match_status_.current_count_change = 0;
00705         }
00706 
00707         this->notify_status_condition();
00708       }
00709     }
00710   }
00711 
00712   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00713     this->disassociate(rds[i]);
00714   }
00715 
00716   // If this remove_association is invoked when the InfoRepo
00717   // detects a lost reader then make a callback to notify
00718   // subscription lost.
00719   if (notify_lost && handles.length() > 0) {
00720     this->notify_publication_lost(handles);
00721   }
00722 }
00723 
00724 void DataWriterImpl::remove_all_associations()
00725 {
00726   DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00727   // stop pending associations
00728   this->stop_associating();
00729 
00730   OpenDDS::DCPS::ReaderIdSeq readers;
00731   CORBA::ULong size;
00732   CORBA::ULong num_pending_readers;
00733   {
00734     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00735 
00736     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00737     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00738     readers.length(size);
00739 
00740     RepoIdSet::iterator itEnd = readers_.end();
00741     int i = 0;
00742 
00743     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00744       readers[i ++] = *it;
00745     }
00746 
00747     itEnd = pending_readers_.end();
00748 
00749     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00750       readers[i ++] = *it;
00751     }
00752 
00753     if (num_pending_readers > 0) {
00754       ACE_DEBUG((LM_WARNING,
00755                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00756                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00757                  num_pending_readers));
00758     }
00759   }
00760 
00761   try {
00762     if (0 < size) {
00763       CORBA::Boolean dont_notify_lost = false;
00764 
00765       this->remove_associations(readers, dont_notify_lost);
00766     }
00767 
00768   } catch (const CORBA::Exception&) {
00769       ACE_DEBUG((LM_WARNING,
00770                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00771                  ACE_TEXT("caught exception from remove_associations.\n")));
00772   }
00773 }
00774 
00775 void
00776 DataWriterImpl::register_for_reader(const RepoId& participant,
00777                                     const RepoId& writerid,
00778                                     const RepoId& readerid,
00779                                     const TransportLocatorSeq& locators,
00780                                     DiscoveryListener* listener)
00781 {
00782   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00783 }
00784 
00785 void
00786 DataWriterImpl::unregister_for_reader(const RepoId& participant,
00787                                       const RepoId& writerid,
00788                                       const RepoId& readerid)
00789 {
00790   TransportClient::unregister_for_reader(participant, writerid, readerid);
00791 }
00792 
00793 void
00794 DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00795 {
00796   DDS::DataWriterListener_var listener =
00797     listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00798 
00799   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00800 
00801 #if 0
00802 
00803   if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00804     // This test should make the method idempotent.
00805     return;
00806   }
00807 
00808 #endif
00809 
00810   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00811 
00812   // copy status and increment change
00813   offered_incompatible_qos_status_.total_count = status.total_count;
00814   offered_incompatible_qos_status_.total_count_change +=
00815     status.count_since_last_send;
00816   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00817   offered_incompatible_qos_status_.policies = status.policies;
00818 
00819   if (!CORBA::is_nil(listener.in())) {
00820     listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_);
00821 
00822     // TBD - Why does the spec say to change this but not change the
00823     //       ChangeFlagStatus after a listener call?
00824     offered_incompatible_qos_status_.total_count_change = 0;
00825   }
00826 
00827   notify_status_condition();
00828 }
00829 
00830 void
00831 DataWriterImpl::update_subscription_params(const RepoId& readerId,
00832                                            const DDS::StringSeq& params)
00833 {
00834 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00835   ACE_UNUSED_ARG(readerId);
00836   ACE_UNUSED_ARG(params);
00837 #else
00838   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00839   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00840   RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00841 
00842   if (iter != reader_info_.end()) {
00843     iter->second.expression_params_ = params;
00844 
00845   } else if (DCPS_debug_level > 4 &&
00846              TheServiceParticipant->publisher_content_filter()) {
00847     GuidConverter pubConv(this->publication_id_), subConv(readerId);
00848     ACE_DEBUG((LM_WARNING,
00849                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00850                ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00851                OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00852   }
00853 
00854 #endif
00855 }
00856 
00857 void
00858 DataWriterImpl::inconsistent_topic()
00859 {
00860   topic_servant_->inconsistent_topic();
00861 }
00862 
00863 DDS::ReturnCode_t
00864 DataWriterImpl::set_qos(const DDS::DataWriterQos & qos)
00865 {
00866 
00867   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00868   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00869   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00870   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00871   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00872 
00873   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00874     if (qos_ == qos)
00875       return DDS::RETCODE_OK;
00876 
00877     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00878       return DDS::RETCODE_IMMUTABLE_POLICY;
00879 
00880     } else {
00881       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00882       DDS::PublisherQos publisherQos;
00883       RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
00884 
00885       bool status = false;
00886       if (publisher) {
00887         publisher->get_qos(publisherQos);
00888         status
00889           = disco->update_publication_qos(domain_id_,
00890                                           dp_id_,
00891                                           this->publication_id_,
00892                                           qos,
00893                                           publisherQos);
00894       }
00895       if (!status) {
00896         ACE_ERROR_RETURN((LM_ERROR,
00897                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00898                           ACE_TEXT("qos not updated. \n")),
00899                          DDS::RETCODE_ERROR);
00900       }
00901     }
00902 
00903     if (!(qos_ == qos)) {
00904       // Reset the deadline timer if the period has changed.
00905       if (qos_.deadline.period.sec != qos.deadline.period.sec
00906           || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00907         if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00908             && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00909           this->watchdog_= make_rch<OfferedDeadlineWatchdog>(
00910                                ref(this->lock_),
00911                                qos.deadline,
00912                                ref(*this),
00913                                ref(this->offered_deadline_missed_status_),
00914                                ref(this->last_deadline_missed_total_count_));
00915 
00916         } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00917                    && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00918           this->watchdog_->cancel_all();
00919           this->watchdog_.reset();
00920 
00921         } else {
00922           this->watchdog_->reset_interval(
00923             duration_to_time_value(qos.deadline.period));
00924         }
00925       }
00926 
00927       qos_ = qos;
00928     }
00929 
00930     return DDS::RETCODE_OK;
00931 
00932   } else {
00933     return DDS::RETCODE_INCONSISTENT_POLICY;
00934   }
00935 }
00936 
00937 DDS::ReturnCode_t
00938 DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
00939 {
00940   qos = qos_;
00941   return DDS::RETCODE_OK;
00942 }
00943 
00944 DDS::ReturnCode_t
00945 DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
00946                              DDS::StatusMask mask)
00947 {
00948   listener_mask_ = mask;
00949   //note: OK to duplicate  a nil object ref
00950   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00951   return DDS::RETCODE_OK;
00952 }
00953 
00954 DDS::DataWriterListener_ptr
00955 DataWriterImpl::get_listener()
00956 {
00957   return DDS::DataWriterListener::_duplicate(listener_.in());
00958 }
00959 
00960 DDS::Topic_ptr
00961 DataWriterImpl::get_topic()
00962 {
00963   return DDS::Topic::_duplicate(topic_servant_.get());
00964 }
00965 
00966 bool
00967 DataWriterImpl::should_ack() const
00968 {
00969   // N.B. It may be worthwhile to investigate a more efficient
00970   // heuristic for determining if a writer should send SAMPLE_ACK
00971   // control samples. Perhaps based on a sequence number delta?
00972   return this->readers_.size() != 0;
00973 }
00974 
00975 DataWriterImpl::AckToken
00976 DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
00977 {
00978   if (DCPS_debug_level > 0) {
00979     ACE_DEBUG((LM_DEBUG,
00980                ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
00981                ACE_TEXT("for sequence %q \n"),
00982                this->sequence_number_.getValue()));
00983   }
00984   return AckToken(max_wait, this->sequence_number_);
00985 }
00986 
00987 
00988 
00989 DDS::ReturnCode_t
00990 DataWriterImpl::send_request_ack()
00991 {
00992   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00993                    guard,
00994                    get_lock(),
00995                    DDS::RETCODE_ERROR);
00996 
00997 
00998   DataSampleElement* element = 0;
00999   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element);
01000 
01001   if (ret != DDS::RETCODE_OK) {
01002     ACE_ERROR_RETURN((LM_ERROR,
01003                       ACE_TEXT("(%P|%t) ERROR: ")
01004                       ACE_TEXT("DataWriterImpl::send_request_ack: ")
01005                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01006                       ret),
01007                      ret);
01008   }
01009 
01010   Message_Block_Ptr blk;
01011   // Add header with the registration sample data.
01012   Message_Block_Ptr sample(create_control_message(REQUEST_ACK,
01013                                              element->get_header(),
01014                                              move(blk),
01015                                              time_value_to_time( ACE_OS::gettimeofday() )));
01016   element->set_sample(move(sample));
01017 
01018   ret = this->data_container_->enqueue_control(element);
01019 
01020   if (ret != DDS::RETCODE_OK) {
01021     ACE_ERROR_RETURN((LM_ERROR,
01022                       ACE_TEXT("(%P|%t) ERROR: ")
01023                       ACE_TEXT("DataWriterImpl::send_request_ack: ")
01024                       ACE_TEXT("enqueue_control failed.\n")),
01025                      ret);
01026   }
01027 
01028 
01029   send_all_to_flush_control(guard);
01030 
01031   return DDS::RETCODE_OK;
01032 }
01033 
01034 DDS::ReturnCode_t
01035 DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
01036 {
01037   if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01038     return DDS::RETCODE_OK;
01039 
01040   DDS::ReturnCode_t ret = send_request_ack();
01041 
01042   if (ret != DDS::RETCODE_OK)
01043     return ret;
01044 
01045   DataWriterImpl::AckToken token = create_ack_token(max_wait);
01046   if (DCPS_debug_level) {
01047     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01048                           ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01049                           token.sequence_.getValue()));
01050   }
01051   return wait_for_specific_ack(token);
01052 }
01053 
01054 DDS::ReturnCode_t
01055 DataWriterImpl::wait_for_specific_ack(const AckToken& token)
01056 {
01057   return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01058 }
01059 
01060 DDS::Publisher_ptr
01061 DataWriterImpl::get_publisher()
01062 {
01063   return publisher_servant_.lock()._retn();
01064 }
01065 
01066 DDS::ReturnCode_t
01067 DataWriterImpl::get_liveliness_lost_status(
01068   DDS::LivelinessLostStatus & status)
01069 {
01070   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01071                    guard,
01072                    this->lock_,
01073                    DDS::RETCODE_ERROR);
01074   set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
01075   status = liveliness_lost_status_;
01076   liveliness_lost_status_.total_count_change = 0;
01077   return DDS::RETCODE_OK;
01078 }
01079 
01080 DDS::ReturnCode_t
01081 DataWriterImpl::get_offered_deadline_missed_status(
01082   DDS::OfferedDeadlineMissedStatus & status)
01083 {
01084   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01085                    guard,
01086                    this->lock_,
01087                    DDS::RETCODE_ERROR);
01088 
01089   set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
01090 
01091   this->offered_deadline_missed_status_.total_count_change =
01092     this->offered_deadline_missed_status_.total_count
01093     - this->last_deadline_missed_total_count_;
01094 
01095   // Update for next status check.
01096   this->last_deadline_missed_total_count_ =
01097     this->offered_deadline_missed_status_.total_count;
01098 
01099   status = offered_deadline_missed_status_;
01100 
01101   this->offered_deadline_missed_status_.total_count_change = 0;
01102 
01103   return DDS::RETCODE_OK;
01104 }
01105 
01106 DDS::ReturnCode_t
01107 DataWriterImpl::get_offered_incompatible_qos_status(
01108   DDS::OfferedIncompatibleQosStatus & status)
01109 {
01110   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01111                    guard,
01112                    this->lock_,
01113                    DDS::RETCODE_ERROR);
01114   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
01115   status = offered_incompatible_qos_status_;
01116   offered_incompatible_qos_status_.total_count_change = 0;
01117   return DDS::RETCODE_OK;
01118 }
01119 
01120 DDS::ReturnCode_t
01121 DataWriterImpl::get_publication_matched_status(
01122   DDS::PublicationMatchedStatus & status)
01123 {
01124   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01125                    guard,
01126                    this->lock_,
01127                    DDS::RETCODE_ERROR);
01128   set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
01129   status = publication_match_status_;
01130   publication_match_status_.total_count_change = 0;
01131   publication_match_status_.current_count_change = 0;
01132   return DDS::RETCODE_OK;
01133 }
01134 
01135 DDS::ReturnCode_t
01136 DataWriterImpl::assert_liveliness()
01137 {
01138   switch (this->qos_.liveliness.kind) {
01139   case DDS::AUTOMATIC_LIVELINESS_QOS:
01140     // Do nothing.
01141     break;
01142   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01143     {
01144       RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01145       if (participant)
01146         return participant->assert_liveliness();
01147       return DDS::RETCODE_OK;
01148     }
01149   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01150     if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01151       return DDS::RETCODE_ERROR;
01152     }
01153     break;
01154   }
01155 
01156   return DDS::RETCODE_OK;
01157 }
01158 
01159 DDS::ReturnCode_t
01160 DataWriterImpl::assert_liveliness_by_participant()
01161 {
01162   // This operation is called by participant.
01163 
01164   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01165     // Set a flag indicating that we should send a liveliness message on the timer if necessary.
01166     liveliness_asserted_ = true;
01167   }
01168 
01169   return DDS::RETCODE_OK;
01170 }
01171 
01172 ACE_Time_Value
01173 DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
01174 {
01175   if (this->qos_.liveliness.kind == kind) {
01176     return liveliness_check_interval_;
01177   } else {
01178     return ACE_Time_Value::max_time;
01179   }
01180 }
01181 
01182 bool
01183 DataWriterImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
01184 {
01185   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01186     return last_liveliness_activity_time_ > tv;
01187   } else {
01188     return false;
01189   }
01190 }
01191 
01192 DDS::ReturnCode_t
01193 DataWriterImpl::get_matched_subscriptions(
01194   DDS::InstanceHandleSeq & subscription_handles)
01195 {
01196   if (enabled_ == false) {
01197     ACE_ERROR_RETURN((LM_ERROR,
01198                       ACE_TEXT("(%P|%t) ERROR: ")
01199                       ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01200                       ACE_TEXT(" Entity is not enabled. \n")),
01201                      DDS::RETCODE_NOT_ENABLED);
01202   }
01203 
01204   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01205                    guard,
01206                    this->lock_,
01207                    DDS::RETCODE_ERROR);
01208 
01209   // Copy out the handles for the current set of subscriptions.
01210   int index = 0;
01211   subscription_handles.length(
01212     static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01213 
01214   for (RepoIdToHandleMap::iterator
01215        current = this->id_to_handle_map_.begin();
01216        current != this->id_to_handle_map_.end();
01217        ++current, ++index) {
01218     subscription_handles[index] = current->second;
01219   }
01220 
01221   return DDS::RETCODE_OK;
01222 }
01223 
01224 #if !defined (DDS_HAS_MINIMUM_BIT)
01225 DDS::ReturnCode_t
01226 DataWriterImpl::get_matched_subscription_data(
01227   DDS::SubscriptionBuiltinTopicData & subscription_data,
01228   DDS::InstanceHandle_t subscription_handle)
01229 {
01230   if (enabled_ == false) {
01231     ACE_ERROR_RETURN((LM_ERROR,
01232                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01233                       ACE_TEXT("get_matched_subscription_data: ")
01234                       ACE_TEXT("Entity is not enabled. \n")),
01235                      DDS::RETCODE_NOT_ENABLED);
01236   }
01237   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01238 
01239   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01240   DDS::SubscriptionBuiltinTopicDataSeq data;
01241 
01242   if (participant) {
01243     ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
01244             participant.in(),
01245             BUILT_IN_SUBSCRIPTION_TOPIC,
01246             subscription_handle,
01247             data);
01248   }
01249 
01250   if (ret == DDS::RETCODE_OK) {
01251     subscription_data = data[0];
01252   }
01253 
01254   return ret;
01255 }
01256 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01257 
01258 DDS::ReturnCode_t
01259 DataWriterImpl::enable()
01260 {
01261   //According spec:
01262   // - Calling enable on an already enabled Entity returns OK and has no
01263   // effect.
01264   // - Calling enable on an Entity whose factory is not enabled will fail
01265   // and return PRECONDITION_NOT_MET.
01266 
01267   if (this->is_enabled()) {
01268     return DDS::RETCODE_OK;
01269   }
01270 
01271   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01272   if (!publisher || !publisher->is_enabled()) {
01273     return DDS::RETCODE_PRECONDITION_NOT_MET;
01274   }
01275 
01276   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
01277   if (participant) {
01278     dp_id_ = participant->get_id();
01279   }
01280 
01281   // Note: do configuration based on QoS in enable() because
01282   //       before enable is called the QoS can be changed -- even
01283   //       for Changeable=NO
01284 
01285   // Configure WriteDataContainer constructor parameters from qos.
01286 
01287   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01288 
01289   CORBA::Long const max_samples_per_instance =
01290     (qos_.resource_limits.max_samples_per_instance == DDS::LENGTH_UNLIMITED)
01291     ? 0x7fffffff : qos_.resource_limits.max_samples_per_instance;
01292 
01293   CORBA::Long max_instances = 0, max_total_samples = 0;
01294 
01295   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01296     n_chunks_ = qos_.resource_limits.max_samples;
01297 
01298     if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED ||
01299         (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01300         || (qos_.resource_limits.max_samples <
01301             (qos_.resource_limits.max_instances * max_samples_per_instance))) {
01302       max_total_samples = reliable ? qos_.resource_limits.max_samples : 0;
01303     }
01304   }
01305 
01306   if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01307     max_instances = qos_.resource_limits.max_instances;
01308 
01309   const CORBA::Long history_depth =
01310     (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS ||
01311      qos_.history.depth == DDS::LENGTH_UNLIMITED) ? 0x7fffffff : qos_.history.depth;
01312 
01313   const CORBA::Long max_durable_per_instance =
01314     qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth;
01315 
01316   // enable the type specific part of this DataWriter
01317   this->enable_specific();
01318 
01319 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01320   // Get data durability cache if DataWriter QoS requires durable
01321   // samples.  Publisher servant retains ownership of the cache.
01322   DataDurabilityCache* const durability_cache =
01323     TheServiceParticipant->get_data_durability_cache(qos_.durability);
01324 #endif
01325 
01326   //Note: the QoS used to set n_chunks_ is Changable=No so
01327   // it is OK that we cannot change the size of our allocators.
01328   data_container_ .reset(new WriteDataContainer(this,
01329                                            max_samples_per_instance,
01330                                            history_depth,
01331                                            max_durable_per_instance,
01332                                            qos_.reliability.max_blocking_time,
01333                                            n_chunks_,
01334                                            domain_id_,
01335                                            topic_name_,
01336                                            get_type_name(),
01337 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01338                                            durability_cache,
01339                                            qos_.durability_service,
01340 #endif
01341                                            max_instances,
01342                                            max_total_samples));
01343 
01344   // +1 because we might allocate one before releasing another
01345   // TBD - see if this +1 can be removed.
01346   mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
01347   db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
01348   header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
01349 
01350   if (DCPS_debug_level >= 2) {
01351     ACE_DEBUG((LM_DEBUG,
01352                "(%P|%t) DataWriterImpl::enable-mb"
01353                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01354                mb_allocator_.get(),
01355                n_chunks_));
01356 
01357     ACE_DEBUG((LM_DEBUG,
01358                "(%P|%t) DataWriterImpl::enable-db"
01359                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01360                db_allocator_.get(),
01361                n_chunks_));
01362 
01363     ACE_DEBUG((LM_DEBUG,
01364                "(%P|%t) DataWriterImpl::enable-header"
01365                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01366                header_allocator_.get(),
01367                n_chunks_));
01368   }
01369 
01370   if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01371       qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01372     liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01373     liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01374     // Must be at least 1 micro second.
01375     if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01376       liveliness_check_interval_ = ACE_Time_Value (0, 1);
01377     }
01378 
01379     if (reactor_->schedule_timer(liveness_timer_.in(),
01380                                  0,
01381                                  liveliness_check_interval_,
01382                                  liveliness_check_interval_) == -1) {
01383       ACE_ERROR((LM_ERROR,
01384                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01385                  ACE_TEXT("schedule_timer")));
01386 
01387     }
01388   }
01389 
01390   if (!participant)
01391     return DDS::RETCODE_ERROR;
01392 
01393   participant->add_adjust_liveliness_timers(this);
01394 
01395   // Setup the offered deadline watchdog if the configured deadline
01396   // period is not the default (infinite).
01397   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01398 
01399   if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01400       || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01401     this->watchdog_ = make_rch<OfferedDeadlineWatchdog>(
01402                            ref(this->lock_),
01403                            this->qos_.deadline,
01404                            ref(*this),
01405                            ref(this->offered_deadline_missed_status_),
01406                            ref(this->last_deadline_missed_total_count_));
01407   }
01408 
01409   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01410   disco->pre_writer(this);
01411 
01412   this->set_enabled();
01413 
01414   try {
01415     this->enable_transport(reliable,
01416                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01417 
01418   } catch (const Transport::Exception&) {
01419     ACE_ERROR((LM_ERROR,
01420                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01421                ACE_TEXT("Transport Exception.\n")));
01422     data_container_->shutdown_ = true;
01423     return DDS::RETCODE_ERROR;
01424   }
01425 
01426   const TransportLocatorSeq& trans_conf_info = connection_info();
01427 
01428   DDS::PublisherQos pub_qos;
01429 
01430   publisher->get_qos(pub_qos);
01431 
01432   this->publication_id_ =
01433     disco->add_publication(this->domain_id_,
01434                            this->dp_id_,
01435                            this->topic_servant_->get_id(),
01436                            this,
01437                            this->qos_,
01438                            trans_conf_info,
01439                            pub_qos);
01440 
01441 
01442   if (!publisher || this->publication_id_ == GUID_UNKNOWN) {
01443     ACE_DEBUG((LM_WARNING,
01444                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::enable, ")
01445                ACE_TEXT("add_publication returned invalid id. \n")));
01446     data_container_->shutdown_ = true;
01447     return DDS::RETCODE_ERROR;
01448   }
01449 
01450   this->data_container_->publication_id_ = this->publication_id_;
01451 
01452   const DDS::ReturnCode_t writer_enabled_result =
01453     publisher->writer_enabled(topic_name_.in(), this);
01454 
01455   if (this->monitor_) {
01456     this->monitor_->report();
01457   }
01458 
01459 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01460 
01461   // Move cached data from the durability cache to the unsent data
01462   // queue.
01463   if (durability_cache != 0) {
01464 
01465     if (!durability_cache->get_data(this->domain_id_,
01466                                     this->topic_name_,
01467                                     get_type_name(),
01468                                     this,
01469                                     this->mb_allocator_.get(),
01470                                     this->db_allocator_.get(),
01471                                     this->qos_.lifespan)) {
01472       ACE_ERROR((LM_ERROR,
01473                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01474                  ACE_TEXT("unable to retrieve durable data\n")));
01475     }
01476   }
01477 
01478 #endif
01479 
01480   return writer_enabled_result;
01481 }
01482 
01483 void
01484 DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
01485 {
01486   DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01487 
01488   SendStateDataSampleList list;
01489 
01490   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01491 
01492   controlTracker.message_sent();
01493 
01494   //need to release guard to call down to transport
01495   guard.release();
01496 
01497   this->send(list, transaction_id);
01498 }
01499 
01500 DDS::ReturnCode_t
01501 DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
01502                                     Message_Block_Ptr data,
01503                                     const DDS::Time_t& source_timestamp)
01504 {
01505   DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01506 
01507   if (enabled_ == false) {
01508     ACE_ERROR_RETURN((LM_ERROR,
01509                       ACE_TEXT("(%P|%t) ERROR: ")
01510                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01511                       ACE_TEXT(" Entity is not enabled. \n")),
01512                      DDS::RETCODE_NOT_ENABLED);
01513   }
01514 
01515   DDS::ReturnCode_t ret =
01516     this->data_container_->register_instance(handle, data);
01517 
01518   if (ret != DDS::RETCODE_OK) {
01519     ACE_ERROR_RETURN((LM_ERROR,
01520                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01521                       ACE_TEXT("register instance with container failed.\n")),
01522                      ret);
01523   }
01524 
01525   if (this->monitor_) {
01526     this->monitor_->report();
01527   }
01528 
01529   DataSampleElement* element = 0;
01530   ret = this->data_container_->obtain_buffer_for_control(element);
01531 
01532   if (ret != DDS::RETCODE_OK) {
01533     ACE_ERROR_RETURN((LM_ERROR,
01534                       ACE_TEXT("(%P|%t) ERROR: ")
01535                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01536                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01537                       ret),
01538                      ret);
01539   }
01540 
01541   // Add header with the registration sample data.
01542   Message_Block_Ptr sample(create_control_message(INSTANCE_REGISTRATION,
01543                                              element->get_header(),
01544                                              move(data),
01545                                              source_timestamp));
01546 
01547   element->set_sample(move(sample));
01548 
01549   ret = this->data_container_->enqueue_control(element);
01550 
01551   if (ret != DDS::RETCODE_OK) {
01552     ACE_ERROR_RETURN((LM_ERROR,
01553                       ACE_TEXT("(%P|%t) ERROR: ")
01554                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01555                       ACE_TEXT("enqueue_control failed.\n")),
01556                      ret);
01557   }
01558 
01559   return ret;
01560 }
01561 
01562 DDS::ReturnCode_t
01563 DataWriterImpl::register_instance_from_durable_data(DDS::InstanceHandle_t& handle,
01564                                     Message_Block_Ptr data,
01565                                     const DDS::Time_t & source_timestamp)
01566 {
01567   DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01568 
01569   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01570                    guard,
01571                    get_lock(),
01572                    DDS::RETCODE_ERROR);
01573 
01574   DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp);
01575   if (ret != DDS::RETCODE_OK) {
01576     ACE_ERROR_RETURN((LM_ERROR,
01577                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01578                       ACE_TEXT("register instance with container failed.\n")),
01579                       ret);
01580   }
01581 
01582   send_all_to_flush_control(guard);
01583 
01584   return ret;
01585 }
01586 
01587 DDS::ReturnCode_t
01588 DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
01589                                       const DDS::Time_t& source_timestamp)
01590 {
01591   DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01592 
01593   if (enabled_ == false) {
01594     ACE_ERROR_RETURN((LM_ERROR,
01595                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01596                       ACE_TEXT(" Entity is not enabled.\n")),
01597                      DDS::RETCODE_NOT_ENABLED);
01598   }
01599 
01600   // According to spec 1.2, autodispose_unregistered_instances true causes
01601   // dispose on the instance prior to calling unregister operation.
01602   if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01603     return this->dispose_and_unregister(handle, source_timestamp);
01604   }
01605 
01606   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01607   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01608   Message_Block_Ptr unregistered_sample_data;
01609   ret = this->data_container_->unregister(handle, unregistered_sample_data);
01610 
01611   if (ret != DDS::RETCODE_OK) {
01612     ACE_ERROR_RETURN((LM_ERROR,
01613                       ACE_TEXT("(%P|%t) ERROR: ")
01614                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01615                       ACE_TEXT(" unregister with container failed. \n")),
01616                      ret);
01617   }
01618 
01619   DataSampleElement* element = 0;
01620   ret = this->data_container_->obtain_buffer_for_control(element);
01621 
01622   if (ret != DDS::RETCODE_OK) {
01623     ACE_ERROR_RETURN((LM_ERROR,
01624                       ACE_TEXT("(%P|%t) ERROR: ")
01625                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01626                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01627                       ret),
01628                      ret);
01629   }
01630 
01631   Message_Block_Ptr sample(create_control_message(UNREGISTER_INSTANCE,
01632                                                   element->get_header(),
01633                                                   move(unregistered_sample_data),
01634                                                   source_timestamp));
01635   element->set_sample(move(sample));
01636   ret = this->data_container_->enqueue_control(element);
01637 
01638   if (ret != DDS::RETCODE_OK) {
01639     ACE_ERROR_RETURN((LM_ERROR,
01640                       ACE_TEXT("(%P|%t) ERROR: ")
01641                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01642                       ACE_TEXT("enqueue_control failed.\n")),
01643                      ret);
01644   }
01645 
01646   send_all_to_flush_control(guard);
01647   return DDS::RETCODE_OK;
01648 }
01649 
01650 DDS::ReturnCode_t
01651 DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
01652                                        const DDS::Time_t& source_timestamp)
01653 {
01654   DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01655 
01656   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01657   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01658 
01659   Message_Block_Ptr data_sample;
01660   ret = this->data_container_->dispose(handle, data_sample);
01661 
01662   if (ret != DDS::RETCODE_OK) {
01663     ACE_ERROR_RETURN((LM_ERROR,
01664                       ACE_TEXT("(%P|%t) ERROR: ")
01665                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01666                       ACE_TEXT("dispose on container failed. \n")),
01667                      ret);
01668   }
01669 
01670   ret = this->data_container_->unregister(handle, data_sample, false);
01671 
01672   if (ret != DDS::RETCODE_OK) {
01673     ACE_ERROR_RETURN((LM_ERROR,
01674                       ACE_TEXT("(%P|%t) ERROR: ")
01675                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01676                       ACE_TEXT("unregister with container failed. \n")),
01677                      ret);
01678   }
01679 
01680   DataSampleElement* element = 0;
01681   ret = this->data_container_->obtain_buffer_for_control(element);
01682 
01683   if (ret != DDS::RETCODE_OK) {
01684     ACE_ERROR_RETURN((LM_ERROR,
01685                       ACE_TEXT("(%P|%t) ERROR: ")
01686                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01687                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01688                       ret),
01689                      ret);
01690   }
01691 
01692   Message_Block_Ptr sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01693                                                   element->get_header(),
01694                                                   move(data_sample),
01695                                                   source_timestamp));
01696   element->set_sample(move(sample));
01697   ret = this->data_container_->enqueue_control(element);
01698 
01699   if (ret != DDS::RETCODE_OK) {
01700     ACE_ERROR_RETURN((LM_ERROR,
01701                       ACE_TEXT("(%P|%t) ERROR: ")
01702                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01703                       ACE_TEXT("enqueue_control failed.\n")),
01704                      ret);
01705   }
01706 
01707   send_all_to_flush_control(guard);
01708   return DDS::RETCODE_OK;
01709 }
01710 
01711 void
01712 DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
01713 {
01714   {
01715     ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01716 
01717     PublicationInstanceMapType::iterator it =
01718       this->data_container_->instances_.begin();
01719 
01720     while (it != this->data_container_->instances_.end()) {
01721       if (!it->second->unregistered_) {
01722         const DDS::InstanceHandle_t handle = it->first;
01723         ++it; // avoid mangling the iterator
01724         this->unregister_instance_i(handle, source_timestamp);
01725       } else {
01726         ++it;
01727       }
01728     }
01729   }
01730 }
01731 
01732 DDS::ReturnCode_t
01733 DataWriterImpl::write(Message_Block_Ptr data,
01734                       DDS::InstanceHandle_t handle,
01735                       const DDS::Time_t& source_timestamp,
01736                       GUIDSeq* filter_out)
01737 {
01738   DBG_ENTRY_LVL("DataWriterImpl","write",6);
01739 
01740   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01741                     guard,
01742                     get_lock (),
01743                     DDS::RETCODE_ERROR);
01744 
01745   // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
01746   GUIDSeq_var filter_out_var(filter_out);
01747 
01748   if (enabled_ == false) {
01749     ACE_ERROR_RETURN((LM_ERROR,
01750                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01751                       ACE_TEXT(" Entity is not enabled. \n")),
01752                      DDS::RETCODE_NOT_ENABLED);
01753   }
01754 
01755   DataSampleElement* element = 0;
01756   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01757 
01758   if (ret == DDS::RETCODE_TIMEOUT) {
01759     return ret; // silent for timeout
01760 
01761   } else if (ret != DDS::RETCODE_OK) {
01762     ACE_ERROR_RETURN((LM_ERROR,
01763                       ACE_TEXT("(%P|%t) ERROR: ")
01764                       ACE_TEXT("DataWriterImpl::write: ")
01765                       ACE_TEXT("obtain_buffer returned %d.\n"),
01766                       ret),
01767                      ret);
01768   }
01769 
01770   Message_Block_Ptr temp;
01771   ret = create_sample_data_message(move(data),
01772                                    handle,
01773                                    element->get_header(),
01774                                    temp,
01775                                    source_timestamp,
01776                                    (filter_out != 0));
01777   element->set_sample(move(temp));
01778 
01779   if (ret != DDS::RETCODE_OK) {
01780     return ret;
01781   }
01782 
01783   element->set_filter_out(filter_out_var._retn()); // ownership passed to element
01784 
01785   ret = this->data_container_->enqueue(element, handle);
01786 
01787   if (ret != DDS::RETCODE_OK) {
01788     ACE_ERROR_RETURN((LM_ERROR,
01789                       ACE_TEXT("(%P|%t) ERROR: ")
01790                       ACE_TEXT("DataWriterImpl::write: ")
01791                       ACE_TEXT("enqueue failed.\n")),
01792                      ret);
01793   }
01794   this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01795 
01796   track_sequence_number(filter_out);
01797 
01798   if (this->coherent_) {
01799     ++this->coherent_samples_;
01800   }
01801   SendStateDataSampleList list;
01802 
01803   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01804 
01805   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01806   if (!publisher || publisher->is_suspended()) {
01807     if (min_suspended_transaction_id_ == 0) {
01808       //provides transaction id for lower bound of suspended transactions
01809       //or transaction id for single suspended write transaction
01810       min_suspended_transaction_id_ = transaction_id;
01811     } else {
01812       //when multiple write transactions have suspended, provides the upper bound
01813       //for suspended transactions.
01814       max_suspended_transaction_id_ = transaction_id;
01815     }
01816     this->available_data_list_.enqueue_tail(list);
01817 
01818   } else {
01819     guard.release();
01820 
01821     this->send(list, transaction_id);
01822   }
01823 
01824   return DDS::RETCODE_OK;
01825 }
01826 
01827 void
01828 DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
01829 {
01830   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01831 
01832 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01833   // Track individual expected sequence numbers in ReaderInfo
01834   RepoIdSet excluded;
01835 
01836   if (filter_out && !reader_info_.empty()) {
01837     const GUID_t* buf = filter_out->get_buffer();
01838     excluded.insert(buf, buf + filter_out->length());
01839   }
01840 
01841   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01842        end = reader_info_.end(); iter != end; ++iter) {
01843     // If not excluding this reader, update expected sequence
01844     if (excluded.count(iter->first) == 0) {
01845       iter->second.expected_sequence_ = sequence_number_;
01846     }
01847   }
01848 
01849 #else
01850   ACE_UNUSED_ARG(filter_out);
01851   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01852        end = reader_info_.end(); iter != end; ++iter) {
01853     iter->second.expected_sequence_ = sequence_number_;
01854   }
01855 
01856 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01857 
01858 }
01859 
01860 void
01861 DataWriterImpl::send_suspended_data()
01862 {
01863   //this serves to get TransportClient's max_transaction_id_seen_
01864   //to the correct value for this list of transactions
01865   if (max_suspended_transaction_id_ != 0) {
01866     this->send(this->available_data_list_, max_suspended_transaction_id_);
01867     max_suspended_transaction_id_ = 0;
01868   }
01869 
01870   //this serves to actually have the send proceed in
01871   //sending the samples to the datalinks by passing it
01872   //the min_suspended_transaction_id_ which should be the
01873   //TransportClient's expected_transaction_id_
01874   this->send(this->available_data_list_, min_suspended_transaction_id_);
01875   min_suspended_transaction_id_ = 0;
01876   this->available_data_list_.reset();
01877 }
01878 
01879 DDS::ReturnCode_t
01880 DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
01881                         const DDS::Time_t & source_timestamp)
01882 {
01883   DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01884 
01885   if (enabled_ == false) {
01886     ACE_ERROR_RETURN((LM_ERROR,
01887                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01888                       ACE_TEXT(" Entity is not enabled. \n")),
01889                      DDS::RETCODE_NOT_ENABLED);
01890   }
01891 
01892   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01893 
01894   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01895 
01896   Message_Block_Ptr registered_sample_data;
01897   ret = this->data_container_->dispose(handle, registered_sample_data);
01898 
01899   if (ret != DDS::RETCODE_OK) {
01900     ACE_ERROR_RETURN((LM_ERROR,
01901                       ACE_TEXT("(%P|%t) ERROR: ")
01902                       ACE_TEXT("DataWriterImpl::dispose: ")
01903                       ACE_TEXT("dispose failed.\n")),
01904                      ret);
01905   }
01906 
01907   DataSampleElement* element = 0;
01908   ret = this->data_container_->obtain_buffer_for_control(element);
01909 
01910   if (ret != DDS::RETCODE_OK) {
01911     ACE_ERROR_RETURN((LM_ERROR,
01912                       ACE_TEXT("(%P|%t) ERROR: ")
01913                       ACE_TEXT("DataWriterImpl::dispose: ")
01914                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01915                       ret),
01916                      ret);
01917   }
01918 
01919   Message_Block_Ptr sample(create_control_message(DISPOSE_INSTANCE,
01920                                                   element->get_header(),
01921                                                   move(registered_sample_data),
01922                                                   source_timestamp));
01923   element->set_sample(move(sample));
01924   ret = this->data_container_->enqueue_control(element);
01925 
01926   if (ret != DDS::RETCODE_OK) {
01927     ACE_ERROR_RETURN((LM_ERROR,
01928                       ACE_TEXT("(%P|%t) ERROR: ")
01929                       ACE_TEXT("DataWriterImpl::dispose: ")
01930                       ACE_TEXT("enqueue_control failed.\n")),
01931                      ret);
01932   }
01933 
01934   send_all_to_flush_control(guard);
01935 
01936   return DDS::RETCODE_OK;
01937 }
01938 
01939 DDS::ReturnCode_t
01940 DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
01941                             size_t&                 size)
01942 {
01943   return data_container_->num_samples(handle, size);
01944 }
01945 
01946 void
01947 DataWriterImpl::unregister_all()
01948 {
01949   data_container_->unregister_all();
01950 }
01951 
01952 RepoId
01953 DataWriterImpl::get_publication_id()
01954 {
01955   return publication_id_;
01956 }
01957 
01958 RepoId
01959 DataWriterImpl::get_dp_id()
01960 {
01961   return dp_id_;
01962 }
01963 
01964 char const *
01965 DataWriterImpl::get_type_name() const
01966 {
01967   return type_name_.in();
01968 }
01969 
01970 ACE_Message_Block*
01971 DataWriterImpl::create_control_message(MessageId message_id,
01972                                        DataSampleHeader& header_data,
01973                                        Message_Block_Ptr data,
01974                                        const DDS::Time_t& source_timestamp)
01975 {
01976   header_data.message_id_ = message_id;
01977   header_data.byte_order_ =
01978     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01979   header_data.coherent_change_ = 0;
01980 
01981   if (data) {
01982     header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01983   }
01984 
01985   header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01986   header_data.sequence_repair_ = false; // set below
01987   header_data.source_timestamp_sec_ = source_timestamp.sec;
01988   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01989   header_data.publication_id_ = publication_id_;
01990 
01991   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01992   if (!publisher) {
01993     return 0;
01994   }
01995 
01996   header_data.publisher_id_ = publisher->publisher_id_;
01997 
01998   if (message_id == INSTANCE_REGISTRATION
01999       || message_id == DISPOSE_INSTANCE
02000       || message_id == UNREGISTER_INSTANCE
02001       || message_id == DISPOSE_UNREGISTER_INSTANCE
02002       || message_id == REQUEST_ACK) {
02003 
02004     header_data.sequence_repair_ = need_sequence_repair();
02005 
02006     // Use the sequence number here for the sake of RTPS (where these
02007     // control messages map onto the Data Submessage).
02008     if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02009       this->sequence_number_ = SequenceNumber();
02010 
02011     } else {
02012       ++this->sequence_number_;
02013     }
02014 
02015     header_data.sequence_ = this->sequence_number_;
02016     header_data.key_fields_only_ = true;
02017   }
02018 
02019   ACE_Message_Block* message = 0;
02020   ACE_NEW_MALLOC_RETURN(message,
02021                         static_cast<ACE_Message_Block*>(
02022                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02023                         ACE_Message_Block(
02024                           DataSampleHeader::max_marshaled_size(),
02025                           ACE_Message_Block::MB_DATA,
02026                           header_data.message_length_ ? data.release() : 0, //cont
02027                           0, //data
02028                           0, //allocator_strategy
02029                           get_db_lock(), //locking_strategy
02030                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02031                           ACE_Time_Value::zero,
02032                           ACE_Time_Value::max_time,
02033                           db_allocator_.get(),
02034                           mb_allocator_.get()),
02035                         0);
02036 
02037   *message << header_data;
02038 
02039   // If we incremented sequence number for this control message
02040   if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02041     ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02042     // Update the expected sequence number for all readers
02043     RepoIdToReaderInfoMap::iterator reader;
02044 
02045     for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02046       reader->second.expected_sequence_ = sequence_number_;
02047     }
02048   }
02049   if (DCPS_debug_level >= 4) {
02050     const GuidConverter converter(publication_id_);
02051     ACE_DEBUG((LM_DEBUG,
02052                ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
02053                ACE_TEXT("from publication %C sending control sample: %C .\n"),
02054                OPENDDS_STRING(converter).c_str(),
02055                to_string(header_data).c_str()));
02056   }
02057   return message;
02058 }
02059 
02060 DDS::ReturnCode_t
02061 DataWriterImpl::create_sample_data_message(Message_Block_Ptr data,
02062                                            DDS::InstanceHandle_t instance_handle,
02063                                            DataSampleHeader& header_data,
02064                                            Message_Block_Ptr& message,
02065                                            const DDS::Time_t& source_timestamp,
02066                                            bool content_filter)
02067 {
02068   PublicationInstance_rch instance =
02069     data_container_->get_handle_instance(instance_handle);
02070 
02071   if (0 == instance) {
02072     ACE_ERROR_RETURN((LM_ERROR,
02073                       ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02074                       ACE_TEXT("failed to find instance for handle %d\n"),
02075                       instance_handle),
02076                      DDS::RETCODE_ERROR);
02077   }
02078 
02079   header_data.message_id_ = SAMPLE_DATA;
02080   header_data.byte_order_ =
02081     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02082   header_data.coherent_change_ = this->coherent_;
02083 
02084   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02085 
02086   if (!publisher)
02087     return DDS::RETCODE_ERROR;
02088 
02089 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02090   header_data.group_coherent_ =
02091     publisher->qos_.presentation.access_scope
02092     == DDS::GROUP_PRESENTATION_QOS;
02093 #endif
02094   header_data.content_filter_ = content_filter;
02095   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02096   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02097   header_data.sequence_repair_ = need_sequence_repair();
02098 
02099   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02100     this->sequence_number_ = SequenceNumber();
02101 
02102   } else {
02103     ++this->sequence_number_;
02104   }
02105 
02106   header_data.sequence_ = this->sequence_number_;
02107   header_data.source_timestamp_sec_ = source_timestamp.sec;
02108   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02109 
02110   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02111       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02112     header_data.lifespan_duration_ = true;
02113     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02114     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02115   }
02116 
02117   header_data.publication_id_ = publication_id_;
02118   header_data.publisher_id_ = publisher->publisher_id_;
02119   size_t max_marshaled_size = header_data.max_marshaled_size();
02120 
02121   ACE_Message_Block* tmp_message;
02122   ACE_NEW_MALLOC_RETURN(tmp_message,
02123                         static_cast<ACE_Message_Block*>(
02124                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02125                         ACE_Message_Block(max_marshaled_size,
02126                                           ACE_Message_Block::MB_DATA,
02127                                           data.release(), //cont
02128                                           0, //data
02129                                           header_allocator_.get(), //alloc_strategy
02130                                           get_db_lock(), //locking_strategy
02131                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02132                                           ACE_Time_Value::zero,
02133                                           ACE_Time_Value::max_time,
02134                                           db_allocator_.get(),
02135                                           mb_allocator_.get()),
02136                         DDS::RETCODE_ERROR);
02137   message.reset(tmp_message);
02138   *message << header_data;
02139   if (DCPS_debug_level >= 4) {
02140     const GuidConverter converter(publication_id_);
02141     ACE_DEBUG((LM_DEBUG,
02142                ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
02143                ACE_TEXT("from publication %C sending data sample: %C .\n"),
02144                OPENDDS_STRING(converter).c_str(),
02145                to_string(header_data).c_str()));
02146   }
02147   return DDS::RETCODE_OK;
02148 }
02149 
02150 void
02151 DataWriterImpl::data_delivered(const DataSampleElement* sample)
02152 {
02153   DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02154 
02155   if (!(sample->get_pub_id() == this->publication_id_)) {
02156     GuidConverter sample_converter(sample->get_pub_id());
02157     GuidConverter writer_converter(publication_id_);
02158     ACE_ERROR((LM_ERROR,
02159                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02160                ACE_TEXT(" The publication id %C from delivered element ")
02161                ACE_TEXT("does not match the datawriter's id %C\n"),
02162                OPENDDS_STRING(sample_converter).c_str(),
02163                OPENDDS_STRING(writer_converter).c_str()));
02164     return;
02165   }
02166   //provided for statistics tracking in tests
02167   ++data_delivered_count_;
02168 
02169   this->data_container_->data_delivered(sample);
02170 }
02171 
02172 void
02173 DataWriterImpl::control_delivered(const Message_Block_Ptr&)
02174 {
02175   DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02176   controlTracker.message_delivered();
02177 }
02178 
02179 RcHandle<EntityImpl>
02180 DataWriterImpl::parent() const
02181 {
02182   return this->publisher_servant_.lock();
02183 }
02184 
02185 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
02186 bool
02187 DataWriterImpl::filter_out(const DataSampleElement& elt,
02188                            const OPENDDS_STRING& filterClassName,
02189                            const FilterEvaluator& evaluator,
02190                            const DDS::StringSeq& expression_params) const
02191 {
02192   TypeSupportImpl* const typesupport =
02193     dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02194 
02195   if (!typesupport) {
02196     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02197     return false;
02198   }
02199 
02200   if (filterClassName == "DDSSQL" ||
02201       filterClassName == "OPENDDSSQL") {
02202     return !evaluator.eval(elt.get_sample()->cont(),
02203                            elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02204                            elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02205                            expression_params);
02206   }
02207   else {
02208     return false;
02209   }
02210 }
02211 #endif
02212 
02213 bool
02214 DataWriterImpl::check_transport_qos(const TransportInst&)
02215 {
02216   // DataWriter does not impose any constraints on which transports
02217   // may be used based on QoS.
02218   return true;
02219 }
02220 
02221 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02222 
02223 bool
02224 DataWriterImpl::coherent_changes_pending()
02225 {
02226   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02227                    guard,
02228                    get_lock(),
02229                    false);
02230 
02231   return this->coherent_;
02232 }
02233 
02234 void
02235 DataWriterImpl::begin_coherent_changes()
02236 {
02237   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02238             guard,
02239             get_lock());
02240 
02241   this->coherent_ = true;
02242 }
02243 
02244 void
02245 DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
02246 {
02247   // PublisherImpl::pi_lock_ should be held.
02248   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02249             guard,
02250             get_lock());
02251 
02252   CoherentChangeControl end_msg;
02253   end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02254   end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02255 
02256   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02257 
02258   if (publisher) {
02259     end_msg.group_coherent_
02260       = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02261   }
02262 
02263   if (publisher && end_msg.group_coherent_) {
02264     end_msg.publisher_id_ = publisher->publisher_id_;
02265     end_msg.group_coherent_samples_ = group_samples;
02266   }
02267 
02268   size_t max_marshaled_size = end_msg.max_marshaled_size();
02269 
02270   Message_Block_Ptr data( new ACE_Message_Block(max_marshaled_size,
02271                                   ACE_Message_Block::MB_DATA,
02272                                   0, //cont
02273                                   0, //data
02274                                   0, //alloc_strategy
02275                                   get_db_lock()));
02276 
02277   Serializer serializer(
02278     data.get(),
02279     this->swap_bytes());
02280 
02281   serializer << end_msg;
02282 
02283   DDS::Time_t source_timestamp =
02284     time_value_to_time(ACE_OS::gettimeofday());
02285 
02286   DataSampleHeader header;
02287   Message_Block_Ptr control(
02288     create_control_message(END_COHERENT_CHANGES, header, move(data), source_timestamp));
02289 
02290 
02291   this->coherent_ = false;
02292   this->coherent_samples_ = 0;
02293 
02294   guard.release();
02295   if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) {
02296     ACE_ERROR((LM_ERROR,
02297                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02298                ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02299   }
02300 }
02301 
02302 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
02303 
02304 void
02305 DataWriterImpl::data_dropped(const DataSampleElement* element,
02306                              bool dropped_by_transport)
02307 {
02308   DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02309 
02310   //provided for statistics tracking in tests
02311   ++data_dropped_count_;
02312 
02313   this->data_container_->data_dropped(element, dropped_by_transport);
02314 }
02315 
02316 void
02317 DataWriterImpl::control_dropped(const Message_Block_Ptr&,
02318                                 bool /* dropped_by_transport */)
02319 {
02320   DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02321   controlTracker.message_dropped();
02322 }
02323 
02324 DDS::DataWriterListener_ptr
02325 DataWriterImpl::listener_for(DDS::StatusKind kind)
02326 {
02327   // per 2.1.4.3.1 Listener Access to Plain Communication Status
02328   // use this entities factory if listener is mask not enabled
02329   // for this kind.
02330   RcHandle<PublisherImpl> publisher = publisher_servant_.lock();
02331   if (!publisher)
02332     return 0;
02333 
02334   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02335     return publisher->listener_for(kind);
02336 
02337   } else {
02338     return DDS::DataWriterListener::_duplicate(listener_.in());
02339   }
02340 }
02341 
02342 int
02343 DataWriterImpl::handle_timeout(const ACE_Time_Value &tv,
02344                                const void * /* arg */)
02345 {
02346   bool liveliness_lost = false;
02347 
02348   ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02349 
02350   // Do we need to send a liveliness message?
02351   if (elapsed >= liveliness_check_interval_) {
02352     switch (this->qos_.liveliness.kind) {
02353     case DDS::AUTOMATIC_LIVELINESS_QOS:
02354       if (this->send_liveliness(tv) == false) {
02355         liveliness_lost = true;
02356       }
02357       break;
02358 
02359     case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02360       if (liveliness_asserted_) {
02361         if (this->send_liveliness(tv) == false) {
02362           liveliness_lost = true;
02363         }
02364       }
02365       break;
02366 
02367     case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02368       // Do nothing.
02369       break;
02370     }
02371   }
02372   else {
02373     // Reschedule.
02374     if (reactor_->cancel_timer(liveness_timer_.in()) == -1) {
02375       ACE_ERROR((LM_ERROR,
02376         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02377         ACE_TEXT("cancel_timer")));
02378     }
02379     if (reactor_->schedule_timer(liveness_timer_.in(), 0, liveliness_check_interval_ - elapsed,
02380       liveliness_check_interval_) == -1)
02381     {
02382       ACE_ERROR((LM_ERROR,
02383         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02384         ACE_TEXT("schedule_timer")));
02385     }
02386     return 0;
02387   }
02388 
02389   liveliness_asserted_ = false;
02390   elapsed = tv - last_liveliness_activity_time_;
02391 
02392   // Have we lost liveliness?
02393   if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02394     liveliness_lost = true;
02395   }
02396 
02397   if (!this->liveliness_lost_ && liveliness_lost) {
02398     ++ this->liveliness_lost_status_.total_count;
02399     ++ this->liveliness_lost_status_.total_count_change;
02400 
02401     DDS::DataWriterListener_var listener =
02402       listener_for(DDS::LIVELINESS_LOST_STATUS);
02403 
02404     if (!CORBA::is_nil(listener.in())) {
02405       listener->on_liveliness_lost(this, this->liveliness_lost_status_);
02406       this->liveliness_lost_status_.total_count_change = 0;
02407     }
02408   }
02409 
02410   this->liveliness_lost_ = liveliness_lost;
02411   return 0;
02412 }
02413 
02414 bool
02415 DataWriterImpl::send_liveliness(const ACE_Time_Value& now)
02416 {
02417   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02418       !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02419     DDS::Time_t t = time_value_to_time(now);
02420     DataSampleHeader header;
02421     Message_Block_Ptr empty;
02422     Message_Block_Ptr liveliness_msg(
02423       this->create_control_message(DATAWRITER_LIVELINESS, header, move(empty), t));
02424 
02425     if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) {
02426       ACE_ERROR_RETURN((LM_ERROR,
02427                         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02428                         ACE_TEXT(" send_control failed. \n")),
02429                        false);
02430 
02431     } else {
02432       last_liveliness_activity_time_ = now;
02433       return true;
02434     }
02435   } else {
02436     last_liveliness_activity_time_ = now;
02437     return true;
02438   }
02439 }
02440 
02441 void
02442 DataWriterImpl::prepare_to_delete()
02443 {
02444   this->set_deleted(true);
02445   this->stop_associating();
02446 }
02447 
02448 PublicationInstance_rch
02449 DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
02450 {
02451 
02452   if (0 != data_container_) {
02453     return data_container_->get_handle_instance(handle);
02454   }
02455 
02456   return PublicationInstance_rch();
02457 }
02458 
02459 void
02460 DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
02461 {
02462   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02463 
02464   if (!is_bit_) {
02465     // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02466     // is given to this DataWriter then narrow() fails.
02467     DataWriterListener_var the_listener =
02468       DataWriterListener::_narrow(this->listener_.in());
02469 
02470     if (!CORBA::is_nil(the_listener.in())) {
02471       PublicationDisconnectedStatus status;
02472       // Since this callback may come after remove_association which
02473       // removes the reader from id_to_handle map, we can ignore this
02474       // error.
02475       this->lookup_instance_handles(subids,
02476                                     status.subscription_handles);
02477       the_listener->on_publication_disconnected(this, status);
02478     }
02479   }
02480 }
02481 
02482 void
02483 DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
02484 {
02485   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02486 
02487   if (!is_bit_) {
02488     // Narrow to DDS::DCPS::DataWriterListener. If a
02489     // DDS::DataWriterListener is given to this DataWriter then
02490     // narrow() fails.
02491     DataWriterListener_var the_listener =
02492       DataWriterListener::_narrow(this->listener_.in());
02493 
02494     if (!CORBA::is_nil(the_listener.in())) {
02495       PublicationDisconnectedStatus status;
02496 
02497       // If it's reconnected then the reader should be in id_to_handle
02498       this->lookup_instance_handles(subids, status.subscription_handles);
02499 
02500       the_listener->on_publication_reconnected(this, status);
02501     }
02502   }
02503 }
02504 
02505 void
02506 DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
02507 {
02508   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02509 
02510   if (!is_bit_) {
02511     // Narrow to DDS::DCPS::DataWriterListener. If a
02512     // DDS::DataWriterListener is given to this DataWriter then
02513     // narrow() fails.
02514     DataWriterListener_var the_listener =
02515       DataWriterListener::_narrow(this->listener_.in());
02516 
02517     if (!CORBA::is_nil(the_listener.in())) {
02518       PublicationLostStatus status;
02519 
02520       // Since this callback may come after remove_association which removes
02521       // the reader from id_to_handle map, we can ignore this error.
02522       this->lookup_instance_handles(subids,
02523                                     status.subscription_handles);
02524       the_listener->on_publication_lost(this, status);
02525     }
02526   }
02527 }
02528 
02529 void
02530 DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
02531 {
02532   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02533 
02534   if (!is_bit_) {
02535     // Narrow to DDS::DCPS::DataWriterListener. If a
02536     // DDS::DataWriterListener is given to this DataWriter then
02537     // narrow() fails.
02538     DataWriterListener_var the_listener =
02539       DataWriterListener::_narrow(this->listener_.in());
02540 
02541     if (!CORBA::is_nil(the_listener.in())) {
02542       PublicationLostStatus status;
02543 
02544       CORBA::ULong len = handles.length();
02545       status.subscription_handles.length(len);
02546 
02547       for (CORBA::ULong i = 0; i < len; ++ i) {
02548         status.subscription_handles[i] = handles[i];
02549       }
02550 
02551       the_listener->on_publication_lost(this, status);
02552     }
02553   }
02554 }
02555 
02556 
02557 void
02558 DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
02559                                         DDS::InstanceHandleSeq & hdls)
02560 {
02561   CORBA::ULong const num_rds = ids.length();
02562   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02563 
02564   if (!participant)
02565     return;
02566 
02567   if (DCPS_debug_level > 9) {
02568     OPENDDS_STRING separator;
02569     OPENDDS_STRING buffer;
02570 
02571     for (CORBA::ULong i = 0; i < num_rds; ++i) {
02572       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02573       separator = ", ";
02574     }
02575 
02576     ACE_DEBUG((LM_DEBUG,
02577                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02578                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02579                buffer.c_str()));
02580   }
02581 
02582   hdls.length(num_rds);
02583 
02584   for (CORBA::ULong i = 0; i < num_rds; ++i) {
02585     hdls[i] = participant->id_to_handle(ids[i]);
02586   }
02587 }
02588 
02589 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
02590 bool
02591 DataWriterImpl::persist_data()
02592 {
02593   return this->data_container_->persist_data();
02594 }
02595 #endif
02596 
02597 void
02598 DataWriterImpl::reschedule_deadline()
02599 {
02600   if (this->watchdog_.in()) {
02601     this->data_container_->reschedule_deadline();
02602   }
02603 }
02604 
02605 void
02606 DataWriterImpl::wait_control_pending()
02607 {
02608   if (!TransportRegistry::instance()->released()) {
02609     OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02610     controlTracker.wait_messages_pending(caller_string);
02611   }
02612 }
02613 
02614 void
02615 DataWriterImpl::wait_pending()
02616 {
02617   if (!TransportRegistry::instance()->released()) {
02618     data_container_->wait_pending();
02619   }
02620 }
02621 
02622 void
02623 DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
02624 {
02625   this->data_container_->get_instance_handles(instance_handles);
02626 }
02627 
02628 void
02629 DataWriterImpl::get_readers(RepoIdSet& readers)
02630 {
02631   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02632   readers = this->readers_;
02633 }
02634 
02635 void
02636 DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
02637 {
02638   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02639   if (publisher) {
02640     publisher->get_qos(qos_data.pub_qos);
02641   }
02642   qos_data.dw_qos = this->qos_;
02643   qos_data.topic_name = this->topic_name_.in();
02644 }
02645 
02646 #if defined(OPENDDS_SECURITY)
02647 DDS::Security::ParticipantCryptoHandle DataWriterImpl::get_crypto_handle() const
02648 {
02649   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
02650   return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
02651 }
02652 #endif
02653 
02654 bool
02655 DataWriterImpl::need_sequence_repair()
02656 {
02657   ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02658   return need_sequence_repair_i();
02659 }
02660 
02661 bool
02662 DataWriterImpl::need_sequence_repair_i() const
02663 {
02664   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02665        end = reader_info_.end(); it != end; ++it) {
02666     if (it->second.expected_sequence_ != sequence_number_) {
02667       return true;
02668     }
02669   }
02670 
02671   return false;
02672 }
02673 
02674 SendControlStatus
02675 DataWriterImpl::send_control(const DataSampleHeader& header,
02676                              Message_Block_Ptr msg)
02677 {
02678   controlTracker.message_sent();
02679 
02680   SendControlStatus status = TransportClient::send_control(header, move(msg));
02681 
02682   if (status != SEND_CONTROL_OK) {
02683     controlTracker.message_dropped();
02684   }
02685 
02686   return status;
02687 }
02688 
02689 int
02690 LivenessTimer::handle_timeout(const ACE_Time_Value &tv,
02691                              const void *arg)
02692 {
02693   DataWriterImpl_rch writer = this->writer_.lock();
02694   if (writer) {
02695     writer->handle_timeout(tv, arg);
02696   }
02697   else {
02698     this->reactor()->cancel_timer(this);
02699   }
02700   return 0;
02701 }
02702 
02703 
02704 } // namespace DCPS
02705 } // namespace OpenDDS
02706 
02707 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1