LCOV - code coverage report
Current view: top level - DCPS - DataWriterImpl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 1447 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 109 0.0 %

          Line data    Source code
       1             : /*
       2             :  * Distributed under the OpenDDS License.
       3             :  * See: http://www.opendds.org/license.html
       4             :  */
       5             : 
       6             : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
       7             : 
       8             : #include "DataWriterImpl.h"
       9             : 
      10             : #include "FeatureDisabledQosCheck.h"
      11             : #include "DomainParticipantImpl.h"
      12             : #include "PublisherImpl.h"
      13             : #include "Service_Participant.h"
      14             : #include "GuidConverter.h"
      15             : #include "TopicImpl.h"
      16             : #include "PublicationInstance.h"
      17             : #include "Serializer.h"
      18             : #include "Transient_Kludge.h"
      19             : #include "DataDurabilityCache.h"
      20             : #include "MonitorFactory.h"
      21             : #include "SendStateDataSampleList.h"
      22             : #include "DataSampleElement.h"
      23             : #include "Util.h"
      24             : #include "DCPS_Utils.h"
      25             : #include "XTypes/TypeObject.h"
      26             : #include "TypeSupportImpl.h"
      27             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
      28             : #  include "CoherentChangeControl.h"
      29             : #endif
      30             : #include "AssociationData.h"
      31             : #include "transport/framework/EntryExit.h"
      32             : #include "transport/framework/TransportExceptions.h"
      33             : #include "transport/framework/TransportRegistry.h"
      34             : #ifndef DDS_HAS_MINIMUM_BIT
      35             : #  include "BuiltInTopicUtils.h"
      36             : #endif
      37             : 
      38             : #ifndef DDS_HAS_MINIMUM_BIT
      39             : #  include <dds/DdsDcpsCoreTypeSupportC.h>
      40             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
      41             : #include <dds/DdsDcpsCoreC.h>
      42             : #include <dds/DdsDcpsGuidTypeSupportImpl.h>
      43             : 
      44             : #include <ace/Reactor.h>
      45             : #include <ace/Auto_Ptr.h>
      46             : 
      47             : #include <stdexcept>
      48             : 
      49             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      50             : 
      51             : namespace OpenDDS {
      52             : namespace DCPS {
      53             : 
      54             : //TBD - add check for enabled in most methods.
      55             : //      currently this is not needed because auto_enable_created_entities
      56             : //      cannot be false.
      57             : 
      58           0 : DataWriterImpl::DataWriterImpl()
      59           0 :   : data_dropped_count_(0)
      60           0 :   , data_delivered_count_(0)
      61           0 :   , controlTracker("DataWriterImpl")
      62           0 :   , n_chunks_(TheServiceParticipant->n_chunks())
      63           0 :   , association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier())
      64           0 :   , qos_(TheServiceParticipant->initial_DataWriterQos())
      65           0 :   , skip_serialize_(false)
      66           0 :   , db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks()))
      67           0 :   , topic_id_(GUID_UNKNOWN)
      68           0 :   , topic_servant_(0)
      69           0 :   , type_support_(0)
      70           0 :   , listener_mask_(DEFAULT_STATUS_MASK)
      71           0 :   , domain_id_(0)
      72           0 :   , publication_id_(GUID_UNKNOWN)
      73           0 :   , sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
      74           0 :   , coherent_(false)
      75           0 :   , coherent_samples_(0)
      76           0 :   , liveliness_lost_(false)
      77           0 :   , reactor_(0)
      78           0 :   , liveliness_check_interval_(TimeDuration::max_value)
      79           0 :   , last_deadline_missed_total_count_(0)
      80           0 :   , is_bit_(false)
      81           0 :   , min_suspended_transaction_id_(0)
      82           0 :   , max_suspended_transaction_id_(0)
      83           0 :   , liveliness_asserted_(false)
      84           0 :   , liveness_timer_(make_rch<LivenessTimer>(ref(*this)))
      85             : {
      86           0 :   liveliness_lost_status_.total_count = 0;
      87           0 :   liveliness_lost_status_.total_count_change = 0;
      88             : 
      89           0 :   offered_deadline_missed_status_.total_count = 0;
      90           0 :   offered_deadline_missed_status_.total_count_change = 0;
      91           0 :   offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
      92             : 
      93           0 :   offered_incompatible_qos_status_.total_count = 0;
      94           0 :   offered_incompatible_qos_status_.total_count_change = 0;
      95           0 :   offered_incompatible_qos_status_.last_policy_id = 0;
      96           0 :   offered_incompatible_qos_status_.policies.length(0);
      97             : 
      98           0 :   publication_match_status_.total_count = 0;
      99           0 :   publication_match_status_.total_count_change = 0;
     100           0 :   publication_match_status_.current_count = 0;
     101           0 :   publication_match_status_.current_count_change = 0;
     102           0 :   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
     103             : 
     104           0 :   monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this));
     105           0 :   periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this));
     106           0 : }
     107             : 
     108             : // This method is called when there are no longer any reference to the
     109             : // the servant.
     110           0 : DataWriterImpl::~DataWriterImpl()
     111             : {
     112             :   DBG_ENTRY_LVL("DataWriterImpl", "~DataWriterImpl", 6);
     113             : #ifndef OPENDDS_SAFETY_PROFILE
     114           0 :   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
     115           0 :   if (participant) {
     116           0 :     XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
     117           0 :     if (type_lookup_service) {
     118           0 :       type_lookup_service->remove_guid_from_dynamic_map(publication_id_);
     119             :     }
     120           0 :   }
     121             : #endif
     122           0 : }
     123             : 
     124             : // this method is called when delete_datawriter is called.
     125             : void
     126           0 : DataWriterImpl::cleanup()
     127             : {
     128             :   // As first step set our listener to nill which will prevent us from calling
     129             :   // back onto the listener at the moment the related DDS entity has been
     130             :   // deleted
     131           0 :   set_listener(0, NO_STATUS_MASK);
     132           0 :   topic_servant_ = 0;
     133           0 :   type_support_ = 0;
     134           0 : }
     135             : 
     136             : void
     137           0 : DataWriterImpl::init(
     138             :   TopicImpl* topic_servant,
     139             :   const DDS::DataWriterQos& qos,
     140             :   DDS::DataWriterListener_ptr a_listener,
     141             :   const DDS::StatusMask& mask,
     142             :   WeakRcHandle<DomainParticipantImpl> participant_servant,
     143             :   PublisherImpl* publisher_servant)
     144             : {
     145             :   DBG_ENTRY_LVL("DataWriterImpl", "init", 6);
     146           0 :   topic_servant_ = topic_servant;
     147           0 :   type_support_ = dynamic_cast<TypeSupportImpl*>(topic_servant->get_type_support());
     148           0 :   topic_name_ = topic_servant_->get_name();
     149           0 :   topic_id_ = topic_servant_->get_id();
     150           0 :   type_name_ = topic_servant_->get_type_name();
     151             : 
     152             : #if !defined (DDS_HAS_MINIMUM_BIT)
     153           0 :   is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
     154             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
     155             : 
     156           0 :   qos_ = qos;
     157           0 :   passed_qos_ = qos;
     158             : 
     159           0 :   set_listener(a_listener, mask);
     160             : 
     161             :   // Only store the participant pointer, since it is our "grand"
     162             :   // parent, we will exist as long as it does.
     163           0 :   participant_servant_ = participant_servant;
     164             : 
     165           0 :   RcHandle<DomainParticipantImpl> participant = participant_servant.lock();
     166           0 :   domain_id_ = participant->get_domain_id();
     167             : 
     168             :   // Only store the publisher pointer, since it is our parent, we will
     169             :   // exist as long as it does.
     170           0 :   publisher_servant_ = *publisher_servant;
     171             : 
     172           0 :   this->reactor_ = TheServiceParticipant->timer();
     173           0 : }
     174             : 
     175             : DDS::InstanceHandle_t
     176           0 : DataWriterImpl::get_instance_handle()
     177             : {
     178           0 :   const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
     179           0 :   return get_entity_instance_handle(publication_id_, participant);
     180           0 : }
     181             : 
     182             : DDS::InstanceHandle_t
     183           0 : DataWriterImpl::get_next_handle()
     184             : {
     185           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
     186           0 :   if (participant) {
     187           0 :     return participant->assign_handle();
     188             :   }
     189           0 :   return DDS::HANDLE_NIL;
     190           0 : }
     191             : 
     192           0 : void DataWriterImpl::return_handle(DDS::InstanceHandle_t handle)
     193             : {
     194           0 :   const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
     195           0 :   if (participant) {
     196           0 :     participant->return_handle(handle);
     197             :   }
     198           0 : }
     199             : 
     200             : RcHandle<BitSubscriber>
     201           0 : DataWriterImpl::get_builtin_subscriber_proxy() const
     202             : {
     203           0 :   RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
     204           0 :   if (participant_servant) {
     205           0 :     return participant_servant->get_builtin_subscriber_proxy();
     206             :   }
     207             : 
     208           0 :   return RcHandle<BitSubscriber>();
     209           0 : }
     210             : 
     211             : void
     212           0 : DataWriterImpl::add_association(const GUID_t& yourId,
     213             :                                 const ReaderAssociation& reader,
     214             :                                 bool active)
     215             : {
     216             :   DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
     217             : 
     218           0 :   if (DCPS_debug_level) {
     219           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
     220             :                ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
     221             :                LogGuid(yourId).c_str(),
     222             :                LogGuid(reader.readerId).c_str()));
     223             :   }
     224             : 
     225           0 :   if (get_deleted()) {
     226           0 :     if (DCPS_debug_level)
     227           0 :       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
     228             :                  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
     229             : 
     230           0 :     return;
     231             :   }
     232             : 
     233           0 :   check_and_set_repo_id(yourId);
     234             : 
     235             :   {
     236           0 :     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
     237           0 :     reader_info_.insert(std::make_pair(reader.readerId,
     238           0 :                                        ReaderInfo(reader.filterClassName,
     239           0 :                                                   TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
     240           0 :                                                   reader.exprParams, participant_servant_,
     241           0 :                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
     242           0 :   }
     243             : 
     244           0 :   if (DCPS_debug_level > 4) {
     245           0 :     ACE_DEBUG((LM_DEBUG,
     246             :                ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
     247             :                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
     248             :                LogGuid(get_guid()).c_str(),
     249             :                qos_.transport_priority.value));
     250             :   }
     251             : 
     252           0 :   AssociationData data;
     253           0 :   data.remote_id_ = reader.readerId;
     254           0 :   data.remote_data_ = reader.readerTransInfo;
     255           0 :   data.discovery_locator_ = reader.readerDiscInfo;
     256           0 :   data.participant_discovered_at_ = reader.participantDiscoveredAt;
     257           0 :   data.remote_transport_context_ = reader.transportContext;
     258           0 :   data.remote_reliable_ =
     259           0 :     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
     260           0 :   data.remote_durable_ =
     261           0 :     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
     262             : 
     263           0 :   if (associate(data, active)) {
     264           0 :     const Observer_rch observer = get_observer(Observer::e_ASSOCIATED);
     265           0 :     if (observer) {
     266           0 :       observer->on_associated(this, data.remote_id_);
     267             :     }
     268           0 :   } else {
     269             :     //FUTURE: inform inforepo and try again as passive peer
     270           0 :     if (DCPS_debug_level) {
     271           0 :       ACE_ERROR((LM_ERROR,
     272             :                  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
     273             :                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
     274             :     }
     275             :   }
     276           0 : }
     277             : 
     278             : void
     279           0 : DataWriterImpl::transport_assoc_done(int flags, const GUID_t& remote_id)
     280             : {
     281             :   DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
     282             : 
     283           0 :   if (!(flags & ASSOC_OK)) {
     284           0 :     if (DCPS_debug_level) {
     285           0 :       ACE_ERROR((LM_ERROR,
     286             :                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
     287             :                  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
     288             :                  LogGuid(remote_id).c_str()));
     289             :     }
     290             : 
     291           0 :     return;
     292             :   }
     293             : 
     294           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
     295             : 
     296           0 :   if (DCPS_debug_level) {
     297           0 :     ACE_DEBUG((LM_INFO,
     298             :                ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
     299             :                ACE_TEXT("writer %C succeeded in associating with reader %C\n"),
     300             :                LogGuid(publication_id_).c_str(),
     301             :                LogGuid(remote_id).c_str()));
     302             :   }
     303             : 
     304           0 :   if (flags & ASSOC_ACTIVE) {
     305             : 
     306             :     // Have we already received an association_complete() callback?
     307           0 :     if (DCPS_debug_level) {
     308           0 :       ACE_DEBUG((LM_DEBUG,
     309             :                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
     310             :                  ACE_TEXT("writer %C reader %C calling association_complete_i\n"),
     311             :                  LogGuid(publication_id_).c_str(),
     312             :                  LogGuid(remote_id).c_str()));
     313             :     }
     314           0 :     association_complete_i(remote_id);
     315             : 
     316             :   } else {
     317             :     // In the current implementation, DataWriter is always active, so this
     318             :     // code will not be applicable.
     319           0 :     if (DCPS_debug_level) {
     320           0 :       ACE_ERROR((LM_ERROR,
     321             :                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
     322             :                  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
     323             :                  LogGuid(publication_id_).c_str()));
     324             :     }
     325             :   }
     326           0 : }
     327             : 
     328           0 : DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
     329             :                                        const char* filter,
     330             :                                        const DDS::StringSeq& params,
     331             :                                        WeakRcHandle<DomainParticipantImpl> participant,
     332           0 :                                        bool durable)
     333             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     334           0 :   : participant_(participant)
     335           0 :   , filter_class_name_(filterClassName)
     336           0 :   , filter_(filter)
     337           0 :   , expression_params_(params)
     338           0 :   , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
     339           0 :   , durable_(durable)
     340             : {
     341           0 :   RcHandle<DomainParticipantImpl> part = participant_.lock();
     342           0 :   if (part && *filter) {
     343           0 :     eval_ = part->get_filter_eval(filter);
     344             :   }
     345           0 : }
     346             : #else
     347             :   : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
     348             :   , durable_(durable)
     349             : {
     350             :   ACE_UNUSED_ARG(filterClassName);
     351             :   ACE_UNUSED_ARG(filter);
     352             :   ACE_UNUSED_ARG(params);
     353             :   ACE_UNUSED_ARG(participant);
     354             : }
     355             : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
     356             : 
     357           0 : DataWriterImpl::ReaderInfo::~ReaderInfo()
     358             : {
     359             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     360           0 :   eval_ = RcHandle<FilterEvaluator>();
     361           0 :   RcHandle<DomainParticipantImpl> participant = participant_.lock();
     362           0 :   if (participant && !filter_.empty()) {
     363           0 :     participant->deref_filter_eval(filter_.c_str());
     364             :   }
     365             : 
     366             : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
     367           0 : }
     368             : 
     369             : void
     370           0 : DataWriterImpl::association_complete_i(const GUID_t& remote_id)
     371             : {
     372             :   DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
     373             : 
     374           0 :   bool reader_durable = false;
     375             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     376           0 :   OPENDDS_STRING filterClassName;
     377           0 :   RcHandle<FilterEvaluator> eval;
     378           0 :   DDS::StringSeq expression_params;
     379             : #endif
     380             :   {
     381           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     382             : 
     383           0 :     if (DCPS_debug_level >= 1) {
     384           0 :       ACE_DEBUG((LM_DEBUG,
     385             :                  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
     386             :                  ACE_TEXT("bit %d local %C remote %C\n"),
     387             :                  is_bit_,
     388             :                  LogGuid(this->publication_id_).c_str(),
     389             :                  LogGuid(remote_id).c_str()));
     390             :     }
     391             : 
     392           0 :     if (insert(readers_, remote_id) == -1) {
     393           0 :       ACE_ERROR((LM_ERROR,
     394             :                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
     395             :                  ACE_TEXT("insert %C from pending failed.\n"),
     396             :                  LogGuid(remote_id).c_str()));
     397             :     }
     398           0 :   }
     399             :   {
     400           0 :     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
     401           0 :     RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
     402             : 
     403           0 :     if (it != reader_info_.end()) {
     404           0 :       reader_durable = it->second.durable_;
     405             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     406           0 :       filterClassName = it->second.filter_class_name_;
     407           0 :       eval = it->second.eval_;
     408           0 :       expression_params = it->second.expression_params_;
     409             : #endif
     410             :     }
     411           0 :   }
     412             : 
     413           0 :   if (this->monitor_) {
     414           0 :     this->monitor_->report();
     415             :   }
     416             : 
     417           0 :   if (!is_bit_) {
     418             : 
     419           0 :     RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
     420             : 
     421           0 :     if (!participant)
     422           0 :       return;
     423             : 
     424           0 :     data_container_->add_reader_acks(remote_id, get_max_sn());
     425             : 
     426           0 :     const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
     427             : 
     428             :     {
     429             :       // protect publication_match_status_ and status changed flags.
     430           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     431             : 
     432           0 :       if (DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
     433           0 :         ACE_DEBUG((LM_WARNING,
     434             :                    ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
     435             :                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
     436             :                    LogGuid(remote_id).c_str(),
     437             :                    handle));
     438           0 :         return;
     439             : 
     440           0 :       } else if (DCPS_debug_level > 4) {
     441           0 :         ACE_DEBUG((LM_DEBUG,
     442             :                    ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
     443             :                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
     444             :                    LogGuid(remote_id).c_str(),
     445             :                    handle));
     446             :       }
     447             : 
     448           0 :       ++publication_match_status_.total_count;
     449           0 :       ++publication_match_status_.total_count_change;
     450           0 :       ++publication_match_status_.current_count;
     451           0 :       ++publication_match_status_.current_count_change;
     452           0 :       publication_match_status_.last_subscription_handle = handle;
     453           0 :       set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
     454           0 :     }
     455             : 
     456             :     DDS::DataWriterListener_var listener =
     457           0 :       listener_for(DDS::PUBLICATION_MATCHED_STATUS);
     458             : 
     459           0 :     if (!CORBA::is_nil(listener.in())) {
     460             : 
     461           0 :       listener->on_publication_matched(this, publication_match_status_);
     462             : 
     463             :       // TBD - why does the spec say to change this but not
     464             :       // change the ChangeFlagStatus after a listener call?
     465           0 :       publication_match_status_.total_count_change = 0;
     466           0 :       publication_match_status_.current_count_change = 0;
     467             :     }
     468             : 
     469           0 :     notify_status_condition();
     470           0 :   } else {
     471           0 :     data_container_->add_reader_acks(remote_id, get_max_sn());
     472             :   }
     473             : 
     474             :   // Support DURABILITY QoS
     475           0 :   if (reader_durable) {
     476             :     // Tell the WriteDataContainer to resend all sending/sent
     477             :     // samples.
     478           0 :     this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
     479             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     480           0 :                                          , filterClassName, eval.in(), expression_params
     481             : #endif
     482             :                                         );
     483             : 
     484             :     // Acquire the data writer container lock to avoid deadlock. The
     485             :     // thread calling association_complete() has to acquire lock in the
     486             :     // same order as the write()/register() operation.
     487             : 
     488             :     // Since the thread calling association_complete() is the ORB
     489             :     // thread, it may have some performance penalty. If the
     490             :     // performance is an issue, we may need a new thread to handle the
     491             :     // data_available() calls.
     492           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex,
     493             :               guard,
     494             :               this->get_lock());
     495             : 
     496           0 :     SendStateDataSampleList list = this->get_resend_data();
     497             :     {
     498           0 :       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
     499             :       // Update the reader's expected sequence
     500             :       SequenceNumber& seq =
     501           0 :         reader_info_.find(remote_id)->second.expected_sequence_;
     502             : 
     503           0 :       for (SendStateDataSampleList::iterator list_el = list.begin();
     504           0 :            list_el != list.end(); ++list_el) {
     505           0 :         list_el->get_header().historic_sample_ = true;
     506             : 
     507           0 :         if (list_el->get_header().sequence_ > seq) {
     508           0 :           seq = list_el->get_header().sequence_;
     509             :         }
     510             :       }
     511           0 :     }
     512             : 
     513           0 :     RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
     514           0 :     if (!publisher || publisher->is_suspended()) {
     515           0 :       this->available_data_list_.enqueue_tail(list);
     516             : 
     517             :     } else {
     518           0 :       if (DCPS_debug_level >= 4) {
     519           0 :         ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
     520             :       }
     521             : 
     522           0 :       const Encoding encoding(Encoding::KIND_UNALIGNED_CDR);
     523           0 :       size_t size = 0;
     524           0 :       serialized_size(encoding, size, remote_id);
     525             :       Message_Block_Ptr data(
     526             :         new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
     527           0 :                               get_db_lock()));
     528           0 :       Serializer ser(data.get(), encoding);
     529           0 :       ser << remote_id;
     530             : 
     531           0 :       DataSampleHeader header;
     532             :       Message_Block_Ptr end_historic_samples(
     533             :         create_control_message(
     534           0 :           END_HISTORIC_SAMPLES, header, move(data),
     535           0 :           SystemTimePoint::now().to_dds_time()));
     536             : 
     537           0 :       this->controlTracker.message_sent();
     538           0 :       guard.release();
     539           0 :       ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> rev_lock(lock_);
     540           0 :       ACE_Guard<ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> > rev_guard(rev_lock);
     541           0 :       SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
     542           0 :       if (ret == SEND_CONTROL_ERROR) {
     543           0 :         ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
     544             :                              ACE_TEXT("DataWriterImpl::association_complete_i: ")
     545             :                              ACE_TEXT("send_w_control failed.\n")));
     546           0 :         this->controlTracker.message_dropped();
     547             :       }
     548           0 :     }
     549           0 :   }
     550           0 : }
     551             : 
     552             : void
     553           0 : DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
     554             :                                     CORBA::Boolean notify_lost)
     555             : {
     556           0 :   if (readers.length() == 0) {
     557           0 :     return;
     558             :   }
     559             : 
     560           0 :   const Observer_rch observer = get_observer(Observer::e_DISASSOCIATED);
     561           0 :   if (observer) {
     562           0 :     for (CORBA::ULong i = 0; i < readers.length(); ++i) {
     563           0 :       observer->on_disassociated(this, readers[i]);
     564             :     }
     565             :   }
     566             : 
     567           0 :   if (DCPS_debug_level >= 1) {
     568           0 :     ACE_DEBUG((LM_DEBUG,
     569             :                ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
     570             :                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
     571             :                is_bit_,
     572             :                LogGuid(publication_id_).c_str(),
     573             :                LogGuid(readers[0]).c_str(),
     574             :                readers.length()));
     575             :   }
     576             : 
     577             :   // stop pending associations for these reader ids
     578           0 :   this->stop_associating(readers.get_buffer(), readers.length());
     579             : 
     580           0 :   ReaderIdSeq fully_associated_readers;
     581           0 :   CORBA::ULong fully_associated_len = 0;
     582           0 :   ReaderIdSeq rds;
     583           0 :   CORBA::ULong rds_len = 0;
     584           0 :   DDS::InstanceHandleSeq handles;
     585             : 
     586           0 :   ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
     587             :   {
     588             :     // Ensure the same acquisition order as in wait_for_acknowledgments().
     589           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     590             :     //Remove the readers from fully associated reader list.
     591             :     //If the supplied reader is not in the cached reader list then it is
     592             :     //already removed. We just need remove the readers in the list that have
     593             :     //not been removed.
     594             : 
     595           0 :     CORBA::ULong len = readers.length();
     596             : 
     597           0 :     for (CORBA::ULong i = 0; i < len; ++i) {
     598             :       //Remove the readers from fully associated reader list. If it's not
     599             :       //in there, the association_complete() is not called yet and remove it
     600             :       //from pending list.
     601             : 
     602           0 :       if (remove(readers_, readers[i]) == 0) {
     603           0 :         ++ fully_associated_len;
     604           0 :         fully_associated_readers.length(fully_associated_len);
     605           0 :         fully_associated_readers [fully_associated_len - 1] = readers[i];
     606             : 
     607           0 :         ++ rds_len;
     608           0 :         rds.length(rds_len);
     609           0 :         rds [rds_len - 1] = readers[i];
     610             :       }
     611             : 
     612           0 :       data_container_->remove_reader_acks(readers[i]);
     613             : 
     614           0 :       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
     615           0 :       reader_info_.erase(readers[i]);
     616             :       //else reader is already removed which indicates remove_association()
     617             :       //is called multiple times.
     618           0 :     }
     619             : 
     620           0 :     if (fully_associated_len > 0 && !is_bit_) {
     621             :       // The reader should be in the id_to_handle map at this time
     622           0 :       this->lookup_instance_handles(fully_associated_readers, handles);
     623             : 
     624           0 :       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
     625           0 :         id_to_handle_map_.erase(fully_associated_readers[i]);
     626             :       }
     627             :     }
     628             : 
     629             :     // Mirror the PUBLICATION_MATCHED_STATUS processing from
     630             :     // association_complete() here.
     631           0 :     if (!this->is_bit_) {
     632             : 
     633             :       // Derive the change in the number of subscriptions reading this writer.
     634             :       int matchedSubscriptions =
     635           0 :         static_cast<int>(this->id_to_handle_map_.size());
     636           0 :       this->publication_match_status_.current_count_change =
     637           0 :         matchedSubscriptions - this->publication_match_status_.current_count;
     638             : 
     639             :       // Only process status if the number of subscriptions has changed.
     640           0 :       if (this->publication_match_status_.current_count_change != 0) {
     641           0 :         this->publication_match_status_.current_count = matchedSubscriptions;
     642             : 
     643             :         /// Section 7.1.4.1: total_count will not decrement.
     644             : 
     645             :         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
     646           0 :         this->publication_match_status_.last_subscription_handle =
     647           0 :           handles[fully_associated_len - 1];
     648             : 
     649           0 :         set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
     650             : 
     651             :         DDS::DataWriterListener_var listener =
     652           0 :           this->listener_for(DDS::PUBLICATION_MATCHED_STATUS);
     653             : 
     654           0 :         if (!CORBA::is_nil(listener.in())) {
     655           0 :           listener->on_publication_matched(this, this->publication_match_status_);
     656             : 
     657             :           // Listener consumes the change.
     658           0 :           this->publication_match_status_.total_count_change = 0;
     659           0 :           this->publication_match_status_.current_count_change = 0;
     660             :         }
     661             : 
     662           0 :         this->notify_status_condition();
     663           0 :       }
     664             :     }
     665           0 :   }
     666             : 
     667           0 :   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
     668           0 :     this->disassociate(rds[i]);
     669             :   }
     670             : 
     671             :   // If this remove_association is invoked when the InfoRepo
     672             :   // detects a lost reader then make a callback to notify
     673             :   // subscription lost.
     674           0 :   if (notify_lost && handles.length() > 0) {
     675           0 :     this->notify_publication_lost(handles);
     676             :   }
     677             : 
     678           0 :   const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
     679           0 :   for (unsigned int i = 0; i < handles.length(); ++i) {
     680           0 :     participant->return_handle(handles[i]);
     681             :   }
     682           0 : }
     683             : 
     684           0 : void DataWriterImpl::replay_durable_data_for(const GUID_t& remote_id)
     685             : {
     686             :   DBG_ENTRY_LVL("DataWriterImpl", "replay_durable_data_for", 6);
     687             : 
     688           0 :   bool reader_durable = false;
     689             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     690           0 :   OPENDDS_STRING filterClassName;
     691           0 :   RcHandle<FilterEvaluator> eval;
     692           0 :   DDS::StringSeq expression_params;
     693             : #endif
     694             : 
     695             :   {
     696           0 :     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
     697           0 :     RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
     698             : 
     699           0 :     if (it != reader_info_.end()) {
     700           0 :       reader_durable = it->second.durable_;
     701             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     702           0 :       filterClassName = it->second.filter_class_name_;
     703           0 :       eval = it->second.eval_;
     704           0 :       expression_params = it->second.expression_params_;
     705             : #endif
     706             :     }
     707           0 :   }
     708             : 
     709             :   // Support DURABILITY QoS
     710           0 :   if (reader_durable) {
     711             :     // Tell the WriteDataContainer to resend all sending/sent
     712             :     // samples.
     713           0 :     this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
     714             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     715           0 :                                          , filterClassName, eval.in(), expression_params
     716             : #endif
     717             :                                          );
     718             : 
     719             :     // Acquire the data writer container lock to avoid deadlock. The
     720             :     // thread calling association_complete() has to acquire lock in the
     721             :     // same order as the write()/register() operation.
     722             : 
     723             :     // Since the thread calling association_complete() is the ORB
     724             :     // thread, it may have some performance penalty. If the
     725             :     // performance is an issue, we may need a new thread to handle the
     726             :     // data_available() calls.
     727           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex,
     728             :               guard,
     729             :               this->get_lock());
     730             : 
     731           0 :     SendStateDataSampleList list = this->get_resend_data();
     732             :     {
     733           0 :       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
     734             :       // Update the reader's expected sequence
     735             :       SequenceNumber& seq =
     736           0 :         reader_info_.find(remote_id)->second.expected_sequence_;
     737             : 
     738           0 :       for (SendStateDataSampleList::iterator list_el = list.begin();
     739           0 :            list_el != list.end(); ++list_el) {
     740           0 :         list_el->get_header().historic_sample_ = true;
     741             : 
     742           0 :         if (list_el->get_header().sequence_ > seq) {
     743           0 :           seq = list_el->get_header().sequence_;
     744             :         }
     745             :       }
     746           0 :     }
     747             : 
     748           0 :     RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
     749           0 :     if (!publisher || publisher->is_suspended()) {
     750           0 :       this->available_data_list_.enqueue_tail(list);
     751             : 
     752             :     } else {
     753           0 :       if (DCPS_debug_level >= 4) {
     754           0 :         ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) DataWriterImpl::replay_durable_data_for: Sending historic samples\n")));
     755             :       }
     756             : 
     757           0 :       const Encoding encoding(Encoding::KIND_UNALIGNED_CDR);
     758           0 :       size_t size = 0;
     759           0 :       serialized_size(encoding, size, remote_id);
     760             :       Message_Block_Ptr data(
     761             :         new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
     762           0 :                               get_db_lock()));
     763           0 :       Serializer ser(data.get(), encoding);
     764           0 :       ser << remote_id;
     765             : 
     766           0 :       DataSampleHeader header;
     767           0 :       Message_Block_Ptr end_historic_samples(create_control_message(END_HISTORIC_SAMPLES, header, move(data),
     768           0 :                                                                     SystemTimePoint::now().to_dds_time()));
     769             : 
     770           0 :       this->controlTracker.message_sent();
     771           0 :       guard.release();
     772           0 :       const SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
     773           0 :       if (ret == SEND_CONTROL_ERROR) {
     774           0 :         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     775             :                    ACE_TEXT("DataWriterImpl::replay_durable_data_for: ")
     776             :                    ACE_TEXT("send_w_control failed.\n")));
     777           0 :         this->controlTracker.message_dropped();
     778             :       }
     779           0 :     }
     780           0 :   }
     781           0 : }
     782             : 
     783           0 : void DataWriterImpl::remove_all_associations()
     784             : {
     785             :   DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
     786             :   // stop pending associations
     787           0 :   this->stop_associating();
     788             : 
     789           0 :   ReaderIdSeq readers;
     790             :   CORBA::ULong size;
     791             :   {
     792           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
     793             : 
     794           0 :     size = static_cast<CORBA::ULong>(readers_.size());
     795           0 :     readers.length(size);
     796             : 
     797           0 :     RepoIdSet::iterator itEnd = readers_.end();
     798           0 :     int i = 0;
     799             : 
     800           0 :     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
     801           0 :       readers[i ++] = *it;
     802             :     }
     803           0 :   }
     804             : 
     805             :   try {
     806           0 :     if (0 < size) {
     807           0 :       CORBA::Boolean dont_notify_lost = false;
     808             : 
     809           0 :       this->remove_associations(readers, dont_notify_lost);
     810             :     }
     811             : 
     812           0 :   } catch (const CORBA::Exception&) {
     813           0 :       ACE_DEBUG((LM_WARNING,
     814             :                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
     815             :                  ACE_TEXT("caught exception from remove_associations.\n")));
     816           0 :   }
     817             : 
     818           0 :   transport_stop();
     819           0 : }
     820             : 
     821             : void
     822           0 : DataWriterImpl::register_for_reader(const GUID_t& participant,
     823             :                                     const GUID_t& writerid,
     824             :                                     const GUID_t& readerid,
     825             :                                     const TransportLocatorSeq& locators,
     826             :                                     DiscoveryListener* listener)
     827             : {
     828           0 :   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
     829           0 : }
     830             : 
     831             : void
     832           0 : DataWriterImpl::unregister_for_reader(const GUID_t& participant,
     833             :                                       const GUID_t& writerid,
     834             :                                       const GUID_t& readerid)
     835             : {
     836           0 :   TransportClient::unregister_for_reader(participant, writerid, readerid);
     837           0 : }
     838             : 
     839             : void
     840           0 : DataWriterImpl::update_locators(const GUID_t& readerId,
     841             :                                 const TransportLocatorSeq& locators)
     842             : {
     843             :   {
     844           0 :     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, reader_info_lock_);
     845           0 :     RepoIdToReaderInfoMap::const_iterator iter = reader_info_.find(readerId);
     846           0 :     if (iter == reader_info_.end()) {
     847           0 :       return;
     848             :     }
     849           0 :   }
     850           0 :   TransportClient::update_locators(readerId, locators);
     851             : }
     852             : 
     853             : void
     854           0 : DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
     855             : {
     856             :   DDS::DataWriterListener_var listener =
     857           0 :     listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
     858             : 
     859           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     860             : 
     861             : #if 0
     862             : 
     863             :   if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
     864             :     // This test should make the method idempotent.
     865             :     return;
     866             :   }
     867             : 
     868             : #endif
     869             : 
     870           0 :   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
     871             : 
     872             :   // copy status and increment change
     873           0 :   offered_incompatible_qos_status_.total_count = status.total_count;
     874           0 :   offered_incompatible_qos_status_.total_count_change +=
     875           0 :     status.count_since_last_send;
     876           0 :   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
     877           0 :   offered_incompatible_qos_status_.policies = status.policies;
     878             : 
     879           0 :   if (!CORBA::is_nil(listener.in())) {
     880           0 :     listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_);
     881             : 
     882             :     // TBD - Why does the spec say to change this but not change the
     883             :     //       ChangeFlagStatus after a listener call?
     884           0 :     offered_incompatible_qos_status_.total_count_change = 0;
     885             :   }
     886             : 
     887           0 :   notify_status_condition();
     888           0 : }
     889             : 
     890             : void
     891           0 : DataWriterImpl::update_subscription_params(const GUID_t& readerId,
     892             :                                            const DDS::StringSeq& params)
     893             : {
     894             : #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     895             :   ACE_UNUSED_ARG(readerId);
     896             :   ACE_UNUSED_ARG(params);
     897             : #else
     898           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
     899           0 :   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
     900           0 :   RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
     901             : 
     902           0 :   if (iter != reader_info_.end()) {
     903           0 :     iter->second.expression_params_ = params;
     904             : 
     905           0 :   } else if (DCPS_debug_level > 4 &&
     906           0 :              TheServiceParticipant->publisher_content_filter()) {
     907           0 :     ACE_DEBUG((LM_WARNING,
     908             :                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
     909             :                ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
     910             :                LogGuid(this->publication_id_).c_str(), LogGuid(readerId).c_str()));
     911             :   }
     912             : 
     913             : #endif
     914           0 : }
     915             : 
     916           0 : DDS::ReturnCode_t DataWriterImpl::set_qos(const DDS::DataWriterQos& qos)
     917             : {
     918             :   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     919             :   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     920             :   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     921             :   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     922             :   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     923             : 
     924           0 :   DDS::DataWriterQos new_qos = qos;
     925           0 :   new_qos.representation.value = qos_.representation.value;
     926           0 :   if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
     927           0 :     if (qos_ == new_qos)
     928           0 :       return DDS::RETCODE_OK;
     929             : 
     930           0 :     if (enabled_) {
     931           0 :       if (!Qos_Helper::changeable(qos_, new_qos)) {
     932           0 :         return DDS::RETCODE_IMMUTABLE_POLICY;
     933             :       }
     934             : 
     935           0 :       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
     936           0 :       DDS::PublisherQos publisherQos;
     937           0 :       RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
     938             : 
     939           0 :       bool status = false;
     940           0 :       if (publisher) {
     941           0 :         publisher->get_qos(publisherQos);
     942             :         status
     943           0 :           = disco->update_publication_qos(domain_id_,
     944           0 :                                           dp_id_,
     945           0 :                                           this->publication_id_,
     946             :                                           new_qos,
     947             :                                           publisherQos);
     948             :       }
     949           0 :       if (!status) {
     950           0 :         ACE_ERROR_RETURN((LM_ERROR,
     951             :                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
     952             :                           ACE_TEXT("qos not updated.\n")),
     953             :                          DDS::RETCODE_ERROR);
     954             :       }
     955             : 
     956           0 :       if (!(qos_ == new_qos)) {
     957           0 :         data_container_->set_deadline_period(TimeDuration(qos.deadline.period));
     958           0 :         qos_ = new_qos;
     959             :       }
     960           0 :     }
     961             : 
     962           0 :     qos_ = new_qos;
     963           0 :     passed_qos_ = qos;
     964             : 
     965           0 :     const Observer_rch observer = get_observer(Observer::e_QOS_CHANGED);
     966           0 :     if (observer) {
     967           0 :       observer->on_qos_changed(this);
     968             :     }
     969             : 
     970           0 :     return DDS::RETCODE_OK;
     971             : 
     972           0 :   } else {
     973           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     974             :   }
     975           0 : }
     976             : 
     977             : DDS::ReturnCode_t
     978           0 : DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
     979             : {
     980           0 :   qos = passed_qos_;
     981           0 :   return DDS::RETCODE_OK;
     982             : }
     983             : 
     984             : DDS::ReturnCode_t
     985           0 : DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
     986             :                              DDS::StatusMask mask)
     987             : {
     988           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     989           0 :   listener_mask_ = mask;
     990             :   //note: OK to duplicate  a nil object ref
     991           0 :   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
     992           0 :   return DDS::RETCODE_OK;
     993           0 : }
     994             : 
     995             : DDS::DataWriterListener_ptr
     996           0 : DataWriterImpl::get_listener()
     997             : {
     998           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     999           0 :   return DDS::DataWriterListener::_duplicate(listener_.in());
    1000           0 : }
    1001             : 
    1002             : DataWriterListener_ptr
    1003           0 : DataWriterImpl::get_ext_listener()
    1004             : {
    1005           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
    1006           0 :   return DataWriterListener::_narrow(listener_.in());
    1007           0 : }
    1008             : 
    1009             : DDS::Topic_ptr
    1010           0 : DataWriterImpl::get_topic()
    1011             : {
    1012           0 :   return DDS::Topic::_duplicate(topic_servant_.get());
    1013             : }
    1014             : 
    1015             : bool
    1016           0 : DataWriterImpl::should_ack() const
    1017             : {
    1018             :   // N.B. It may be worthwhile to investigate a more efficient
    1019             :   // heuristic for determining if a writer should send SAMPLE_ACK
    1020             :   // control samples. Perhaps based on a sequence number delta?
    1021           0 :   return this->readers_.size() != 0;
    1022             : }
    1023             : 
    1024             : DataWriterImpl::AckToken
    1025           0 : DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
    1026             : {
    1027           0 :   const SequenceNumber sn = get_max_sn();
    1028           0 :   if (DCPS_debug_level > 0) {
    1029           0 :     ACE_DEBUG((LM_DEBUG,
    1030             :                ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
    1031             :                ACE_TEXT("for sequence %q\n"),
    1032             :                sn.getValue()));
    1033             :   }
    1034           0 :   return AckToken(max_wait, sn);
    1035             : }
    1036             : 
    1037             : 
    1038             : 
    1039             : DDS::ReturnCode_t
    1040           0 : DataWriterImpl::send_request_ack()
    1041             : {
    1042           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1043             :                    guard,
    1044             :                    get_lock(),
    1045             :                    DDS::RETCODE_ERROR);
    1046             : 
    1047             : 
    1048           0 :   DataSampleElement* element = 0;
    1049           0 :   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element);
    1050             : 
    1051           0 :   if (ret != DDS::RETCODE_OK) {
    1052           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1053             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1054             :                       ACE_TEXT("DataWriterImpl::send_request_ack: ")
    1055             :                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
    1056             :                       ret),
    1057             :                      ret);
    1058             :   }
    1059             : 
    1060           0 :   Message_Block_Ptr blk;
    1061             :   // Add header with the registration sample data.
    1062             :   Message_Block_Ptr sample(
    1063             :     create_control_message(
    1064             :       REQUEST_ACK,
    1065             :       element->get_header(),
    1066           0 :       move(blk),
    1067           0 :       SystemTimePoint::now().to_dds_time()));
    1068             : 
    1069           0 :   element->set_sample(move(sample));
    1070             : 
    1071           0 :   ret = this->data_container_->enqueue_control(element);
    1072             : 
    1073           0 :   if (ret != DDS::RETCODE_OK) {
    1074           0 :     data_container_->release_buffer(element);
    1075           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1076             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1077             :                       ACE_TEXT("DataWriterImpl::send_request_ack: ")
    1078             :                       ACE_TEXT("enqueue_control failed.\n")),
    1079             :                      ret);
    1080             :   }
    1081             : 
    1082             : 
    1083           0 :   send_all_to_flush_control(guard);
    1084             : 
    1085           0 :   return DDS::RETCODE_OK;
    1086           0 : }
    1087             : 
    1088             : DDS::ReturnCode_t
    1089           0 : DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
    1090             : {
    1091           0 :   if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
    1092           0 :     return DDS::RETCODE_OK;
    1093             : 
    1094           0 :   DDS::ReturnCode_t ret = send_request_ack();
    1095             : 
    1096           0 :   if (ret != DDS::RETCODE_OK)
    1097           0 :     return ret;
    1098             : 
    1099           0 :   DataWriterImpl::AckToken token = create_ack_token(max_wait);
    1100           0 :   if (DCPS_debug_level) {
    1101           0 :     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
    1102             :                           ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
    1103             :                           token.sequence_.getValue()));
    1104             :   }
    1105           0 :   return wait_for_specific_ack(token);
    1106           0 : }
    1107             : 
    1108             : DDS::ReturnCode_t
    1109           0 : DataWriterImpl::wait_for_specific_ack(const AckToken& token)
    1110             : {
    1111           0 :   return this->data_container_->wait_ack_of_seq(token.deadline(), token.deadline_is_infinite(), token.sequence_);
    1112             : }
    1113             : 
    1114             : DDS::Publisher_ptr
    1115           0 : DataWriterImpl::get_publisher()
    1116             : {
    1117           0 :   return publisher_servant_.lock()._retn();
    1118             : }
    1119             : 
    1120             : DDS::ReturnCode_t
    1121           0 : DataWriterImpl::get_liveliness_lost_status(
    1122             :   DDS::LivelinessLostStatus & status)
    1123             : {
    1124           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1125             :                    guard,
    1126             :                    this->lock_,
    1127             :                    DDS::RETCODE_ERROR);
    1128           0 :   set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
    1129           0 :   status = liveliness_lost_status_;
    1130           0 :   liveliness_lost_status_.total_count_change = 0;
    1131           0 :   return DDS::RETCODE_OK;
    1132           0 : }
    1133             : 
    1134             : DDS::ReturnCode_t
    1135           0 : DataWriterImpl::get_offered_deadline_missed_status(
    1136             :   DDS::OfferedDeadlineMissedStatus & status)
    1137             : {
    1138           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1139             :                    guard,
    1140             :                    this->lock_,
    1141             :                    DDS::RETCODE_ERROR);
    1142             : 
    1143           0 :   set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
    1144             : 
    1145           0 :   this->offered_deadline_missed_status_.total_count_change =
    1146           0 :     this->offered_deadline_missed_status_.total_count
    1147           0 :     - this->last_deadline_missed_total_count_;
    1148             : 
    1149             :   // Update for next status check.
    1150           0 :   this->last_deadline_missed_total_count_ =
    1151           0 :     this->offered_deadline_missed_status_.total_count;
    1152             : 
    1153           0 :   status = offered_deadline_missed_status_;
    1154             : 
    1155           0 :   this->offered_deadline_missed_status_.total_count_change = 0;
    1156             : 
    1157           0 :   return DDS::RETCODE_OK;
    1158           0 : }
    1159             : 
    1160             : DDS::ReturnCode_t
    1161           0 : DataWriterImpl::get_offered_incompatible_qos_status(
    1162             :   DDS::OfferedIncompatibleQosStatus & status)
    1163             : {
    1164           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1165             :                    guard,
    1166             :                    this->lock_,
    1167             :                    DDS::RETCODE_ERROR);
    1168           0 :   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
    1169           0 :   status = offered_incompatible_qos_status_;
    1170           0 :   offered_incompatible_qos_status_.total_count_change = 0;
    1171           0 :   return DDS::RETCODE_OK;
    1172           0 : }
    1173             : 
    1174             : DDS::ReturnCode_t
    1175           0 : DataWriterImpl::get_publication_matched_status(
    1176             :   DDS::PublicationMatchedStatus & status)
    1177             : {
    1178           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1179             :                    guard,
    1180             :                    this->lock_,
    1181             :                    DDS::RETCODE_ERROR);
    1182           0 :   set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
    1183           0 :   status = publication_match_status_;
    1184           0 :   publication_match_status_.total_count_change = 0;
    1185           0 :   publication_match_status_.current_count_change = 0;
    1186           0 :   return DDS::RETCODE_OK;
    1187           0 : }
    1188             : 
    1189             : DDS::ReturnCode_t
    1190           0 : DataWriterImpl::assert_liveliness()
    1191             : {
    1192           0 :   switch (this->qos_.liveliness.kind) {
    1193           0 :   case DDS::AUTOMATIC_LIVELINESS_QOS:
    1194             :     // Do nothing.
    1195           0 :     break;
    1196           0 :   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
    1197             :     {
    1198           0 :       RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
    1199           0 :       if (participant) {
    1200           0 :         return participant->assert_liveliness();
    1201             :       }
    1202           0 :     }
    1203           0 :     break;
    1204           0 :   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
    1205           0 :     if (!send_liveliness(MonotonicTimePoint::now())) {
    1206           0 :       return DDS::RETCODE_ERROR;
    1207             :     }
    1208           0 :     break;
    1209             :   }
    1210             : 
    1211           0 :   return DDS::RETCODE_OK;
    1212             : }
    1213             : 
    1214             : DDS::ReturnCode_t
    1215           0 : DataWriterImpl::assert_liveliness_by_participant()
    1216             : {
    1217             :   // This operation is called by participant.
    1218           0 :   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
    1219             :     // Set a flag indicating that we should send a liveliness message on the timer if necessary.
    1220           0 :     liveliness_asserted_ = true;
    1221             :   }
    1222             : 
    1223           0 :   return DDS::RETCODE_OK;
    1224             : }
    1225             : 
    1226             : TimeDuration
    1227           0 : DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
    1228             : {
    1229           0 :   if (this->qos_.liveliness.kind == kind) {
    1230           0 :     return liveliness_check_interval_;
    1231             :   } else {
    1232           0 :     return TimeDuration::max_value;
    1233             :   }
    1234             : }
    1235             : 
    1236             : bool
    1237           0 : DataWriterImpl::participant_liveliness_activity_after(const MonotonicTimePoint& tv)
    1238             : {
    1239           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
    1240           0 :   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
    1241           0 :     return last_liveliness_activity_time_ > tv;
    1242             :   } else {
    1243           0 :     return false;
    1244             :   }
    1245           0 : }
    1246             : 
    1247             : DDS::ReturnCode_t
    1248           0 : DataWriterImpl::get_matched_subscriptions(
    1249             :   DDS::InstanceHandleSeq & subscription_handles)
    1250             : {
    1251           0 :   if (!enabled_) {
    1252           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1253             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1254             :                       ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
    1255             :                       ACE_TEXT(" Entity is not enabled.\n")),
    1256             :                      DDS::RETCODE_NOT_ENABLED);
    1257             :   }
    1258             : 
    1259           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1260             :                    guard,
    1261             :                    this->lock_,
    1262             :                    DDS::RETCODE_ERROR);
    1263             : 
    1264             :   // Copy out the handles for the current set of subscriptions.
    1265           0 :   int index = 0;
    1266           0 :   subscription_handles.length(
    1267           0 :     static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
    1268             : 
    1269           0 :   for (RepoIdToHandleMap::iterator
    1270           0 :        current = this->id_to_handle_map_.begin();
    1271           0 :        current != this->id_to_handle_map_.end();
    1272           0 :        ++current, ++index) {
    1273           0 :     subscription_handles[index] = current->second;
    1274             :   }
    1275             : 
    1276           0 :   return DDS::RETCODE_OK;
    1277           0 : }
    1278             : 
    1279             : #if !defined (DDS_HAS_MINIMUM_BIT)
    1280             : DDS::ReturnCode_t
    1281           0 : DataWriterImpl::get_matched_subscription_data(
    1282             :   DDS::SubscriptionBuiltinTopicData & subscription_data,
    1283             :   DDS::InstanceHandle_t subscription_handle)
    1284             : {
    1285           0 :   if (!enabled_) {
    1286           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1287             :                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
    1288             :                       ACE_TEXT("get_matched_subscription_data: ")
    1289             :                       ACE_TEXT("Entity is not enabled.\n")),
    1290             :                      DDS::RETCODE_NOT_ENABLED);
    1291             :   }
    1292           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
    1293             : 
    1294           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
    1295           0 :   DDS::SubscriptionBuiltinTopicDataSeq data;
    1296             : 
    1297           0 :   if (participant) {
    1298           0 :     ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
    1299             :             participant.in(),
    1300             :             BUILT_IN_SUBSCRIPTION_TOPIC,
    1301             :             subscription_handle,
    1302             :             data);
    1303             :   }
    1304             : 
    1305           0 :   if (ret == DDS::RETCODE_OK) {
    1306           0 :     subscription_data = data[0];
    1307             :   }
    1308             : 
    1309           0 :   return ret;
    1310           0 : }
    1311             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
    1312             : 
    1313             : DDS::ReturnCode_t
    1314           0 : DataWriterImpl::enable()
    1315             : {
    1316             :   //According spec:
    1317             :   // - Calling enable on an already enabled Entity returns OK and has no
    1318             :   // effect.
    1319             :   // - Calling enable on an Entity whose factory is not enabled will fail
    1320             :   // and return PRECONDITION_NOT_MET.
    1321             : 
    1322           0 :   if (this->is_enabled()) {
    1323           0 :     return DDS::RETCODE_OK;
    1324             :   }
    1325             : 
    1326           0 :   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
    1327           0 :   if (!publisher || !publisher->is_enabled()) {
    1328           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    1329             :   }
    1330             : 
    1331           0 :   if (!topic_servant_->is_enabled()) {
    1332           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    1333             :   }
    1334             : 
    1335           0 :   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
    1336           0 :   if (participant) {
    1337           0 :     dp_id_ = participant->get_id();
    1338             :   }
    1339             : 
    1340             :   // Note: do configuration based on QoS in enable() because
    1341             :   //       before enable is called the QoS can be changed -- even
    1342             :   //       for Changeable=NO
    1343             : 
    1344             :   // Configure WriteDataContainer constructor parameters from qos.
    1345             : 
    1346           0 :   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
    1347             : 
    1348           0 :   CORBA::Long const max_samples_per_instance =
    1349           0 :     (qos_.resource_limits.max_samples_per_instance == DDS::LENGTH_UNLIMITED)
    1350           0 :     ? 0x7fffffff : qos_.resource_limits.max_samples_per_instance;
    1351             : 
    1352           0 :   CORBA::Long max_instances = 0, max_total_samples = 0;
    1353             : 
    1354           0 :   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
    1355           0 :     n_chunks_ = qos_.resource_limits.max_samples;
    1356             : 
    1357           0 :     if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED ||
    1358           0 :         (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
    1359           0 :         || (qos_.resource_limits.max_samples <
    1360           0 :             (qos_.resource_limits.max_instances * max_samples_per_instance))) {
    1361           0 :       max_total_samples = reliable ? qos_.resource_limits.max_samples : 0;
    1362             :     }
    1363             :   }
    1364             : 
    1365           0 :   if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
    1366           0 :     max_instances = qos_.resource_limits.max_instances;
    1367             : 
    1368           0 :   const CORBA::Long history_depth =
    1369           0 :     (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS ||
    1370           0 :      qos_.history.depth == DDS::LENGTH_UNLIMITED) ? 0x7fffffff : qos_.history.depth;
    1371             : 
    1372           0 :   const CORBA::Long max_durable_per_instance =
    1373           0 :     qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth;
    1374             : 
    1375             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    1376             :   // Get data durability cache if DataWriter QoS requires durable
    1377             :   // samples.  Publisher servant retains ownership of the cache.
    1378             :   DataDurabilityCache* const durability_cache =
    1379           0 :     TheServiceParticipant->get_data_durability_cache(qos_.durability);
    1380             : #endif
    1381             : 
    1382             :   //Note: the QoS used to set n_chunks_ is Changeable=No so
    1383             :   // it is OK that we cannot change the size of our allocators.
    1384           0 :   data_container_ = RcHandle<WriteDataContainer>(
    1385             :     new WriteDataContainer(
    1386             :       this,
    1387             :       max_samples_per_instance,
    1388             :       history_depth,
    1389             :       max_durable_per_instance,
    1390           0 :       qos_.reliability.max_blocking_time,
    1391             :       n_chunks_,
    1392             :       domain_id_,
    1393             :       topic_name_,
    1394           0 :       get_type_name(),
    1395             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    1396             :       durability_cache,
    1397           0 :       qos_.durability_service,
    1398             : #endif
    1399             :       max_instances,
    1400             :       max_total_samples,
    1401           0 :       lock_,
    1402           0 :       offered_deadline_missed_status_,
    1403           0 :       last_deadline_missed_total_count_),
    1404           0 :      keep_count());
    1405             : 
    1406             :   // +1 because we might allocate one before releasing another
    1407             :   // TBD - see if this +1 can be removed.
    1408           0 :   mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
    1409           0 :   db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
    1410           0 :   header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
    1411             : 
    1412           0 :   if (DCPS_debug_level >= 2) {
    1413           0 :     ACE_DEBUG((LM_DEBUG,
    1414             :                "(%P|%t) DataWriterImpl::enable-mb"
    1415             :                " Cached_Allocator_With_Overflow %x with %B chunks\n",
    1416             :                mb_allocator_.get(),
    1417             :                n_chunks_));
    1418             : 
    1419           0 :     ACE_DEBUG((LM_DEBUG,
    1420             :                "(%P|%t) DataWriterImpl::enable-db"
    1421             :                " Cached_Allocator_With_Overflow %x with %B chunks\n",
    1422             :                db_allocator_.get(),
    1423             :                n_chunks_));
    1424             : 
    1425           0 :     ACE_DEBUG((LM_DEBUG,
    1426             :                "(%P|%t) DataWriterImpl::enable-header"
    1427             :                " Cached_Allocator_With_Overflow %x with %B chunks\n",
    1428             :                header_allocator_.get(),
    1429             :                n_chunks_));
    1430             :   }
    1431             : 
    1432           0 :   if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
    1433           0 :       qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
    1434             :     // Must be at least 1 micro second.
    1435             :     liveliness_check_interval_ = std::max(
    1436           0 :       TimeDuration(qos_.liveliness.lease_duration) * (TheServiceParticipant->liveliness_factor() / 100.0),
    1437           0 :       TimeDuration(0, 1));
    1438             : 
    1439           0 :     if (reactor_->schedule_timer(liveness_timer_.in(),
    1440             :                                  0,
    1441             :                                  liveliness_check_interval_.value(),
    1442           0 :                                  liveliness_check_interval_.value()) == -1) {
    1443           0 :       ACE_ERROR((LM_ERROR,
    1444             :                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
    1445             :                  ACE_TEXT("schedule_timer")));
    1446             : 
    1447             :     }
    1448             :   }
    1449             : 
    1450           0 :   if (!participant) {
    1451           0 :     return DDS::RETCODE_ERROR;
    1452             :   }
    1453             : 
    1454           0 :   participant->add_adjust_liveliness_timers(this);
    1455             : 
    1456           0 :   data_container_->set_deadline_period(TimeDuration(qos_.deadline.period));
    1457             : 
    1458           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
    1459           0 :   disco->pre_writer(this);
    1460             : 
    1461           0 :   this->set_enabled();
    1462             : 
    1463             :   try {
    1464           0 :     this->enable_transport(reliable,
    1465           0 :                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
    1466             : 
    1467           0 :   } catch (const Transport::Exception&) {
    1468           0 :     ACE_ERROR((LM_ERROR,
    1469             :                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
    1470             :                ACE_TEXT("Transport Exception.\n")));
    1471           0 :     data_container_->shutdown_ = true;
    1472           0 :     return DDS::RETCODE_ERROR;
    1473           0 :   }
    1474             : 
    1475             :   // Must be done after transport enabled.
    1476           0 :   set_writer_effective_data_rep_qos(qos_.representation.value, cdr_encapsulation());
    1477           0 :   if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
    1478           0 :     data_container_->shutdown_ = true;
    1479           0 :     return DDS::RETCODE_ERROR;
    1480             :   }
    1481             : 
    1482             :   // Done after enable_transport so we know its swap_bytes.
    1483           0 :   const DDS::ReturnCode_t setup_serialization_result = setup_serialization();
    1484           0 :   if (setup_serialization_result != DDS::RETCODE_OK) {
    1485           0 :     data_container_->shutdown_ = true;
    1486           0 :     return setup_serialization_result;
    1487             :   }
    1488             : 
    1489           0 :   const TransportLocatorSeq& trans_conf_info = connection_info();
    1490           0 :   DDS::PublisherQos pub_qos;
    1491           0 :   publisher->get_qos(pub_qos);
    1492             : 
    1493           0 :   XTypes::TypeInformation type_info;
    1494           0 :   type_support_->to_type_info(type_info);
    1495             : 
    1496           0 :   XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
    1497           0 :   type_support_->add_types(type_lookup_service);
    1498             : 
    1499             :   const GUID_t publication_id =
    1500           0 :     disco->add_publication(this->domain_id_,
    1501           0 :                            this->dp_id_,
    1502           0 :                            this->topic_servant_->get_id(),
    1503           0 :                            rchandle_from(this),
    1504           0 :                            this->qos_,
    1505             :                            trans_conf_info,
    1506             :                            pub_qos,
    1507             :                            type_info);
    1508             : 
    1509           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
    1510           0 :   publication_id_ = publication_id;
    1511             : 
    1512           0 :   if (publication_id_ == GUID_UNKNOWN) {
    1513           0 :     if (DCPS_debug_level >= 1) {
    1514           0 :       ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::enable: "
    1515             :         "add_publication failed\n"));
    1516             :     }
    1517           0 :     data_container_->shutdown_ = true;
    1518           0 :     return DDS::RETCODE_ERROR;
    1519             :   }
    1520             : 
    1521             : #if defined(OPENDDS_SECURITY)
    1522           0 :   security_config_ = participant->get_security_config();
    1523           0 :   participant_permissions_handle_ = participant->permissions_handle();
    1524           0 :   dynamic_type_ = type_support_->get_type();
    1525             : #endif
    1526             : 
    1527           0 :   if (DCPS_debug_level >= 2) {
    1528           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::enable: "
    1529             :       "got GUID %C, publishing to topic name \"%C\" type \"%C\"\n",
    1530             :       LogGuid(publication_id_).c_str(),
    1531             :       topic_servant_->topic_name(), topic_servant_->type_name()));
    1532             :   }
    1533             : 
    1534           0 :   this->data_container_->publication_id_ = this->publication_id_;
    1535             : 
    1536           0 :   guard.release();
    1537             : 
    1538             :   const DDS::ReturnCode_t writer_enabled_result =
    1539           0 :     publisher->writer_enabled(topic_name_.in(), this);
    1540             : 
    1541           0 :   if (this->monitor_) {
    1542           0 :     this->monitor_->report();
    1543             :   }
    1544             : 
    1545             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    1546             : 
    1547             :   // Move cached data from the durability cache to the unsent data
    1548             :   // queue.
    1549           0 :   if (durability_cache != 0) {
    1550             : 
    1551           0 :     if (!durability_cache->get_data(this->domain_id_,
    1552             :                                     this->topic_name_,
    1553             :                                     get_type_name(),
    1554             :                                     this,
    1555           0 :                                     this->mb_allocator_.get(),
    1556           0 :                                     this->db_allocator_.get(),
    1557           0 :                                     this->qos_.lifespan)) {
    1558           0 :       ACE_ERROR((LM_ERROR,
    1559             :                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
    1560             :                  ACE_TEXT("unable to retrieve durable data\n")));
    1561             :     }
    1562             :   }
    1563             : 
    1564             : #endif
    1565             : 
    1566           0 :   if (writer_enabled_result == DDS::RETCODE_OK) {
    1567           0 :     const Observer_rch observer = get_observer(Observer::e_ENABLED);
    1568           0 :     if (observer) {
    1569           0 :       observer->on_enabled(this);
    1570             :     }
    1571           0 :   }
    1572             : 
    1573           0 :   return writer_enabled_result;
    1574           0 : }
    1575             : 
    1576             : void
    1577           0 : DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
    1578             : {
    1579             :   DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
    1580             : 
    1581           0 :   SendStateDataSampleList list;
    1582             : 
    1583           0 :   ACE_UINT64 transaction_id = this->get_unsent_data(list);
    1584             : 
    1585           0 :   controlTracker.message_sent();
    1586             : 
    1587             :   //need to release guard to call down to transport
    1588           0 :   guard.release();
    1589             : 
    1590           0 :   this->send(list, transaction_id);
    1591           0 : }
    1592             : 
    1593             : DDS::ReturnCode_t
    1594           0 : DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
    1595             :                                     Message_Block_Ptr data,
    1596             :                                     const DDS::Time_t& source_timestamp)
    1597             : {
    1598             :   DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
    1599             : 
    1600           0 :   if (!enabled_) {
    1601           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1602             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1603             :                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
    1604             :                       ACE_TEXT("Entity is not enabled.\n")),
    1605             :                      DDS::RETCODE_NOT_ENABLED);
    1606             :   }
    1607             : 
    1608           0 :   DDS::ReturnCode_t ret = data_container_->register_instance(handle, data);
    1609           0 :   if (ret != DDS::RETCODE_OK) {
    1610           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1611             :                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
    1612             :                       ACE_TEXT("register instance with container failed, returned <%C>.\n"),
    1613             :                       retcode_to_string(ret)),
    1614             :                      ret);
    1615             :   }
    1616             : 
    1617           0 :   if (this->monitor_) {
    1618           0 :     this->monitor_->report();
    1619             :   }
    1620             : 
    1621           0 :   DataSampleElement* element = 0;
    1622           0 :   ret = this->data_container_->obtain_buffer_for_control(element);
    1623           0 :   if (ret != DDS::RETCODE_OK) {
    1624           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1625             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1626             :                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
    1627             :                       ACE_TEXT("obtain_buffer_for_control failed, returned <%C>.\n"),
    1628             :                       retcode_to_string(ret)),
    1629             :                      ret);
    1630             :   }
    1631             : 
    1632             :   // Add header with the registration sample data.
    1633             :   Message_Block_Ptr sample(
    1634             :     create_control_message(
    1635             :      INSTANCE_REGISTRATION,
    1636             :      element->get_header(),
    1637           0 :      move(data),
    1638           0 :      source_timestamp));
    1639             : 
    1640           0 :   element->set_sample(move(sample));
    1641             : 
    1642           0 :   ret = this->data_container_->enqueue_control(element);
    1643             : 
    1644           0 :   if (ret != DDS::RETCODE_OK) {
    1645           0 :     data_container_->release_buffer(element);
    1646           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1647             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1648             :                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
    1649             :                       ACE_TEXT("enqueue_control failed, returned <%C>\n"),
    1650             :                       retcode_to_string(ret)),
    1651             :                      ret);
    1652             :   }
    1653             : 
    1654           0 :   return ret;
    1655           0 : }
    1656             : 
    1657             : DDS::ReturnCode_t
    1658           0 : DataWriterImpl::register_instance_from_durable_data(
    1659             :   DDS::InstanceHandle_t& handle,
    1660             :   Message_Block_Ptr data,
    1661             :   const DDS::Time_t& source_timestamp)
    1662             : {
    1663             :   DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
    1664             : 
    1665           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1666             :                    guard,
    1667             :                    get_lock(),
    1668             :                    DDS::RETCODE_ERROR);
    1669             : 
    1670           0 :   const DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp);
    1671           0 :   if (ret != DDS::RETCODE_OK) {
    1672           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1673             :                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
    1674             :                       ACE_TEXT("register instance with container failed, returned <%C>.\n"),
    1675             :                       retcode_to_string(ret)),
    1676             :                      ret);
    1677             :   }
    1678             : 
    1679           0 :   send_all_to_flush_control(guard);
    1680             : 
    1681           0 :   return ret;
    1682           0 : }
    1683             : 
    1684             : DDS::ReturnCode_t
    1685           0 : DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
    1686             :                                       const DDS::Time_t& source_timestamp)
    1687             : {
    1688             :   DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
    1689             : 
    1690           0 :   if (!enabled_) {
    1691           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1692             :                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
    1693             :                       ACE_TEXT("Entity is not enabled.\n")),
    1694             :                      DDS::RETCODE_NOT_ENABLED);
    1695             :   }
    1696             : 
    1697             :   // According to spec 1.2, autodispose_unregistered_instances true causes
    1698             :   // dispose on the instance prior to calling unregister operation.
    1699           0 :   if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
    1700           0 :     return this->dispose_and_unregister(handle, source_timestamp);
    1701             :   }
    1702             : 
    1703           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
    1704           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
    1705           0 :   Message_Block_Ptr unregistered_sample_data;
    1706           0 :   ret = this->data_container_->unregister(handle, unregistered_sample_data);
    1707             : 
    1708           0 :   if (ret != DDS::RETCODE_OK) {
    1709           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1710             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1711             :                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
    1712             :                       ACE_TEXT("unregister with container failed.\n")),
    1713             :                      ret);
    1714             :   }
    1715             : 
    1716           0 :   DataSampleElement* element = 0;
    1717           0 :   ret = this->data_container_->obtain_buffer_for_control(element);
    1718             : 
    1719           0 :   if (ret != DDS::RETCODE_OK) {
    1720           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1721             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1722             :                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
    1723             :                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
    1724             :                       ret),
    1725             :                      ret);
    1726             :   }
    1727             : 
    1728             :   Message_Block_Ptr sample(create_control_message(UNREGISTER_INSTANCE,
    1729             :                                                   element->get_header(),
    1730           0 :                                                   move(unregistered_sample_data),
    1731           0 :                                                   source_timestamp));
    1732           0 :   element->set_sample(move(sample));
    1733             : 
    1734           0 :   ret = this->data_container_->enqueue_control(element);
    1735             : 
    1736           0 :   if (ret != DDS::RETCODE_OK) {
    1737           0 :     data_container_->release_buffer(element);
    1738           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1739             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1740             :                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
    1741             :                       ACE_TEXT("enqueue_control failed.\n")),
    1742             :                      ret);
    1743             :   }
    1744             : 
    1745           0 :   send_all_to_flush_control(guard);
    1746           0 :   return DDS::RETCODE_OK;
    1747           0 : }
    1748             : 
    1749             : DDS::ReturnCode_t
    1750           0 : DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
    1751             :                                        const DDS::Time_t& source_timestamp)
    1752             : {
    1753             :   DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
    1754             : 
    1755           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
    1756           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
    1757             : 
    1758           0 :   Message_Block_Ptr data_sample;
    1759           0 :   ret = this->data_container_->dispose(handle, data_sample);
    1760             : 
    1761           0 :   if (ret != DDS::RETCODE_OK) {
    1762           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1763             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1764             :                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
    1765             :                       ACE_TEXT("dispose on container failed.\n")),
    1766             :                      ret);
    1767             :   }
    1768             : 
    1769           0 :   ret = this->data_container_->unregister(handle, data_sample, false);
    1770             : 
    1771           0 :   if (ret != DDS::RETCODE_OK) {
    1772           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1773             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1774             :                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
    1775             :                       ACE_TEXT("unregister with container failed.\n")),
    1776             :                      ret);
    1777             :   }
    1778             : 
    1779           0 :   DataSampleElement* element = 0;
    1780           0 :   ret = this->data_container_->obtain_buffer_for_control(element);
    1781             : 
    1782           0 :   if (ret != DDS::RETCODE_OK) {
    1783           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1784             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1785             :                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
    1786             :                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
    1787             :                       ret),
    1788             :                      ret);
    1789             :   }
    1790             : 
    1791             :   Message_Block_Ptr sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
    1792             :                                                   element->get_header(),
    1793           0 :                                                   move(data_sample),
    1794           0 :                                                   source_timestamp));
    1795           0 :   element->set_sample(move(sample));
    1796             : 
    1797           0 :   ret = this->data_container_->enqueue_control(element);
    1798             : 
    1799           0 :   if (ret != DDS::RETCODE_OK) {
    1800           0 :     data_container_->release_buffer(element);
    1801           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1802             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1803             :                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
    1804             :                       ACE_TEXT("enqueue_control failed.\n")),
    1805             :                      ret);
    1806             :   }
    1807             : 
    1808           0 :   send_all_to_flush_control(guard);
    1809           0 :   return DDS::RETCODE_OK;
    1810           0 : }
    1811             : 
    1812             : void
    1813           0 : DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
    1814             : {
    1815           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
    1816             : 
    1817           0 :   while (!this->data_container_->instances_.empty()) {
    1818           0 :     this->unregister_instance_i(this->data_container_->instances_.begin()->first, source_timestamp);
    1819             :   }
    1820           0 : }
    1821             : 
    1822             : DDS::ReturnCode_t
    1823           0 : DataWriterImpl::write(Message_Block_Ptr data,
    1824             :                       DDS::InstanceHandle_t handle,
    1825             :                       const DDS::Time_t& source_timestamp,
    1826             :                       GUIDSeq* filter_out,
    1827             :                       const void* real_data)
    1828             : {
    1829             :   DBG_ENTRY_LVL("DataWriterImpl","write",6);
    1830             : 
    1831           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
    1832             : 
    1833             :   // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
    1834           0 :   GUIDSeq_var filter_out_var(filter_out);
    1835             : 
    1836           0 :   if (!enabled_) {
    1837           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1838             :                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
    1839             :                       ACE_TEXT("Entity is not enabled.\n")),
    1840             :                      DDS::RETCODE_NOT_ENABLED);
    1841             :   }
    1842             : 
    1843           0 :   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
    1844             :                     dc_guard,
    1845             :                     get_lock(),
    1846             :                     DDS::RETCODE_ERROR);
    1847             : 
    1848           0 :   DataSampleElement* element = 0;
    1849           0 :   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
    1850             : 
    1851           0 :   if (ret == DDS::RETCODE_TIMEOUT) {
    1852           0 :     return ret; // silent for timeout
    1853             : 
    1854           0 :   } else if (ret != DDS::RETCODE_OK) {
    1855           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1856             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1857             :                       ACE_TEXT("DataWriterImpl::write: ")
    1858             :                       ACE_TEXT("obtain_buffer returned %d.\n"),
    1859             :                       ret),
    1860             :                      ret);
    1861             :   }
    1862             : 
    1863           0 :   Message_Block_Ptr temp;
    1864           0 :   ret = create_sample_data_message(move(data),
    1865             :                                    handle,
    1866             :                                    element->get_header(),
    1867             :                                    temp,
    1868             :                                    source_timestamp,
    1869             :                                    (filter_out != 0));
    1870           0 :   element->set_sample(move(temp));
    1871             : 
    1872           0 :   if (ret != DDS::RETCODE_OK) {
    1873           0 :     data_container_->release_buffer(element);
    1874           0 :     return ret;
    1875             :   }
    1876             : 
    1877           0 :   element->set_filter_out(filter_out_var._retn()); // ownership passed to element
    1878             : 
    1879           0 :   ret = this->data_container_->enqueue(element, handle);
    1880             : 
    1881           0 :   if (ret != DDS::RETCODE_OK) {
    1882           0 :     data_container_->release_buffer(element);
    1883           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1884             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1885             :                       ACE_TEXT("DataWriterImpl::write: ")
    1886             :                       ACE_TEXT("enqueue failed.\n")),
    1887             :                      ret);
    1888             :   }
    1889           0 :   last_liveliness_activity_time_.set_to_now();
    1890             : 
    1891           0 :   track_sequence_number(filter_out);
    1892             : 
    1893           0 :   if (this->coherent_) {
    1894           0 :     ++this->coherent_samples_;
    1895             :   }
    1896           0 :   SendStateDataSampleList list;
    1897             : 
    1898           0 :   ACE_UINT64 transaction_id = this->get_unsent_data(list);
    1899             : 
    1900           0 :   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
    1901           0 :   if (!publisher || publisher->is_suspended()) {
    1902           0 :     if (min_suspended_transaction_id_ == 0) {
    1903             :       //provides transaction id for lower bound of suspended transactions
    1904             :       //or transaction id for single suspended write transaction
    1905           0 :       min_suspended_transaction_id_ = transaction_id;
    1906             :     } else {
    1907             :       //when multiple write transactions have suspended, provides the upper bound
    1908             :       //for suspended transactions.
    1909           0 :       max_suspended_transaction_id_ = transaction_id;
    1910             :     }
    1911           0 :     this->available_data_list_.enqueue_tail(list);
    1912             : 
    1913             :   } else {
    1914           0 :     dc_guard.release();
    1915           0 :     guard.release();
    1916           0 :     this->send(list, transaction_id);
    1917             :   }
    1918             : 
    1919           0 :   const ValueDispatcher* vd = get_value_dispatcher();
    1920           0 :   const Observer_rch observer = get_observer(Observer::e_SAMPLE_SENT);
    1921           0 :   if (observer && real_data && vd) {
    1922           0 :     Observer::Sample s(handle, element->get_header().instance_state(), source_timestamp, element->get_header().sequence_, real_data, *vd);
    1923           0 :     observer->on_sample_sent(this, s);
    1924             :   }
    1925             : 
    1926           0 :   return DDS::RETCODE_OK;
    1927           0 : }
    1928             : 
    1929             : void
    1930           0 : DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
    1931             : {
    1932           0 :   const SequenceNumber sn = get_max_sn();
    1933           0 :   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
    1934             : 
    1935             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    1936             :   // Track individual expected sequence numbers in ReaderInfo
    1937           0 :   RepoIdSet excluded;
    1938             : 
    1939           0 :   if (filter_out && !reader_info_.empty()) {
    1940           0 :     const GUID_t* buf = filter_out->get_buffer();
    1941           0 :     excluded.insert(buf, buf + filter_out->length());
    1942             :   }
    1943             : 
    1944           0 :   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
    1945           0 :        end = reader_info_.end(); iter != end; ++iter) {
    1946             :     // If not excluding this reader, update expected sequence
    1947           0 :     if (excluded.count(iter->first) == 0) {
    1948           0 :       iter->second.expected_sequence_ = sn;
    1949             :     }
    1950             :   }
    1951             : 
    1952             : #else
    1953             :   ACE_UNUSED_ARG(filter_out);
    1954             :   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
    1955             :        end = reader_info_.end(); iter != end; ++iter) {
    1956             :     iter->second.expected_sequence_ = sn;
    1957             :   }
    1958             : 
    1959             : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
    1960             : 
    1961           0 : }
    1962             : 
    1963             : void
    1964           0 : DataWriterImpl::send_suspended_data()
    1965             : {
    1966             :   //this serves to get TransportClient's max_transaction_id_seen_
    1967             :   //to the correct value for this list of transactions
    1968           0 :   if (max_suspended_transaction_id_ != 0) {
    1969           0 :     this->send(this->available_data_list_, max_suspended_transaction_id_);
    1970           0 :     max_suspended_transaction_id_ = 0;
    1971             :   }
    1972             : 
    1973             :   //this serves to actually have the send proceed in
    1974             :   //sending the samples to the datalinks by passing it
    1975             :   //the min_suspended_transaction_id_ which should be the
    1976             :   //TransportClient's expected_transaction_id_
    1977           0 :   this->send(this->available_data_list_, min_suspended_transaction_id_);
    1978           0 :   min_suspended_transaction_id_ = 0;
    1979           0 :   this->available_data_list_.reset();
    1980           0 : }
    1981             : 
    1982             : DDS::ReturnCode_t
    1983           0 : DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
    1984             :                         const DDS::Time_t & source_timestamp)
    1985             : {
    1986             :   DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
    1987             : 
    1988           0 :   if (!enabled_) {
    1989           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1990             :                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
    1991             :                       ACE_TEXT("Entity is not enabled.\n")),
    1992             :                      DDS::RETCODE_NOT_ENABLED);
    1993             :   }
    1994             : 
    1995           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
    1996             : 
    1997           0 :   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
    1998             : 
    1999           0 :   Message_Block_Ptr registered_sample_data;
    2000           0 :   ret = this->data_container_->dispose(handle, registered_sample_data);
    2001             : 
    2002           0 :   if (ret != DDS::RETCODE_OK) {
    2003           0 :     ACE_ERROR_RETURN((LM_ERROR,
    2004             :                       ACE_TEXT("(%P|%t) ERROR: ")
    2005             :                       ACE_TEXT("DataWriterImpl::dispose: ")
    2006             :                       ACE_TEXT("dispose failed.\n")),
    2007             :                      ret);
    2008             :   }
    2009             : 
    2010           0 :   DataSampleElement* element = 0;
    2011           0 :   ret = this->data_container_->obtain_buffer_for_control(element);
    2012             : 
    2013           0 :   if (ret != DDS::RETCODE_OK) {
    2014           0 :     ACE_ERROR_RETURN((LM_ERROR,
    2015             :                       ACE_TEXT("(%P|%t) ERROR: ")
    2016             :                       ACE_TEXT("DataWriterImpl::dispose: ")
    2017             :                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
    2018             :                       ret),
    2019             :                      ret);
    2020             :   }
    2021             : 
    2022             :   Message_Block_Ptr sample(create_control_message(DISPOSE_INSTANCE,
    2023             :                                                   element->get_header(),
    2024           0 :                                                   move(registered_sample_data),
    2025           0 :                                                   source_timestamp));
    2026           0 :   element->set_sample(move(sample));
    2027             : 
    2028           0 :   ret = this->data_container_->enqueue_control(element);
    2029             : 
    2030           0 :   if (ret != DDS::RETCODE_OK) {
    2031           0 :     data_container_->release_buffer(element);
    2032           0 :     ACE_ERROR_RETURN((LM_ERROR,
    2033             :                       ACE_TEXT("(%P|%t) ERROR: ")
    2034             :                       ACE_TEXT("DataWriterImpl::dispose: ")
    2035             :                       ACE_TEXT("enqueue_control failed.\n")),
    2036             :                      ret);
    2037             :   }
    2038             : 
    2039           0 :   send_all_to_flush_control(guard);
    2040             : 
    2041           0 :   return DDS::RETCODE_OK;
    2042           0 : }
    2043             : 
    2044             : DDS::ReturnCode_t
    2045           0 : DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
    2046             :                             size_t&                 size)
    2047             : {
    2048           0 :   return data_container_->num_samples(handle, size);
    2049             : }
    2050             : 
    2051             : void
    2052           0 : DataWriterImpl::unregister_all()
    2053             : {
    2054           0 :   data_container_->unregister_all();
    2055           0 : }
    2056             : 
    2057             : GUID_t
    2058           0 : DataWriterImpl::get_dp_id()
    2059             : {
    2060           0 :   return dp_id_;
    2061             : }
    2062             : 
    2063             : char const *
    2064           0 : DataWriterImpl::get_type_name() const
    2065             : {
    2066           0 :   return type_name_.in();
    2067             : }
    2068             : 
    2069             : ACE_Message_Block*
    2070           0 : DataWriterImpl::create_control_message(MessageId message_id,
    2071             :                                        DataSampleHeader& header_data,
    2072             :                                        Message_Block_Ptr data,
    2073             :                                        const DDS::Time_t& source_timestamp)
    2074             : {
    2075           0 :   header_data.message_id_ = message_id;
    2076           0 :   header_data.byte_order_ =
    2077           0 :     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
    2078           0 :   header_data.coherent_change_ = false;
    2079             : 
    2080           0 :   if (data) {
    2081           0 :     header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
    2082             :   }
    2083             : 
    2084           0 :   header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
    2085           0 :   header_data.sequence_repair_ = false; // set below
    2086           0 :   header_data.source_timestamp_sec_ = source_timestamp.sec;
    2087           0 :   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
    2088           0 :   header_data.publication_id_ = publication_id_;
    2089             : 
    2090           0 :   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
    2091           0 :   if (!publisher) {
    2092           0 :     return 0;
    2093             :   }
    2094             : 
    2095           0 :   header_data.publisher_id_ = publisher->publisher_id_;
    2096             : 
    2097           0 :   ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
    2098           0 :   SequenceNumber sequence = sequence_number_;
    2099           0 :   if (message_id == INSTANCE_REGISTRATION
    2100           0 :       || message_id == DISPOSE_INSTANCE
    2101           0 :       || message_id == UNREGISTER_INSTANCE
    2102           0 :       || message_id == DISPOSE_UNREGISTER_INSTANCE
    2103           0 :       || message_id == REQUEST_ACK) {
    2104             : 
    2105           0 :     header_data.sequence_repair_ = need_sequence_repair();
    2106           0 :     header_data.sequence_ = get_next_sn_i();
    2107           0 :     header_data.key_fields_only_ = true;
    2108           0 :     sequence = sequence_number_;
    2109             :   }
    2110           0 :   guard.release();
    2111             : 
    2112           0 :   ACE_Message_Block* message = 0;
    2113           0 :   ACE_NEW_MALLOC_RETURN(message,
    2114             :                         static_cast<ACE_Message_Block*>(
    2115             :                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
    2116             :                         ACE_Message_Block(
    2117             :                           DataSampleHeader::get_max_serialized_size(),
    2118             :                           ACE_Message_Block::MB_DATA,
    2119             :                           header_data.message_length_ ? data.release() : 0, //cont
    2120             :                           0, //data
    2121             :                           0, //allocator_strategy
    2122             :                           get_db_lock(), //locking_strategy
    2123             :                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
    2124             :                           ACE_Time_Value::zero,
    2125             :                           ACE_Time_Value::max_time,
    2126             :                           db_allocator_.get(),
    2127             :                           mb_allocator_.get()),
    2128             :                         0);
    2129             : 
    2130           0 :   *message << header_data;
    2131             : 
    2132             :   // If we incremented sequence number for this control message
    2133           0 :   if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
    2134           0 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
    2135             :     // Update the expected sequence number for all readers
    2136           0 :     RepoIdToReaderInfoMap::iterator reader;
    2137             : 
    2138           0 :     for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
    2139           0 :       reader->second.expected_sequence_ = sequence;
    2140             :     }
    2141           0 :   }
    2142           0 :   if (DCPS_debug_level >= 4) {
    2143           0 :     ACE_DEBUG((LM_DEBUG,
    2144             :                ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
    2145             :                ACE_TEXT("from publication %C sending control sample: %C .\n"),
    2146             :                LogGuid(publication_id_).c_str(),
    2147             :                to_string(header_data).c_str()));
    2148             :   }
    2149           0 :   return message;
    2150           0 : }
    2151             : 
    2152             : DDS::ReturnCode_t
    2153           0 : DataWriterImpl::create_sample_data_message(Message_Block_Ptr data,
    2154             :                                            DDS::InstanceHandle_t instance_handle,
    2155             :                                            DataSampleHeader& header_data,
    2156             :                                            Message_Block_Ptr& message,
    2157             :                                            const DDS::Time_t& source_timestamp,
    2158             :                                            bool content_filter)
    2159             : {
    2160             :   PublicationInstance_rch instance =
    2161           0 :     data_container_->get_handle_instance(instance_handle);
    2162             : 
    2163           0 :   if (0 == instance) {
    2164           0 :     ACE_ERROR_RETURN((LM_ERROR,
    2165             :                       ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
    2166             :                       ACE_TEXT("failed to find instance for handle %d\n"),
    2167             :                       instance_handle),
    2168             :                      DDS::RETCODE_ERROR);
    2169             :   }
    2170             : 
    2171           0 :   header_data.message_id_ = SAMPLE_DATA;
    2172           0 :   header_data.byte_order_ =
    2173           0 :     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
    2174           0 :   header_data.coherent_change_ = this->coherent_;
    2175             : 
    2176           0 :   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
    2177             : 
    2178           0 :   if (!publisher) {
    2179           0 :     return DDS::RETCODE_ERROR;
    2180             :   }
    2181             : 
    2182             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    2183           0 :   header_data.group_coherent_ =
    2184           0 :     publisher->qos_.presentation.access_scope
    2185           0 :     == DDS::GROUP_PRESENTATION_QOS;
    2186             : #endif
    2187           0 :   header_data.content_filter_ = content_filter;
    2188           0 :   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
    2189           0 :   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
    2190             :   {
    2191           0 :     ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
    2192           0 :     header_data.sequence_repair_ = need_sequence_repair();
    2193           0 :     header_data.sequence_ = get_next_sn_i();
    2194           0 :   }
    2195           0 :   header_data.source_timestamp_sec_ = source_timestamp.sec;
    2196           0 :   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
    2197             : 
    2198           0 :   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
    2199           0 :       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
    2200           0 :     header_data.lifespan_duration_ = true;
    2201           0 :     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
    2202           0 :     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
    2203             :   }
    2204             : 
    2205           0 :   header_data.publication_id_ = publication_id_;
    2206           0 :   header_data.publisher_id_ = publisher->publisher_id_;
    2207             : 
    2208             :   ACE_Message_Block* tmp_message;
    2209           0 :   ACE_NEW_MALLOC_RETURN(tmp_message,
    2210             :                         static_cast<ACE_Message_Block*>(
    2211             :                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
    2212             :                         ACE_Message_Block(DataSampleHeader::get_max_serialized_size(),
    2213             :                                           ACE_Message_Block::MB_DATA,
    2214             :                                           data.release(), //cont
    2215             :                                           0, //data
    2216             :                                           header_allocator_.get(), //alloc_strategy
    2217             :                                           get_db_lock(), //locking_strategy
    2218             :                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
    2219             :                                           ACE_Time_Value::zero,
    2220             :                                           ACE_Time_Value::max_time,
    2221             :                                           db_allocator_.get(),
    2222             :                                           mb_allocator_.get()),
    2223             :                         DDS::RETCODE_ERROR);
    2224           0 :   message.reset(tmp_message);
    2225           0 :   *message << header_data;
    2226           0 :   if (DCPS_debug_level >= 4) {
    2227           0 :     ACE_DEBUG((LM_DEBUG,
    2228             :                ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
    2229             :                ACE_TEXT("from publication %C sending data sample: %C .\n"),
    2230             :                LogGuid(publication_id_).c_str(),
    2231             :                to_string(header_data).c_str()));
    2232             :   }
    2233           0 :   return DDS::RETCODE_OK;
    2234           0 : }
    2235             : 
    2236             : void
    2237           0 : DataWriterImpl::data_delivered(const DataSampleElement* sample)
    2238             : {
    2239             :   DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
    2240             : 
    2241           0 :   if (!(sample->get_pub_id() == this->publication_id_)) {
    2242           0 :     ACE_ERROR((LM_ERROR,
    2243             :                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
    2244             :                ACE_TEXT("The publication id %C from delivered element ")
    2245             :                ACE_TEXT("does not match the datawriter's id %C\n"),
    2246             :                LogGuid(sample->get_pub_id()).c_str(),
    2247             :                LogGuid(publication_id_).c_str()));
    2248           0 :     return;
    2249             :   }
    2250             :   //provided for statistics tracking in tests
    2251           0 :   ++data_delivered_count_;
    2252             : 
    2253           0 :   this->data_container_->data_delivered(sample);
    2254             : }
    2255             : 
    2256             : void
    2257           0 : DataWriterImpl::control_delivered(const Message_Block_Ptr&)
    2258             : {
    2259             :   DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
    2260           0 :   controlTracker.message_delivered();
    2261           0 : }
    2262             : 
    2263             : RcHandle<EntityImpl>
    2264           0 : DataWriterImpl::parent() const
    2265             : {
    2266           0 :   return this->publisher_servant_.lock();
    2267             : }
    2268             : 
    2269             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    2270             : bool
    2271           0 : DataWriterImpl::filter_out(const DataSampleElement& elt,
    2272             :                            const OPENDDS_STRING& filterClassName,
    2273             :                            const FilterEvaluator& evaluator,
    2274             :                            const DDS::StringSeq& expression_params) const
    2275             : {
    2276           0 :   if (!type_support_) {
    2277           0 :     if (log_level >= LogLevel::Error) {
    2278           0 :       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::filter_out: Could not cast type support, not filtering\n"));
    2279             :     }
    2280           0 :     return false;
    2281             :   }
    2282             : 
    2283           0 :   if (filterClassName == "DDSSQL" ||
    2284           0 :       filterClassName == "OPENDDSSQL") {
    2285           0 :     if (!elt.get_header().valid_data() && evaluator.has_non_key_fields(*type_support_)) {
    2286           0 :       return true;
    2287             :     }
    2288             :     try {
    2289           0 :       return !evaluator.eval(elt.get_sample()->cont(), encoding_mode_.encoding(),
    2290           0 :                              *type_support_, expression_params);
    2291           0 :     } catch (const std::runtime_error&) {
    2292             :       // if the eval fails, the throws will do the logging
    2293             :       // return false here so that the sample is not filtered
    2294           0 :       return false;
    2295           0 :     }
    2296             :   } else {
    2297           0 :     return false;
    2298             :   }
    2299             : }
    2300             : #endif
    2301             : 
    2302             : bool
    2303           0 : DataWriterImpl::check_transport_qos(const TransportInst&)
    2304             : {
    2305             :   // DataWriter does not impose any constraints on which transports
    2306             :   // may be used based on QoS.
    2307           0 :   return true;
    2308             : }
    2309             : 
    2310             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    2311             : 
    2312             : bool
    2313           0 : DataWriterImpl::coherent_changes_pending()
    2314             : {
    2315           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2316             :                    guard,
    2317             :                    get_lock(),
    2318             :                    false);
    2319             : 
    2320           0 :   return this->coherent_;
    2321           0 : }
    2322             : 
    2323             : void
    2324           0 : DataWriterImpl::begin_coherent_changes()
    2325             : {
    2326           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
    2327             :             guard,
    2328             :             get_lock());
    2329             : 
    2330           0 :   this->coherent_ = true;
    2331           0 : }
    2332             : 
    2333             : void
    2334           0 : DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
    2335             : {
    2336             :   // PublisherImpl::pi_lock_ should be held.
    2337           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
    2338             :             guard,
    2339             :             get_lock());
    2340             : 
    2341           0 :   CoherentChangeControl end_msg;
    2342           0 :   end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
    2343           0 :   end_msg.coherent_samples_.last_sample_ = get_max_sn();
    2344             : 
    2345           0 :   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
    2346             : 
    2347           0 :   if (publisher) {
    2348             :     end_msg.group_coherent_
    2349           0 :       = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
    2350             :   }
    2351             : 
    2352           0 :   if (publisher && end_msg.group_coherent_) {
    2353           0 :     end_msg.publisher_id_ = publisher->publisher_id_;
    2354           0 :     end_msg.group_coherent_samples_ = group_samples;
    2355             :   }
    2356             : 
    2357             :   Message_Block_Ptr data(
    2358             :     new ACE_Message_Block(
    2359           0 :       end_msg.get_max_serialized_size(),
    2360             :       ACE_Message_Block::MB_DATA,
    2361             :       0, // cont
    2362             :       0, // data
    2363             :       0, // alloc_strategy
    2364           0 :       get_db_lock()));
    2365             : 
    2366             :   Serializer serializer(data.get(), Encoding::KIND_UNALIGNED_CDR,
    2367           0 :     this->swap_bytes());
    2368             : 
    2369           0 :   serializer << end_msg;
    2370             : 
    2371           0 :   DataSampleHeader header;
    2372             :   Message_Block_Ptr control(
    2373             :     create_control_message(
    2374           0 :       END_COHERENT_CHANGES, header, move(data),
    2375           0 :       SystemTimePoint::now().to_dds_time()));
    2376             : 
    2377           0 :   this->coherent_ = false;
    2378           0 :   this->coherent_samples_ = 0;
    2379             : 
    2380           0 :   guard.release();
    2381           0 :   if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) {
    2382           0 :     ACE_ERROR((LM_ERROR,
    2383             :                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
    2384             :                ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
    2385             :   }
    2386           0 : }
    2387             : 
    2388             : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
    2389             : 
    2390             : void
    2391           0 : DataWriterImpl::data_dropped(const DataSampleElement* element,
    2392             :                              bool dropped_by_transport)
    2393             : {
    2394             :   DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
    2395             : 
    2396             :   //provided for statistics tracking in tests
    2397           0 :   ++data_dropped_count_;
    2398             : 
    2399           0 :   this->data_container_->data_dropped(element, dropped_by_transport);
    2400           0 : }
    2401             : 
    2402             : void
    2403           0 : DataWriterImpl::control_dropped(const Message_Block_Ptr&,
    2404             :                                 bool /* dropped_by_transport */)
    2405             : {
    2406             :   DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
    2407           0 :   controlTracker.message_dropped();
    2408           0 : }
    2409             : 
    2410             : DDS::DataWriterListener_ptr
    2411           0 : DataWriterImpl::listener_for(DDS::StatusKind kind)
    2412             : {
    2413             :   // per 2.1.4.3.1 Listener Access to Plain Communication Status
    2414             :   // use this entities factory if listener is mask not enabled
    2415             :   // for this kind.
    2416           0 :   RcHandle<PublisherImpl> publisher = publisher_servant_.lock();
    2417           0 :   if (!publisher)
    2418           0 :     return 0;
    2419             : 
    2420           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
    2421           0 :   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
    2422           0 :     g.release();
    2423           0 :     return publisher->listener_for(kind);
    2424             : 
    2425             :   } else {
    2426           0 :     return DDS::DataWriterListener::_duplicate(listener_.in());
    2427             :   }
    2428           0 : }
    2429             : 
    2430             : int
    2431           0 : DataWriterImpl::handle_timeout(const ACE_Time_Value& tv,
    2432             :                                const void* /* arg */)
    2433             : {
    2434           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    2435             : 
    2436           0 :   const MonotonicTimePoint now(tv);
    2437           0 :   bool liveliness_lost = false;
    2438             : 
    2439           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
    2440             : 
    2441           0 :   TimeDuration elapsed = now - last_liveliness_activity_time_;
    2442             : 
    2443             :   // Do we need to send a liveliness message?
    2444           0 :   if (elapsed >= liveliness_check_interval_) {
    2445           0 :     switch (this->qos_.liveliness.kind) {
    2446           0 :     case DDS::AUTOMATIC_LIVELINESS_QOS:
    2447           0 :       if (!send_liveliness(now)) {
    2448           0 :         liveliness_lost = true;
    2449             :       }
    2450           0 :       break;
    2451             : 
    2452           0 :     case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
    2453           0 :       if (liveliness_asserted_) {
    2454           0 :         if (!send_liveliness(now)) {
    2455           0 :           liveliness_lost = true;
    2456             :         }
    2457             :       }
    2458           0 :       break;
    2459             : 
    2460           0 :     case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
    2461             :       // Do nothing.
    2462           0 :       break;
    2463             :     }
    2464             :   }
    2465             :   else {
    2466             :     // Reschedule.
    2467           0 :     if (reactor_->cancel_timer(liveness_timer_.in()) == -1) {
    2468           0 :       ACE_ERROR((LM_ERROR,
    2469             :         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
    2470             :         ACE_TEXT("cancel_timer")));
    2471             :     }
    2472           0 :     if (reactor_->schedule_timer(liveness_timer_.in(), 0,
    2473           0 :       (liveliness_check_interval_ - elapsed).value(),
    2474           0 :       liveliness_check_interval_.value()) == -1)
    2475             :     {
    2476           0 :       ACE_ERROR((LM_ERROR,
    2477             :         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
    2478             :         ACE_TEXT("schedule_timer")));
    2479             :     }
    2480           0 :     return 0;
    2481             :   }
    2482             : 
    2483           0 :   liveliness_asserted_ = false;
    2484           0 :   elapsed = now - last_liveliness_activity_time_;
    2485             : 
    2486             :   // Have we lost liveliness?
    2487           0 :   if (elapsed >= TimeDuration(qos_.liveliness.lease_duration)) {
    2488           0 :     liveliness_lost = true;
    2489             :   }
    2490             : 
    2491           0 :   if (!this->liveliness_lost_ && liveliness_lost) {
    2492           0 :     ++ this->liveliness_lost_status_.total_count;
    2493           0 :     ++ this->liveliness_lost_status_.total_count_change;
    2494             : 
    2495             :     DDS::DataWriterListener_var listener =
    2496           0 :       listener_for(DDS::LIVELINESS_LOST_STATUS);
    2497             : 
    2498           0 :     if (!CORBA::is_nil(listener.in())) {
    2499             :       {
    2500           0 :         ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> rev_lock(lock_);
    2501           0 :         ACE_Guard<ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> > rev_guard(rev_lock);
    2502           0 :         listener->on_liveliness_lost(this, this->liveliness_lost_status_);
    2503           0 :       }
    2504           0 :       this->liveliness_lost_status_.total_count_change = 0;
    2505             :     }
    2506           0 :   }
    2507             : 
    2508           0 :   this->liveliness_lost_ = liveliness_lost;
    2509           0 :   return 0;
    2510           0 : }
    2511             : 
    2512             : bool
    2513           0 : DataWriterImpl::send_liveliness(const MonotonicTimePoint& now)
    2514             : {
    2515           0 :   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
    2516           0 :       !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
    2517           0 :     DataSampleHeader header;
    2518           0 :     Message_Block_Ptr empty;
    2519             :     Message_Block_Ptr liveliness_msg(
    2520             :       create_control_message(
    2521           0 :         DATAWRITER_LIVELINESS, header, move(empty),
    2522           0 :         SystemTimePoint::now().to_dds_time()));
    2523             : 
    2524           0 :     if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) {
    2525           0 :       ACE_ERROR_RETURN((LM_ERROR,
    2526             :                         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
    2527             :                         ACE_TEXT("send_control failed.\n")),
    2528             :                        false);
    2529             :     }
    2530           0 :   }
    2531           0 :   last_liveliness_activity_time_ = now;
    2532           0 :   return true;
    2533             : }
    2534             : 
    2535             : void
    2536           0 : DataWriterImpl::prepare_to_delete()
    2537             : {
    2538           0 :   const Observer_rch observer = get_observer(Observer::e_DELETED);
    2539           0 :   if (observer) {
    2540           0 :     observer->on_deleted(this);
    2541             :   }
    2542             : 
    2543           0 :   this->set_deleted(true);
    2544           0 :   this->stop_associating();
    2545           0 :   this->terminate_send_if_suspended();
    2546             : 
    2547             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    2548             :   // Trigger data to be persisted, i.e. made durable, if so
    2549             :   // configured. This needs be called before unregister_instances
    2550             :   // because unregister_instances may cause instance dispose.
    2551           0 :   if (!persist_data() && DCPS_debug_level >= 2) {
    2552           0 :     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::prepare_to_delete: ")
    2553             :       ACE_TEXT("failed to make data durable.\n")));
    2554             :   }
    2555             : #endif
    2556             : 
    2557             :   // Unregister all registered instances prior to deletion.
    2558           0 :   unregister_instances(SystemTimePoint::now().to_dds_time());
    2559           0 : }
    2560             : 
    2561             : PublicationInstance_rch
    2562           0 : DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
    2563             : {
    2564             : 
    2565           0 :   if (0 != data_container_) {
    2566           0 :     return data_container_->get_handle_instance(handle);
    2567             :   }
    2568             : 
    2569           0 :   return PublicationInstance_rch();
    2570             : }
    2571             : 
    2572             : void
    2573           0 : DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
    2574             : {
    2575             :   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
    2576             : 
    2577           0 :   if (!is_bit_) {
    2578             :     // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
    2579             :     // is given to this DataWriter then narrow() fails.
    2580           0 :     DataWriterListener_var the_listener = get_ext_listener();
    2581             : 
    2582           0 :     if (!CORBA::is_nil(the_listener.in())) {
    2583           0 :       PublicationDisconnectedStatus status;
    2584             :       // Since this callback may come after remove_association which
    2585             :       // removes the reader from id_to_handle map, we can ignore this
    2586             :       // error.
    2587           0 :       this->lookup_instance_handles(subids,
    2588             :                                     status.subscription_handles);
    2589           0 :       the_listener->on_publication_disconnected(this, status);
    2590           0 :     }
    2591           0 :   }
    2592           0 : }
    2593             : 
    2594             : void
    2595           0 : DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
    2596             : {
    2597             :   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
    2598             : 
    2599           0 :   if (!is_bit_) {
    2600             :     // Narrow to DDS::DCPS::DataWriterListener. If a
    2601             :     // DDS::DataWriterListener is given to this DataWriter then
    2602             :     // narrow() fails.
    2603           0 :     DataWriterListener_var the_listener = get_ext_listener();
    2604             : 
    2605           0 :     if (!CORBA::is_nil(the_listener.in())) {
    2606           0 :       PublicationDisconnectedStatus status;
    2607             : 
    2608             :       // If it's reconnected then the reader should be in id_to_handle
    2609           0 :       this->lookup_instance_handles(subids, status.subscription_handles);
    2610             : 
    2611           0 :       the_listener->on_publication_reconnected(this, status);
    2612           0 :     }
    2613           0 :   }
    2614           0 : }
    2615             : 
    2616             : void
    2617           0 : DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
    2618             : {
    2619             :   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
    2620             : 
    2621           0 :   if (!is_bit_) {
    2622             :     // Narrow to DDS::DCPS::DataWriterListener. If a
    2623             :     // DDS::DataWriterListener is given to this DataWriter then
    2624             :     // narrow() fails.
    2625           0 :     DataWriterListener_var the_listener = get_ext_listener();
    2626             : 
    2627           0 :     if (!CORBA::is_nil(the_listener.in())) {
    2628           0 :       PublicationLostStatus status;
    2629             : 
    2630             :       // Since this callback may come after remove_association which removes
    2631             :       // the reader from id_to_handle map, we can ignore this error.
    2632           0 :       this->lookup_instance_handles(subids,
    2633             :                                     status.subscription_handles);
    2634           0 :       the_listener->on_publication_lost(this, status);
    2635           0 :     }
    2636           0 :   }
    2637           0 : }
    2638             : 
    2639             : void
    2640           0 : DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
    2641             : {
    2642             :   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
    2643             : 
    2644           0 :   if (!is_bit_) {
    2645             :     // Narrow to DDS::DCPS::DataWriterListener. If a
    2646             :     // DDS::DataWriterListener is given to this DataWriter then
    2647             :     // narrow() fails.
    2648           0 :     DataWriterListener_var the_listener = get_ext_listener();
    2649             : 
    2650           0 :     if (!CORBA::is_nil(the_listener.in())) {
    2651           0 :       PublicationLostStatus status;
    2652             : 
    2653           0 :       CORBA::ULong len = handles.length();
    2654           0 :       status.subscription_handles.length(len);
    2655             : 
    2656           0 :       for (CORBA::ULong i = 0; i < len; ++ i) {
    2657           0 :         status.subscription_handles[i] = handles[i];
    2658             :       }
    2659             : 
    2660           0 :       the_listener->on_publication_lost(this, status);
    2661           0 :     }
    2662           0 :   }
    2663           0 : }
    2664             : 
    2665             : 
    2666             : void
    2667           0 : DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
    2668             :                                         DDS::InstanceHandleSeq & hdls)
    2669             : {
    2670           0 :   CORBA::ULong const num_rds = ids.length();
    2671           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
    2672             : 
    2673           0 :   if (!participant)
    2674           0 :     return;
    2675             : 
    2676           0 :   if (DCPS_debug_level > 9) {
    2677           0 :     OPENDDS_STRING separator;
    2678           0 :     OPENDDS_STRING buffer;
    2679             : 
    2680           0 :     for (CORBA::ULong i = 0; i < num_rds; ++i) {
    2681           0 :       buffer += separator + LogGuid(ids[i]).conv_;
    2682           0 :       separator = ", ";
    2683             :     }
    2684             : 
    2685           0 :     ACE_DEBUG((LM_DEBUG,
    2686             :                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
    2687             :                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
    2688             :                buffer.c_str()));
    2689           0 :   }
    2690             : 
    2691           0 :   hdls.length(num_rds);
    2692             : 
    2693           0 :   for (CORBA::ULong i = 0; i < num_rds; ++i) {
    2694           0 :     hdls[i] = participant->lookup_handle(ids[i]);
    2695             :   }
    2696           0 : }
    2697             : 
    2698             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    2699             : bool
    2700           0 : DataWriterImpl::persist_data()
    2701             : {
    2702           0 :   return this->data_container_->persist_data();
    2703             : }
    2704             : #endif
    2705             : 
    2706           0 : void DataWriterImpl::wait_pending()
    2707             : {
    2708           0 :   if (!TransportRegistry::instance()->released()) {
    2709           0 :     data_container_->wait_pending(wait_pending_deadline_);
    2710           0 :     controlTracker.wait_messages_pending("DataWriterImpl::wait_pending", wait_pending_deadline_);
    2711             :   }
    2712           0 : }
    2713             : 
    2714             : void
    2715           0 : DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
    2716             : {
    2717           0 :   this->data_container_->get_instance_handles(instance_handles);
    2718           0 : }
    2719             : 
    2720             : void
    2721           0 : DataWriterImpl::get_readers(RepoIdSet& readers)
    2722             : {
    2723           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
    2724           0 :   readers = this->readers_;
    2725           0 : }
    2726             : 
    2727             : void
    2728           0 : DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
    2729             : {
    2730           0 :   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
    2731           0 :   if (publisher) {
    2732           0 :     publisher->get_qos(qos_data.pub_qos);
    2733             :   }
    2734           0 :   qos_data.dw_qos = this->qos_;
    2735           0 :   qos_data.topic_name = this->topic_name_.in();
    2736           0 : }
    2737             : 
    2738             : #if defined(OPENDDS_SECURITY)
    2739           0 : DDS::Security::ParticipantCryptoHandle DataWriterImpl::get_crypto_handle() const
    2740             : {
    2741           0 :   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
    2742           0 :   return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
    2743           0 : }
    2744             : #endif
    2745             : 
    2746             : bool
    2747           0 : DataWriterImpl::need_sequence_repair()
    2748             : {
    2749           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
    2750           0 :   return need_sequence_repair_i();
    2751           0 : }
    2752             : 
    2753             : bool
    2754           0 : DataWriterImpl::need_sequence_repair_i() const
    2755             : {
    2756           0 :   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
    2757           0 :        end = reader_info_.end(); it != end; ++it) {
    2758           0 :     if (it->second.expected_sequence_ != sequence_number_) {
    2759           0 :       return true;
    2760             :     }
    2761             :   }
    2762             : 
    2763           0 :   return false;
    2764             : }
    2765             : 
    2766             : SendControlStatus
    2767           0 : DataWriterImpl::send_control(const DataSampleHeader& header,
    2768             :                              Message_Block_Ptr msg)
    2769             : {
    2770           0 :   controlTracker.message_sent();
    2771             : 
    2772           0 :   SendControlStatus status = TransportClient::send_control(header, move(msg));
    2773             : 
    2774           0 :   if (status != SEND_CONTROL_OK) {
    2775           0 :     controlTracker.message_dropped();
    2776             :   }
    2777             : 
    2778           0 :   return status;
    2779             : }
    2780             : 
    2781             : WeakRcHandle<ICE::Endpoint>
    2782           0 : DataWriterImpl::get_ice_endpoint()
    2783             : {
    2784           0 :   return TransportClient::get_ice_endpoint();
    2785             : }
    2786             : 
    2787           0 : void DataWriterImpl::set_wait_pending_deadline(const MonotonicTimePoint& deadline)
    2788             : {
    2789           0 :   wait_pending_deadline_ = deadline;
    2790           0 : }
    2791             : 
    2792           0 : int LivenessTimer::handle_timeout(const ACE_Time_Value& tv, const void* arg)
    2793             : {
    2794           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    2795             : 
    2796           0 :   DataWriterImpl_rch writer = this->writer_.lock();
    2797           0 :   if (writer) {
    2798           0 :     writer->handle_timeout(tv, arg);
    2799             :   } else {
    2800           0 :     this->reactor()->cancel_timer(this);
    2801             :   }
    2802           0 :   return 0;
    2803           0 : }
    2804             : 
    2805           0 : void DataWriterImpl::transport_discovery_change()
    2806             : {
    2807           0 :   populate_connection_info();
    2808           0 :   const TransportLocatorSeq& trans_conf_info = connection_info();
    2809             : 
    2810           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
    2811           0 :   const GUID_t dp_id_copy = dp_id_;
    2812           0 :   const GUID_t publication_id_copy = publication_id_;
    2813           0 :   const int domain_id = domain_id_;
    2814           0 :   guard.release();
    2815             : 
    2816           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id);
    2817           0 :   disco->update_publication_locators(domain_id,
    2818             :                                      dp_id_copy,
    2819             :                                      publication_id_copy,
    2820             :                                      trans_conf_info);
    2821           0 : }
    2822             : 
    2823           0 : DDS::ReturnCode_t DataWriterImpl::setup_serialization()
    2824             : {
    2825           0 :   if (qos_.representation.value.length() > 0 &&
    2826           0 :       qos_.representation.value[0] != UNALIGNED_CDR_DATA_REPRESENTATION) {
    2827             :     // If the QoS explicitly sets XCDR, XCDR2, or XML, force encapsulation
    2828           0 :     cdr_encapsulation(true);
    2829             :   }
    2830             : 
    2831           0 :   if (cdr_encapsulation()) {
    2832             :     Encoding::Kind encoding_kind;
    2833             :     // There should only be one data representation in a DataWriter, so
    2834             :     // simply use qos_.representation.value[0].
    2835           0 :     if (repr_to_encoding_kind(qos_.representation.value[0], encoding_kind)) {
    2836           0 :       encoding_mode_ = EncodingMode(type_support_, encoding_kind, swap_bytes());
    2837           0 :       if (encoding_kind == Encoding::KIND_XCDR1 &&
    2838           0 :           type_support_->max_extensibility() == MUTABLE) {
    2839           0 :         if (log_level >= LogLevel::Notice) {
    2840           0 :           ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
    2841             :             "Encountered unsupported combination of XCDR1 encoding and mutable extensibility "
    2842             :             "for writer of type %C\n",
    2843             :             type_support_->name()));
    2844             :         }
    2845           0 :         return DDS::RETCODE_ERROR;
    2846           0 :       } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR) {
    2847           0 :         if (log_level >= LogLevel::Notice) {
    2848           0 :           ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
    2849             :             "Unaligned CDR is not supported by transport types that require encapsulation\n"));
    2850             :         }
    2851           0 :         return DDS::RETCODE_ERROR;
    2852             :       }
    2853           0 :     } else if (log_level >= LogLevel::Warning) {
    2854           0 :       ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::setup_serialization: "
    2855             :                  "Encountered unsupported or unknown data representation: %C ",
    2856             :                  "for writer of type %C\n",
    2857             :                  repr_to_string(qos_.representation.value[0]).c_str(),
    2858             :                  type_support_->name()));
    2859             :     }
    2860             :   } else {
    2861             :     // Pick unaligned CDR as it is the implicit representation for non-encapsulated
    2862           0 :     encoding_mode_ = EncodingMode(type_support_, Encoding::KIND_UNALIGNED_CDR, swap_bytes());
    2863             :   }
    2864           0 :   if (!encoding_mode_.valid()) {
    2865           0 :     if (log_level >= LogLevel::Notice) {
    2866           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
    2867             :                  "Could not find a valid data representation\n"));
    2868             :     }
    2869           0 :     return DDS::RETCODE_ERROR;
    2870             :   }
    2871             : 
    2872           0 :   if (DCPS_debug_level >= 2) {
    2873           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) WriterImpl::setup_serialization: "
    2874             :       "Setup successfully with %C data representation.\n",
    2875             :       Encoding::kind_to_string(encoding_mode_.encoding().kind()).c_str()));
    2876             :   }
    2877             : 
    2878             :   // Set up allocator with reserved space for data if it is bounded
    2879           0 :   const SerializedSizeBound buffer_size_bound = encoding_mode_.buffer_size_bound();
    2880           0 :   if (buffer_size_bound) {
    2881           0 :     const size_t chunk_size = buffer_size_bound.get();
    2882           0 :     data_allocator_.reset(new DataAllocator(n_chunks_, chunk_size));
    2883           0 :     if (DCPS_debug_level >= 2) {
    2884           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
    2885             :         "using data allocator at %x with %B %B byte chunks\n",
    2886             :         data_allocator_.get(),
    2887             :         n_chunks_,
    2888             :         chunk_size));
    2889             :     }
    2890           0 :   } else if (DCPS_debug_level >= 2) {
    2891           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
    2892             :       "sample size is unbounded, not using data allocator, "
    2893             :       "always allocating from heap\n"));
    2894             :   }
    2895           0 :   return DDS::RETCODE_OK;
    2896             : }
    2897             : 
    2898           0 : DDS::ReturnCode_t DataWriterImpl::get_key_value(Sample_rch& sample, DDS::InstanceHandle_t handle)
    2899             : {
    2900           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
    2901           0 :   const InstanceHandlesToValues::iterator it = instance_handles_to_values_.find(handle);
    2902           0 :   if (it == instance_handles_to_values_.end()) {
    2903           0 :     return DDS::RETCODE_BAD_PARAMETER;
    2904             :   }
    2905           0 :   sample = it->second->copy(Sample::Mutable);
    2906           0 :   return DDS::RETCODE_OK;
    2907           0 : }
    2908             : 
    2909           0 : DDS::InstanceHandle_t DataWriterImpl::lookup_instance(const Sample& sample)
    2910             : {
    2911           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
    2912           0 :   const InstanceValuesToHandles::iterator it = find_instance(sample);
    2913           0 :   return it == instance_values_to_handles_.end() ? DDS::HANDLE_NIL : it->second;
    2914           0 : }
    2915             : 
    2916           0 : DDS::InstanceHandle_t DataWriterImpl::register_instance_w_timestamp(
    2917             :   const Sample& sample, const DDS::Time_t& timestamp)
    2918             : {
    2919           0 :   DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
    2920           0 :   const DDS::ReturnCode_t ret = get_or_create_instance_handle(registered_handle, sample, timestamp);
    2921           0 :   if (ret != DDS::RETCODE_OK && log_level >= LogLevel::Notice) {
    2922           0 :     ACE_ERROR((LM_NOTICE, ACE_TEXT("(%P|%t) NOTICE: DataWriterImpl::register_instance_w_timestamp: ")
    2923             :                ACE_TEXT("register failed: %C\n"),
    2924             :                retcode_to_string(ret)));
    2925             :   }
    2926           0 :   return registered_handle;
    2927             : }
    2928             : 
    2929           0 : DDS::ReturnCode_t DataWriterImpl::unregister_instance_w_timestamp(
    2930             :   const Sample& sample,
    2931             :   DDS::InstanceHandle_t instance_handle,
    2932             :   const DDS::Time_t& timestamp)
    2933             : {
    2934           0 :   const DDS::ReturnCode_t rc = instance_must_exist(
    2935             :     "unregister_instance_w_timestamp", sample, instance_handle, /* remove = */ true);
    2936           0 :   if (rc != DDS::RETCODE_OK) {
    2937           0 :     return rc;
    2938             :   }
    2939           0 :   return unregister_instance_i(instance_handle, timestamp);
    2940             : }
    2941             : 
    2942           0 : DDS::ReturnCode_t DataWriterImpl::dispose_w_timestamp(
    2943             :   const Sample& sample,
    2944             :   DDS::InstanceHandle_t instance_handle,
    2945             :   const DDS::Time_t& source_timestamp)
    2946             : {
    2947             : #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
    2948           0 :   DDS::DynamicData_var dynamic_data = sample.get_dynamic_data(dynamic_type_);
    2949           0 :   DDS::Security::SecurityException ex;
    2950           0 :   if (dynamic_data && security_config_ &&
    2951           0 :       participant_permissions_handle_ != DDS::HANDLE_NIL &&
    2952           0 :       !security_config_->get_access_control()->check_local_datawriter_dispose_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
    2953           0 :     if (log_level >= LogLevel::Notice) {
    2954           0 :       ACE_ERROR((LM_NOTICE,
    2955             :                  "(%P|%t) NOTICE: DataWriterImpl::dispose_w_timestamp: unable to dispose instance SecurityException[%d.%d]: %C\n",
    2956             :                  ex.code, ex.minor_code, ex.message.in()));
    2957             :     }
    2958           0 :     return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
    2959             :   }
    2960             : #endif
    2961             : 
    2962           0 :   const DDS::ReturnCode_t rc = instance_must_exist(
    2963             :     "dispose_w_timestamp", sample, instance_handle);
    2964           0 :   if (rc != DDS::RETCODE_OK) {
    2965           0 :     return rc;
    2966             :   }
    2967           0 :   return dispose(instance_handle, source_timestamp);
    2968           0 : }
    2969             : 
    2970           0 : ACE_Message_Block* DataWriterImpl::serialize_sample(const Sample& sample)
    2971             : {
    2972           0 :   const bool encapsulated = cdr_encapsulation();
    2973           0 :   const Encoding& encoding = encoding_mode_.encoding();
    2974           0 :   Message_Block_Ptr mb;
    2975             :   ACE_Message_Block* tmp_mb;
    2976             : 
    2977             :   // Don't use the cached allocator for the registered sample message
    2978             :   // block.
    2979           0 :   if (sample.key_only() && !skip_serialize_) {
    2980           0 :     ACE_NEW_RETURN(tmp_mb,
    2981             :       ACE_Message_Block(
    2982             :         encoding_mode_.buffer_size(sample),
    2983             :         ACE_Message_Block::MB_DATA,
    2984             :         0, // cont
    2985             :         0, // data
    2986             :         0, // alloc_strategy
    2987             :         get_db_lock()),
    2988             :       0);
    2989             :   } else {
    2990           0 :     ACE_NEW_MALLOC_RETURN(tmp_mb,
    2991             :       static_cast<ACE_Message_Block*>(
    2992             :         mb_allocator_->malloc(sizeof(ACE_Message_Block))),
    2993             :       ACE_Message_Block(
    2994             :         encoding_mode_.buffer_size(sample),
    2995             :         ACE_Message_Block::MB_DATA,
    2996             :         0, // cont
    2997             :         0, // data
    2998             :         data_allocator_.get(), // allocator_strategy
    2999             :         get_db_lock(), // data block locking_strategy
    3000             :         ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
    3001             :         ACE_Time_Value::zero,
    3002             :         ACE_Time_Value::max_time,
    3003             :         db_allocator_.get(),
    3004             :         mb_allocator_.get()),
    3005             :       0);
    3006             :   }
    3007           0 :   mb.reset(tmp_mb);
    3008             : 
    3009           0 :   if (skip_serialize_) {
    3010           0 :     if (!sample.to_message_block(*mb)) {
    3011           0 :       if (log_level >= LogLevel::Error) {
    3012           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
    3013             :                    "to_message_block failed\n"));
    3014             :       }
    3015           0 :       return 0;
    3016             :     }
    3017             :   } else {
    3018           0 :     Serializer serializer(mb.get(), encoding);
    3019           0 :     if (encapsulated) {
    3020           0 :       EncapsulationHeader encap;
    3021           0 :       if (!encap.from_encoding(encoding, type_support_->base_extensibility())) {
    3022             :         // from_encoding logged the error
    3023           0 :         return 0;
    3024             :       }
    3025           0 :       if (!(serializer << encap)) {
    3026           0 :         if (log_level >= LogLevel::Error) {
    3027           0 :           ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
    3028             :             "failed to serialize data encapsulation header\n"));
    3029             :         }
    3030           0 :         return 0;
    3031             :       }
    3032             :     }
    3033           0 :     if (!sample.serialize(serializer)) {
    3034           0 :       if (log_level >= LogLevel::Error) {
    3035           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
    3036             :           "failed to serialize sample data\n"));
    3037             :       }
    3038           0 :       return 0;
    3039             :     }
    3040           0 :     if (encapsulated && !EncapsulationHeader::set_encapsulation_options(mb)) {
    3041           0 :       if (log_level >= LogLevel::Error) {
    3042           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
    3043             :           "set_encapsulation_options failed\n"));
    3044             :       }
    3045           0 :       return 0;
    3046             :     }
    3047           0 :   }
    3048             : 
    3049           0 :   return mb.release();
    3050           0 : }
    3051             : 
    3052           0 : bool DataWriterImpl::insert_instance(DDS::InstanceHandle_t handle, Sample_rch& sample)
    3053             : {
    3054           0 :   OPENDDS_ASSERT(sample->key_only());
    3055           0 :   if (!instance_handles_to_values_.insert(
    3056           0 :         InstanceHandlesToValues::value_type(handle, sample)).second) {
    3057           0 :     return false;
    3058             :   }
    3059           0 :   if (!instance_values_to_handles_.insert(
    3060           0 :         InstanceValuesToHandles::value_type(sample, handle)).second) {
    3061           0 :     instance_handles_to_values_.erase(handle);
    3062           0 :     return false;
    3063             :   }
    3064           0 :   return true;
    3065             : }
    3066             : 
    3067             : DataWriterImpl::InstanceValuesToHandles::iterator
    3068           0 : DataWriterImpl::find_instance(const Sample& sample)
    3069             : {
    3070           0 :   Sample_rch dummy_rch(const_cast<Sample*>(&sample), keep_count());
    3071           0 :   InstanceValuesToHandles::iterator pos = instance_values_to_handles_.find(dummy_rch);
    3072           0 :   dummy_rch._retn();
    3073           0 :   return pos;
    3074           0 : }
    3075             : 
    3076           0 : DDS::ReturnCode_t DataWriterImpl::get_or_create_instance_handle(
    3077             :   DDS::InstanceHandle_t& handle,
    3078             :   const Sample& sample,
    3079             :   const DDS::Time_t& source_timestamp)
    3080             : {
    3081           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
    3082             : 
    3083           0 :   handle = lookup_instance(sample);
    3084           0 :   if (handle == DDS::HANDLE_NIL || !get_handle_instance(handle)) {
    3085           0 :     Sample_rch copy = sample.copy(Sample::ReadOnly, Sample::KeyOnly);
    3086             : #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
    3087           0 :     DDS::DynamicData_var dynamic_data = copy->get_dynamic_data(dynamic_type_);
    3088           0 :     DDS::Security::SecurityException ex;
    3089           0 :     if (dynamic_data && security_config_ &&
    3090           0 :         participant_permissions_handle_ != DDS::HANDLE_NIL &&
    3091           0 :         !security_config_->get_access_control()->check_local_datawriter_register_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
    3092           0 :       if (log_level >= LogLevel::Notice) {
    3093           0 :         ACE_ERROR((LM_NOTICE,
    3094             :                    "(%P|%t) NOTICE: DataWriterImpl::get_or_create_instance_handle: unable to register instance SecurityException[%d.%d]: %C\n",
    3095             :                    ex.code, ex.minor_code, ex.message.in()));
    3096             :       }
    3097           0 :       return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
    3098             :     }
    3099             : #endif
    3100             : 
    3101             :     // don't use fast allocator for registration.
    3102           0 :     const TypeSupportImpl* const ts = get_type_support();
    3103           0 :     Message_Block_Ptr serialized(serialize_sample(*copy));
    3104           0 :     if (!serialized) {
    3105           0 :       if (log_level >= LogLevel::Notice) {
    3106           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
    3107             :           "failed to serialize sample\n", ts->name()));
    3108             :       }
    3109           0 :       return DDS::RETCODE_ERROR;
    3110             :     }
    3111             : 
    3112             :     // tell DataWriterLocal and Publisher about the instance.
    3113           0 :     const DDS::ReturnCode_t ret = register_instance_i(handle, move(serialized), source_timestamp);
    3114             :     // note: the WriteDataContainer/PublicationInstance maintains ownership
    3115             :     // of the marshalled sample.
    3116           0 :     if (ret != DDS::RETCODE_OK) {
    3117           0 :       handle = DDS::HANDLE_NIL;
    3118           0 :       return ret;
    3119             :     }
    3120             : 
    3121           0 :     if (!insert_instance(handle, copy)) {
    3122           0 :       handle = DDS::HANDLE_NIL;
    3123           0 :       if (log_level >= LogLevel::Notice) {
    3124           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
    3125             :            "insert instance failed\n", ts->name()));
    3126             :       }
    3127           0 :       return DDS::RETCODE_ERROR;
    3128             :     }
    3129             : 
    3130           0 :     send_all_to_flush_control(guard);
    3131           0 :   }
    3132             : 
    3133           0 :   return DDS::RETCODE_OK;
    3134           0 : }
    3135             : 
    3136           0 : DDS::ReturnCode_t DataWriterImpl::instance_must_exist(
    3137             :   const char* const method_name,
    3138             :   const Sample& sample,
    3139             :   DDS::InstanceHandle_t& instance_handle,
    3140             :   bool remove)
    3141             : {
    3142           0 :   OPENDDS_ASSERT(sample.key_only());
    3143             : 
    3144           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
    3145             : 
    3146           0 :   const InstanceValuesToHandles::iterator pos = find_instance(sample);
    3147           0 :   if (pos == instance_values_to_handles_.end()) {
    3148           0 :     if (log_level >= LogLevel::Notice) {
    3149           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::%C: "
    3150             :         "The instance sample is not registered\n",
    3151             :         method_name));
    3152             :     }
    3153           0 :     return DDS::RETCODE_ERROR;
    3154             :   }
    3155             : 
    3156           0 :   if (instance_handle != DDS::HANDLE_NIL && instance_handle != pos->second) {
    3157           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    3158             :   }
    3159             : 
    3160           0 :   instance_handle = pos->second;
    3161             : 
    3162           0 :   if (remove) {
    3163           0 :     instance_values_to_handles_.erase(pos);
    3164           0 :     instance_handles_to_values_.erase(instance_handle);
    3165             :   }
    3166             : 
    3167           0 :   return DDS::RETCODE_OK;
    3168           0 : }
    3169             : 
    3170           0 : DDS::ReturnCode_t DataWriterImpl::write_w_timestamp(
    3171             :   const Sample& sample,
    3172             :   DDS::InstanceHandle_t handle,
    3173             :   const DDS::Time_t& source_timestamp)
    3174             : {
    3175             :   // This operation assumes the provided handle is valid. The handle provided
    3176             :   // will not be verified.
    3177             : 
    3178           0 :   if (handle == DDS::HANDLE_NIL) {
    3179           0 :     DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
    3180             :     const DDS::ReturnCode_t ret =
    3181           0 :       get_or_create_instance_handle(registered_handle, sample, source_timestamp);
    3182           0 :     if (ret != DDS::RETCODE_OK) {
    3183           0 :       if (log_level >= LogLevel::Notice) {
    3184           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::write_w_timestamp: "
    3185             :                    "register failed: %C\n",
    3186             :                    get_type_support()->name(),
    3187             :                    retcode_to_string(ret)));
    3188             :       }
    3189           0 :       return ret;
    3190             :     }
    3191             : 
    3192           0 :     handle = registered_handle;
    3193             :   }
    3194             : 
    3195             :   // list of reader GUID_ts that should not get data
    3196           0 :   GUIDSeq_var filter_out;
    3197             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    3198           0 :   if (TheServiceParticipant->publisher_content_filter()) {
    3199           0 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, reader_info_lock_, DDS::RETCODE_ERROR);
    3200           0 :     for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
    3201           0 :          end = reader_info_.end(); iter != end; ++iter) {
    3202           0 :       const ReaderInfo& ri = iter->second;
    3203           0 :       if (!ri.eval_.is_nil()) {
    3204           0 :         if (!filter_out.ptr()) {
    3205           0 :           filter_out = new OpenDDS::DCPS::GUIDSeq;
    3206             :         }
    3207           0 :         if (!sample.eval(*ri.eval_, ri.expression_params_)) {
    3208           0 :           push_back(filter_out.inout(), iter->first);
    3209             :         }
    3210             :       }
    3211             :     }
    3212           0 :   }
    3213             : #endif
    3214             : 
    3215           0 :   return write_sample(sample, handle, source_timestamp, filter_out._retn());
    3216           0 : }
    3217             : 
    3218           0 : DDS::ReturnCode_t DataWriterImpl::write_sample(
    3219             :   const Sample& sample,
    3220             :   DDS::InstanceHandle_t handle,
    3221             :   const DDS::Time_t& source_timestamp,
    3222             :   GUIDSeq* filter_out)
    3223             : {
    3224           0 :   Message_Block_Ptr serialized(serialize_sample(sample));
    3225           0 :   if (!serialized) {
    3226           0 :     if (log_level >= LogLevel::Notice) {
    3227           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::write_sample: "
    3228             :         "failed to serialize sample\n"));
    3229             :     }
    3230           0 :     return DDS::RETCODE_ERROR;
    3231             :   }
    3232             : 
    3233           0 :   return write(move(serialized), handle, source_timestamp, filter_out, sample.native_data());
    3234           0 : }
    3235             : 
    3236             : } // namespace DCPS
    3237             : } // namespace OpenDDS
    3238             : 
    3239             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16