LCOV - code coverage report
Current view: top level - DCPS - RecorderImpl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 380 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 33 0.0 %

          Line data    Source code
       1             : /*
       2             :  * Distributed under the OpenDDS License.
       3             :  * See: http://www.opendds.org/license.html
       4             :  */
       5             : 
       6             : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
       7             : 
       8             : #include "RecorderImpl.h"
       9             : 
      10             : #include "SubscriptionInstance.h"
      11             : #include "ReceivedDataElementList.h"
      12             : #include "DomainParticipantImpl.h"
      13             : #include "Service_Participant.h"
      14             : #include "Qos_Helper.h"
      15             : #include "FeatureDisabledQosCheck.h"
      16             : #include "GuidConverter.h"
      17             : #include "Serializer.h"
      18             : #include "SubscriberImpl.h"
      19             : #include "Transient_Kludge.h"
      20             : #include "Util.h"
      21             : #include "QueryConditionImpl.h"
      22             : #include "ReadConditionImpl.h"
      23             : #include "MonitorFactory.h"
      24             : #include "SafetyProfileStreams.h"
      25             : #include "TypeSupportImpl.h"
      26             : #include "PoolAllocator.h"
      27             : #include "DCPS_Utils.h"
      28             : #ifndef DDS_HAS_MINIMUM_BIT
      29             : #  include "BuiltInTopicUtils.h"
      30             : #endif
      31             : #include "XTypes/DynamicDataXcdrReadImpl.h"
      32             : #include "transport/framework/EntryExit.h"
      33             : #include "transport/framework/TransportExceptions.h"
      34             : 
      35             : #include <dds/DdsDcpsCoreC.h>
      36             : #include <dds/DdsDcpsGuidTypeSupportImpl.h>
      37             : #ifndef DDS_HAS_MINIMUM_BIT
      38             : #  include <dds/DdsDcpsCoreTypeSupportC.h>
      39             : #endif
      40             : 
      41             : #include <tao/ORB_Core.h>
      42             : 
      43             : #include <ace/Reactor.h>
      44             : 
      45             : #include <stdexcept>
      46             : 
      47             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      48             : 
      49             : namespace OpenDDS {
      50             : namespace DCPS {
      51             : 
      52           0 : RecorderImpl::RecorderImpl()
      53           0 :   : qos_(TheServiceParticipant->initial_DataReaderQos())
      54           0 :   , participant_servant_(0)
      55           0 :   , topic_servant_(0)
      56             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
      57           0 :   , is_exclusive_ownership_(false)
      58             : #endif
      59             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
      60           0 :   , owner_manager_(0)
      61             : #endif
      62           0 :   , subqos_(TheServiceParticipant->initial_SubscriberQos())
      63           0 :   , topic_desc_(0)
      64           0 :   , listener_mask_(DEFAULT_STATUS_MASK)
      65           0 :   , domain_id_(0)
      66           0 :   , is_bit_(false)
      67           0 :   , check_encap_(true)
      68           0 :   , mb_alloc_(DEFAULT_TRANSPORT_RECEIVE_BUFFERS)
      69             : {
      70           0 :   requested_incompatible_qos_status_.total_count = 0;
      71           0 :   requested_incompatible_qos_status_.total_count_change = 0;
      72           0 :   requested_incompatible_qos_status_.last_policy_id = 0;
      73           0 :   requested_incompatible_qos_status_.policies.length(0);
      74             : 
      75           0 :   subscription_match_status_.total_count = 0;
      76           0 :   subscription_match_status_.total_count_change = 0;
      77           0 :   subscription_match_status_.current_count = 0;
      78           0 :   subscription_match_status_.current_count_change = 0;
      79           0 :   subscription_match_status_.last_publication_handle = DDS::HANDLE_NIL;
      80           0 : }
      81             : 
      82             : // This method is called when there are no longer any reference to the
      83             : // the servant.
      84           0 : RecorderImpl::~RecorderImpl()
      85             : {
      86             :   DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
      87           0 : }
      88             : 
      89             : 
      90             : DDS::ReturnCode_t
      91           0 : RecorderImpl::cleanup()
      92             : {
      93             : 
      94           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
      95           0 :   if (!disco || !disco->remove_subscription(domain_id_,
      96           0 :                                   participant_servant_->get_id(),
      97           0 :                                   subscription_id_)) {
      98           0 :     if (log_level >= LogLevel::Notice) {
      99           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::cleanup: "
     100             :         "could not remove subscription from discovery\n"));
     101             :     }
     102           0 :     return DDS::RETCODE_ERROR;
     103             :   }
     104             : 
     105             :   // Call remove association before unregistering the datareader from the transport,
     106             :   // otherwise some callbacks resulted from remove_association may lost.
     107             : 
     108           0 :   remove_all_associations();
     109             : 
     110           0 :   return DDS::RETCODE_OK;
     111           0 : }
     112             : 
     113           0 : void RecorderImpl::init(
     114             :   TopicDescriptionImpl*      a_topic_desc,
     115             :   const DDS::DataReaderQos & qos,
     116             :   RecorderListener_rch       a_listener,
     117             :   const DDS::StatusMask &    mask,
     118             :   DomainParticipantImpl*     participant,
     119             :   DDS::SubscriberQos         subqos)
     120             : {
     121           0 :   if (DCPS_debug_level >= 2) {
     122           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::init\n"));
     123             :   }
     124             : 
     125             : 
     126           0 :   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
     127           0 :   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
     128           0 :     topic_servant_ = a_topic;
     129             :   }
     130             : 
     131           0 :   CORBA::String_var topic_name = a_topic_desc->get_name();
     132           0 :   qos_ = qos;
     133           0 :   passed_qos_ = qos;
     134             : 
     135             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     136           0 :   is_exclusive_ownership_ = qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
     137             : #endif
     138             : 
     139           0 :   listener_ = a_listener;
     140           0 :   listener_mask_ = mask;
     141             : 
     142             :   // Only store the participant pointer, since it is our "grand"
     143             :   // parent, we will exist as long as it does
     144           0 :   participant_servant_ = participant;
     145             : 
     146             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     147           0 :   if (is_exclusive_ownership_) {
     148           0 :     owner_manager_ = participant_servant_->ownership_manager ();
     149             :   }
     150             : #endif
     151             : 
     152           0 :   domain_id_ = participant_servant_->get_domain_id();
     153           0 :   subqos_ = subqos;
     154           0 : }
     155             : 
     156           0 : bool RecorderImpl::check_transport_qos(const TransportInst& ti)
     157             : {
     158           0 :   if (qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
     159           0 :     return ti.is_reliable();
     160             :   }
     161           0 :   return true;
     162             : }
     163             : 
     164           0 : GUID_t RecorderImpl::get_guid() const
     165             : {
     166           0 :   return subscription_id_;
     167             : }
     168             : 
     169           0 : CORBA::Long RecorderImpl::get_priority_value(const AssociationData& data) const
     170             : {
     171           0 :   return data.publication_transport_priority_;
     172             : }
     173             : 
     174             : 
     175           0 : void RecorderImpl::data_received(const ReceivedDataSample& sample)
     176             : {
     177             :   DBG_ENTRY_LVL("RecorderImpl","data_received",6);
     178             : 
     179             :   // Ensure some other thread is not changing the sample container
     180             :   // or statuses related to samples.
     181           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
     182             : 
     183           0 :   if (DCPS_debug_level >= 8) {
     184           0 :     ACE_DEBUG((LM_DEBUG,
     185             :                "(%P|%t) RecorderImpl::data_received: "
     186             :                "%C received sample: %C\n",
     187             :                LogGuid(subscription_id_).c_str(),
     188             :                to_string(sample.header_).c_str()));
     189             :   }
     190             : 
     191             :   // we only support SAMPLE_DATA messages
     192           0 :   if (sample.header_.message_id_ == SAMPLE_DATA && listener_.in()) {
     193           0 :     Message_Block_Ptr payload(sample.data(&mb_alloc_));
     194           0 :     Encoding::Kind kind = Encoding::KIND_UNALIGNED_CDR;
     195           0 :     if (sample.header_.cdr_encapsulation_ && check_encap_) {
     196           0 :       Encoding enc;
     197           0 :       Serializer ser(payload.get(), enc);
     198           0 :       EncapsulationHeader encap;
     199           0 :       if (ser >> encap && encap.to_any_encoding(enc)) {
     200           0 :         kind = enc.kind();
     201             :       }
     202           0 :     }
     203           0 :     RawDataSample rawSample(sample.header_,
     204           0 :                             static_cast<MessageId> (sample.header_.message_id_),
     205           0 :                             sample.header_.source_timestamp_sec_,
     206           0 :                             sample.header_.source_timestamp_nanosec_,
     207             :                             sample.header_.publication_id_,
     208           0 :                             sample.header_.byte_order_,
     209             :                             payload.get(),
     210           0 :                             kind);
     211           0 :     listener_->on_sample_data_received(this, rawSample);
     212           0 :   }
     213           0 : }
     214             : 
     215           0 : void RecorderImpl::notify_subscription_disconnected(const WriterIdSeq&)
     216             : {
     217           0 : }
     218             : 
     219           0 : void RecorderImpl::notify_subscription_reconnected(const WriterIdSeq&)
     220             : {
     221           0 : }
     222             : 
     223             : void
     224           0 : RecorderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq&)
     225             : {
     226           0 : }
     227             : 
     228           0 : void RecorderImpl::notify_subscription_lost(const WriterIdSeq&)
     229             : {
     230           0 : }
     231             : 
     232             : #ifndef OPENDDS_SAFETY_PROFILE
     233             : void
     234           0 : RecorderImpl::add_to_dynamic_type_map(const GUID_t& pub_id, const XTypes::TypeIdentifier& ti)
     235             : {
     236           0 :   XTypes::TypeLookupService_rch tls = participant_servant_->get_type_lookup_service();
     237           0 :   DDS::DynamicType_var dt = tls->type_identifier_to_dynamic(ti, pub_id);
     238           0 :   if (DCPS_debug_level >= 4) {
     239           0 :     ACE_DEBUG((LM_DEBUG,
     240             :                "(%P|%t) RecorderImpl::add_association: "
     241             :                "DynamicType added to map with guid: %C\n", LogGuid(pub_id).c_str()));
     242             :   }
     243           0 :   dt_map_.insert(std::make_pair(pub_id, dt));
     244           0 : }
     245             : #endif
     246             : 
     247             : void
     248           0 : RecorderImpl::add_association(const GUID_t&            yourId,
     249             :                               const WriterAssociation& writer,
     250             :                               bool                     active)
     251             : {
     252           0 :   if (DCPS_debug_level >= 4) {
     253           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::add_association: "
     254             :                "bit %d local %C remote %C\n",
     255             :                is_bit_,
     256             :                LogGuid(yourId).c_str(),
     257             :                LogGuid(writer.writerId).c_str()));
     258             :   }
     259             : 
     260             :   //
     261             :   // This block prevents adding associations to deleted readers.
     262             :   // Presumably this is a "good thing(tm)".
     263             :   //
     264             :   // if (entity_deleted_) {
     265             :   //   if (DCPS_debug_level >= 1)
     266             :   //     ACE_DEBUG((LM_DEBUG,
     267             :   //                ACE_TEXT("(%P|%t) RecorderImpl::add_association")
     268             :   //                ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
     269             :   //
     270             :   //   return;
     271             :   // }
     272             : 
     273             :   //
     274             :   // We are being called back from the repository before we are done
     275             :   // processing after our call to the repository that caused this call
     276             :   // (from the repository) to be made.
     277             :   //
     278           0 :   if (GUID_UNKNOWN == subscription_id_) {
     279             :     // add_associations was invoked before DCSPInfoRepo::add_subscription() returned.
     280           0 :     subscription_id_ = yourId;
     281             :   }
     282             : 
     283             :   //
     284             :   // We do the following while holding the publication_handle_lock_.
     285             :   //
     286             :   {
     287           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
     288             : 
     289             :     //
     290             :     // For each writer in the list of writers to associate with, we
     291             :     // create a WriterInfo and a WriterStats object and store them in
     292             :     // our internal maps.
     293             :     //
     294             :     {
     295           0 :       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
     296             : 
     297           0 :       const GUID_t& writer_id = writer.writerId;
     298           0 :       RcHandle<WriterInfo> info ( make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos));
     299             :       /*std::pair<WriterMapType::iterator, bool> bpair =*/
     300           0 :       writers_.insert(
     301             :         // This insertion is idempotent.
     302           0 :         WriterMapType::value_type(
     303             :           writer_id,
     304             :           info));
     305             :       // statistics_.insert(
     306             :       //   StatsMapType::value_type(
     307             :       //     writer_id,
     308             :       //     WriterStats(
     309             :       //       raw_latency_buffer_size_,
     310             :       //       raw_latency_buffer_type_)));
     311             : 
     312             :       // if (DCPS_debug_level > 4) {
     313             :       //   GuidConverter converter(writer_id);
     314             :       //   ACE_DEBUG((LM_DEBUG,
     315             :       //              "(%P|%t) RecorderImpl::add_association: "
     316             :       //              "inserted writer %C.return %d\n",
     317             :       //              OPENDDS_STRING(converter).c_str(), bpair.second));
     318             :       //
     319             :       //   WriterMapType::iterator iter = writers_.find(writer_id);
     320             :       //   if (iter != writers_.end()) {
     321             :       //     // This may not be an error since it could happen that the sample
     322             :       //     // is delivered to the datareader after the write is dis-associated
     323             :       //     // with this datareader.
     324             :       //     GuidConverter reader_converter(subscription_id_);
     325             :       //     GuidConverter writer_converter(writer_id);
     326             :       //     ACE_DEBUG((LM_DEBUG,
     327             :       //               ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
     328             :       //               ACE_TEXT("reader %C is associated with writer %C.\n"),
     329             :       //               OPENDDS_STRING(reader_converter).c_str(),
     330             :       //               OPENDDS_STRING(writer_converter).c_str()));
     331             :       //   }
     332             :       // }
     333           0 :     }
     334             : 
     335             :     //
     336             :     // Propagate the add_associations processing down into the Transport
     337             :     // layer here.  This will establish the transport support and reserve
     338             :     // usage of an existing connection or initiate creation of a new
     339             :     // connection if no suitable connection is available.
     340             :     //
     341           0 :     AssociationData data;
     342           0 :     data.remote_id_ = writer.writerId;
     343           0 :     data.remote_data_ = writer.writerTransInfo;
     344           0 :     data.discovery_locator_ = writer.writerDiscInfo;
     345           0 :     data.remote_transport_context_ = writer.transportContext;
     346           0 :     data.publication_transport_priority_ =
     347           0 :       writer.writerQos.transport_priority.value;
     348           0 :     data.remote_reliable_ =
     349           0 :       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
     350           0 :     data.remote_durable_ =
     351           0 :       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
     352             : 
     353           0 :     if (!associate(data, active)) {
     354           0 :       if (log_level >= LogLevel::Warning) {
     355           0 :         ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::add_association: "
     356             :                    "transport layer failed to associate\n"));
     357             :       }
     358           0 :       return;
     359             :     }
     360             : 
     361             :     // Check if any publications have already sent a REQUEST_ACK message.
     362             :     // {
     363             :     //   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
     364             :     //
     365             :     //   WriterMapType::iterator where = writers_.find(writer.writerId);
     366             :     //
     367             :     //   if (where != writers_.end()) {
     368             :     //     const MonotonicTimePoint now = MonotonicTimePoint::now();
     369             :     //
     370             :     //     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
     371             :     //
     372             :     //     if (where->second->should_ack(now)) {
     373             :     //       const SequenceNumber sequence = where->second->ack_sequence();
     374             :     //       if (send_sample_ack(writer.writerId, sequence, now.to_dds_time())) {
     375             :     //         where->second->clear_acks(sequence);
     376             :     //       }
     377             :     //     }
     378             :     //   }
     379             :     // }
     380             : 
     381             :     //
     382             :     // LIVELINESS policy timers are managed here.
     383             :     //
     384             :     // if (liveliness_lease_duration_ != TimeDuration::zero) {
     385             :     //   // this call will start the timer if it is not already set
     386             :     //   const MonotonicTimePoint now = MonotonicTimePoint::now();
     387             :     //
     388             :     //   if (DCPS_debug_level >= 5) {
     389             :     //     GuidConverter converter(subscription_id_);
     390             :     //     ACE_DEBUG((LM_DEBUG,
     391             :     //                ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
     392             :     //                ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
     393             :     //                OPENDDS_STRING(converter).c_str()));
     394             :     //   }
     395             :     //
     396             :     //   handle_timeout(now, this);
     397             :     // }
     398             : 
     399             :     // else - no timer needed when LIVELINESS.lease_duration is INFINITE
     400             : 
     401           0 :   }
     402             :   //
     403             :   // We no longer hold the publication_handle_lock_.
     404             :   //
     405             : 
     406             :   //
     407             :   // We only do the following processing for readers that are *not*
     408             :   // readers of Builtin Topics.
     409             :   //
     410           0 :   if (!is_bit_) {
     411             : 
     412           0 :     const DDS::InstanceHandle_t handle = participant_servant_->assign_handle(writer.writerId);
     413             : 
     414             :     //
     415             :     // We acquire the publication_handle_lock_ for the remainder of our
     416             :     // processing.
     417             :     //
     418             :     {
     419           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
     420             : 
     421             :       // This insertion is idempotent.
     422           0 :       id_to_handle_map_.insert(
     423           0 :         RepoIdToHandleMap::value_type(writer.writerId, handle));
     424             : 
     425           0 :       if (DCPS_debug_level > 4) {
     426           0 :         ACE_DEBUG((LM_DEBUG,
     427             :                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
     428             :                    ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
     429             :                    LogGuid(writer.writerId).c_str(),
     430             :                    handle));
     431             :       }
     432             : 
     433             :       // We need to adjust these after the insertions have all completed
     434             :       // since insertions are not guaranteed to increase the number of
     435             :       // currently matched publications.
     436           0 :       int matchedPublications = static_cast<int>(id_to_handle_map_.size());
     437             :       subscription_match_status_.current_count_change
     438           0 :         = matchedPublications - subscription_match_status_.current_count;
     439           0 :       subscription_match_status_.current_count = matchedPublications;
     440             : 
     441           0 :       ++subscription_match_status_.total_count;
     442           0 :       ++subscription_match_status_.total_count_change;
     443             : 
     444           0 :       subscription_match_status_.last_publication_handle = handle;
     445             : 
     446             :       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
     447             : 
     448             : 
     449           0 :       if (listener_.in()) {
     450           0 :         listener_->on_recorder_matched(
     451             :           this,
     452           0 :           subscription_match_status_);
     453             : 
     454             :         // TBD - why does the spec say to change this but not change
     455             :         //       the ChangeFlagStatus after a listener call?
     456             : 
     457             :         // Client will look at it so next time it looks the change should be 0
     458           0 :         subscription_match_status_.total_count_change = 0;
     459           0 :         subscription_match_status_.current_count_change = 0;
     460             :       }
     461             : 
     462             :       // notify_status_condition();
     463           0 :     }
     464             : 
     465             :     {
     466           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
     467           0 :       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
     468             : 
     469           0 :       if (!writers_.count(writer.writerId)) {
     470           0 :         return;
     471             :       }
     472             : 
     473           0 :       writers_[writer.writerId]->handle(handle);
     474           0 :     }
     475             :   }
     476             : 
     477             :   // if (monitor_) {
     478             :   //   monitor_->report();
     479             :   // }
     480             : }
     481             : 
     482             : void
     483           0 : RecorderImpl::remove_associations(const WriterIdSeq& writers,
     484             :                                   bool               notify_lost)
     485             : {
     486             :   DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
     487           0 :   if (writers.length() == 0) {
     488           0 :     return;
     489             :   }
     490             : 
     491           0 :   if (DCPS_debug_level >= 4) {
     492           0 :     ACE_DEBUG((LM_DEBUG,
     493             :                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
     494             :                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
     495             :                is_bit_,
     496             :                LogGuid(subscription_id_).c_str(),
     497             :                LogGuid(writers[0]).c_str(),
     498             :                writers.length()));
     499             :   }
     500           0 :   if (!get_deleted()) {
     501             :     // stop pending associations for these writer ids
     502           0 :     stop_associating(writers.get_buffer(), writers.length());
     503             :   }
     504             : 
     505           0 :   remove_associations_i(writers, notify_lost);
     506             : }
     507             : 
     508             : void
     509           0 : RecorderImpl::remove_associations_i(const WriterIdSeq& writers,
     510             :     bool notify_lost)
     511             : {
     512             :   DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
     513             : 
     514           0 :   if (writers.length() == 0) {
     515           0 :     return;
     516             :   }
     517             : 
     518           0 :   if (DCPS_debug_level >= 4) {
     519           0 :     ACE_DEBUG((LM_DEBUG,
     520             :                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
     521             :                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
     522             :                is_bit_,
     523             :                LogGuid(subscription_id_).c_str(),
     524             :                LogGuid(writers[0]).c_str(),
     525             :                writers.length()));
     526             :   }
     527           0 :   DDS::InstanceHandleSeq handles;
     528             : 
     529           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
     530             : 
     531             :   // This is used to hold the list of writers which were actually
     532             :   // removed, which is a proper subset of the writers which were
     533             :   // requested to be removed.
     534           0 :   WriterIdSeq updated_writers;
     535             : 
     536             :   CORBA::ULong wr_len;
     537             : 
     538             :   //Remove the writers from writer list. If the supplied writer
     539             :   //is not in the cached writers list then it is already removed.
     540             :   //We just need remove the writers in the list that have not been
     541             :   //removed.
     542             :   {
     543           0 :     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
     544             : 
     545           0 :     wr_len = writers.length();
     546             : 
     547           0 :     for (CORBA::ULong i = 0; i < wr_len; i++) {
     548           0 :       GUID_t writer_id = writers[i];
     549             : 
     550             : #ifndef OPENDDS_SAFETY_PROFILE
     551           0 :       if (dt_map_.erase(writer_id) == 0) {
     552           0 :         if (DCPS_debug_level >= 4) {
     553           0 :           ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::remove_associations_i: -"
     554             :             "failed to find writer_id in the DynamicTypeByPubId map.\n"));
     555             :         }
     556             :       }
     557             : #endif
     558             : 
     559           0 :       WriterMapType::iterator it = writers_.find(writer_id);
     560           0 :       if (it != writers_.end()) {
     561           0 :         it->second->removed();
     562             :       }
     563             : 
     564           0 :       if (writers_.erase(writer_id) == 0) {
     565           0 :         if (DCPS_debug_level >= 4) {
     566           0 :           ACE_DEBUG((LM_DEBUG,
     567             :                      ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
     568             :                      ACE_TEXT("the writer local %C was already removed.\n"),
     569             :                      LogGuid(writer_id).c_str()));
     570             :         }
     571             : 
     572             :       } else {
     573           0 :         push_back(updated_writers, writer_id);
     574             :       }
     575             :     }
     576           0 :   }
     577             : 
     578           0 :   wr_len = updated_writers.length();
     579             : 
     580             :   // Return now if the supplied writers have been removed already.
     581           0 :   if (wr_len == 0) {
     582           0 :     return;
     583             :   }
     584             : 
     585           0 :   if (!is_bit_) {
     586             :     // The writer should be in the id_to_handle map at this time.  Note
     587             :     // it if it not there.
     588           0 :     lookup_instance_handles(updated_writers, handles);
     589             : 
     590           0 :     for (CORBA::ULong i = 0; i < wr_len; ++i) {
     591           0 :       id_to_handle_map_.erase(updated_writers[i]);
     592             :     }
     593             :   }
     594           0 :   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
     595           0 :     disassociate(updated_writers[i]);
     596             :   }
     597             : 
     598             :   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
     599           0 :   if (!is_bit_) {
     600             :     // Derive the change in the number of publications writing to this reader.
     601           0 :     int matchedPublications = static_cast<int>(id_to_handle_map_.size());
     602             :     subscription_match_status_.current_count_change
     603           0 :       = matchedPublications - subscription_match_status_.current_count;
     604             : 
     605             :     // Only process status if the number of publications has changed.
     606           0 :     if (subscription_match_status_.current_count_change != 0) {
     607           0 :       subscription_match_status_.current_count = matchedPublications;
     608             :       /// Section 7.1.4.1: total_count will not decrement.
     609             : 
     610             :       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
     611             :       subscription_match_status_.last_publication_handle
     612           0 :         = handles[ wr_len - 1];
     613             : 
     614             :       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
     615             : 
     616             :       // DDS::DataReaderListener_var listener
     617             :       // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
     618             : 
     619           0 :       if (listener_.in()) {
     620           0 :         listener_->on_recorder_matched(
     621             :           this,
     622           0 :           subscription_match_status_);
     623             : 
     624             :         // Client will look at it so next time it looks the change should be 0
     625           0 :         subscription_match_status_.total_count_change = 0;
     626           0 :         subscription_match_status_.current_count_change = 0;
     627             :       }
     628             : 
     629             :       // notify_status_condition();
     630             :     }
     631             :   }
     632             : 
     633             :   // If this remove_association is invoked when the InfoRepo
     634             :   // detects a lost writer then make a callback to notify
     635             :   // subscription lost.
     636           0 :   if (notify_lost) {
     637           0 :     notify_subscription_lost(handles);
     638             :   }
     639             : 
     640             :   // if (monitor_) {
     641             :   //   monitor_->report();
     642             :   // }
     643             : 
     644           0 :   for (unsigned int i = 0; i < handles.length(); ++i) {
     645           0 :     participant_servant_->return_handle(handles[i]);
     646             :   }
     647           0 : }
     648             : 
     649             : void
     650           0 : RecorderImpl::remove_all_associations()
     651             : {
     652             :   DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
     653             : 
     654           0 :   OpenDDS::DCPS::WriterIdSeq writers;
     655             :   int size;
     656             : 
     657           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
     658             : 
     659             :   {
     660           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
     661             : 
     662           0 :     size = static_cast<int>(writers_.size());
     663           0 :     writers.length(size);
     664             : 
     665           0 :     WriterMapType::iterator curr_writer = writers_.begin();
     666           0 :     WriterMapType::iterator end_writer = writers_.end();
     667             : 
     668           0 :     int i = 0;
     669             : 
     670           0 :     while (curr_writer != end_writer) {
     671           0 :       writers[i++] = curr_writer->first;
     672           0 :       ++curr_writer;
     673             :     }
     674           0 :   }
     675             : 
     676             :   try {
     677           0 :     CORBA::Boolean dont_notify_lost = false;
     678             : 
     679           0 :     if (0 < size) {
     680           0 :       remove_associations(writers, dont_notify_lost);
     681             :     }
     682             : 
     683           0 :   } catch (const CORBA::Exception&) {
     684           0 :   }
     685             : 
     686           0 :   transport_stop();
     687           0 : }
     688             : 
     689             : void
     690           0 : RecorderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
     691             : {
     692           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
     693             :             guard,
     694             :             publication_handle_lock_);
     695             : 
     696           0 :   if (requested_incompatible_qos_status_.total_count == status.total_count) {
     697             :     // This test should make the method idempotent.
     698           0 :     return;
     699             :   }
     700             : 
     701             :   // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
     702             :   //                         true);
     703             : 
     704             :   // copy status and increment change
     705           0 :   requested_incompatible_qos_status_.total_count = status.total_count;
     706           0 :   requested_incompatible_qos_status_.total_count_change +=
     707           0 :     status.count_since_last_send;
     708           0 :   requested_incompatible_qos_status_.last_policy_id =
     709           0 :     status.last_policy_id;
     710           0 :   requested_incompatible_qos_status_.policies = status.policies;
     711             : 
     712             :   // if (!CORBA::is_nil(listener.in())) {
     713             :   //   listener->on_requested_incompatible_qos(this,
     714             :   //                                           requested_incompatible_qos_status_);
     715             :   //
     716             :   //   // TBD - why does the spec say to change total_count_change but not
     717             :   //   // change the ChangeFlagStatus after a listener call?
     718             :   //
     719             :   //   // client just looked at it so next time it looks the
     720             :   //   // change should be 0
     721             :   //   requested_incompatible_qos_status_.total_count_change = 0;
     722             :   // }
     723             :   //
     724             :   // notify_status_condition();
     725           0 : }
     726             : 
     727             : void
     728           0 : RecorderImpl::signal_liveliness(const GUID_t& remote_participant)
     729             : {
     730           0 :   GUID_t prefix = remote_participant;
     731           0 :   prefix.entityId = EntityId_t();
     732             : 
     733           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
     734             : 
     735             :   typedef std::pair<GUID_t, RcHandle<WriterInfo> > WriterSetElement;
     736             :   typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
     737           0 :   WriterSet writers;
     738             : 
     739             :   {
     740           0 :     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
     741           0 :     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
     742           0 :            limit = writers_.end();
     743           0 :          pos != limit && equal_guid_prefixes(pos->first, prefix);
     744           0 :          ++pos) {
     745           0 :       writers.push_back(std::make_pair(pos->first, pos->second));
     746             :     }
     747           0 :   }
     748             : 
     749           0 :   const MonotonicTimePoint now = MonotonicTimePoint::now();
     750           0 :   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
     751           0 :        pos != limit;
     752           0 :        ++pos) {
     753           0 :     pos->second->received_activity(now);
     754             :   }
     755           0 : }
     756             : 
     757           0 : DDS::ReturnCode_t RecorderImpl::set_qos(
     758             :   const DDS::SubscriberQos & subscriber_qos,
     759             :   const DDS::DataReaderQos & qos)
     760             : {
     761             :   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED);
     762             : 
     763           0 :   if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
     764           0 :     if (subqos_ != subscriber_qos) {
     765             :       // for the not changeable qos, it can be changed before enable
     766           0 :       if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_) {
     767           0 :         return DDS::RETCODE_IMMUTABLE_POLICY;
     768             : 
     769             :       } else {
     770           0 :         subqos_ = subscriber_qos;
     771             :       }
     772             :     }
     773             :   } else {
     774           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     775             :   }
     776             : 
     777             :   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     778             :   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     779             :   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     780             : 
     781           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
     782           0 :     if (qos_ == qos)
     783           0 :       return DDS::RETCODE_OK;
     784             : 
     785           0 :     if (!Qos_Helper::changeable(qos_, qos) && is_enabled()) {
     786           0 :       return DDS::RETCODE_IMMUTABLE_POLICY;
     787             : 
     788             :     } else {
     789           0 :       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
     790             :       const bool status =
     791           0 :         disco->update_subscription_qos(
     792           0 :           participant_servant_->get_domain_id(),
     793           0 :           participant_servant_->get_id(),
     794           0 :           subscription_id_,
     795             :           qos,
     796             :           subscriber_qos);
     797           0 :       if (!status) {
     798           0 :         if (log_level >= LogLevel::Notice) {
     799           0 :           ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::set_qos: qos not updated\n"));
     800             :         }
     801           0 :         return DDS::RETCODE_ERROR;
     802             :       }
     803           0 :     }
     804             : 
     805           0 :     qos_ = qos;
     806           0 :     subqos_ = subscriber_qos;
     807             : 
     808           0 :     return DDS::RETCODE_OK;
     809             : 
     810             :   } else {
     811           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     812             :   }
     813             : }
     814             : 
     815             : DDS::ReturnCode_t
     816           0 : RecorderImpl::get_qos(
     817             :   DDS::SubscriberQos & subscriber_qos,
     818             :   DDS::DataReaderQos & qos)
     819             : {
     820           0 :   qos = passed_qos_;
     821           0 :   subscriber_qos = subqos_;
     822           0 :   return DDS::RETCODE_OK;
     823             : }
     824             : 
     825             : DDS::ReturnCode_t
     826           0 : RecorderImpl::set_listener(const RecorderListener_rch& a_listener,
     827             :                            DDS::StatusMask             mask)
     828             : {
     829           0 :   listener_mask_ = mask;
     830             :   //note: OK to duplicate  a nil object ref
     831           0 :   listener_ = a_listener;
     832           0 :   return DDS::RETCODE_OK;
     833             : }
     834             : 
     835             : RecorderListener_rch
     836           0 : RecorderImpl::get_listener()
     837             : {
     838           0 :   return listener_;
     839             : }
     840             : 
     841             : void
     842           0 : RecorderImpl::lookup_instance_handles(const WriterIdSeq&       ids,
     843             :                                       DDS::InstanceHandleSeq & hdls)
     844             : {
     845           0 :   CORBA::ULong const num_wrts = ids.length();
     846             : 
     847           0 :   if (DCPS_debug_level > 9) {
     848           0 :     OPENDDS_STRING separator = "";
     849           0 :     OPENDDS_STRING buffer;
     850             : 
     851           0 :     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
     852           0 :       buffer += separator + LogGuid(ids[i]).conv_;
     853           0 :       separator = ", ";
     854             :     }
     855             : 
     856           0 :     ACE_DEBUG((LM_DEBUG,
     857             :                ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
     858             :                ACE_TEXT("searching for handles for writer Ids: %C.\n"),
     859             :                buffer.c_str()));
     860           0 :   }
     861             : 
     862           0 :   hdls.length(num_wrts);
     863             : 
     864           0 :   for (CORBA::ULong i = 0; i < num_wrts; ++i) {
     865           0 :     hdls[i] = participant_servant_->lookup_handle(ids[i]);
     866             :   }
     867           0 : }
     868             : 
     869             : DDS::ReturnCode_t
     870           0 : RecorderImpl::enable()
     871             : {
     872           0 :   if (DCPS_debug_level >= 2) {
     873           0 :     ACE_DEBUG((LM_DEBUG,
     874             :                ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
     875             :   }
     876             :   //According spec:
     877             :   // - Calling enable on an already enabled Entity returns OK and has no
     878             :   // effect.
     879             :   // - Calling enable on an Entity whose factory is not enabled will fail
     880             :   // and return PRECONDITION_NOT_MET.
     881             : 
     882           0 :   if (is_enabled()) {
     883           0 :     return DDS::RETCODE_OK;
     884             :   }
     885             : 
     886           0 :   set_enabled();
     887             : 
     888             :   // if (topic_servant_ && !transport_disabled_) {
     889           0 :   if (topic_servant_) {
     890           0 :     if (DCPS_debug_level >= 2) {
     891           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: enable_transport\n"));
     892             :     }
     893             : 
     894             :     try {
     895           0 :       enable_transport(qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
     896           0 :                              qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
     897           0 :     } catch (const Transport::Exception&) {
     898           0 :       if (log_level >= LogLevel::Warning) {
     899           0 :         ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: Transport Exception\n"));
     900             :       }
     901           0 :       return DDS::RETCODE_ERROR;
     902           0 :     }
     903             : 
     904           0 :     const TransportLocatorSeq& trans_conf_info = connection_info();
     905             : 
     906           0 :     CORBA::String_var filterClassName = "";
     907           0 :     CORBA::String_var filterExpression = "";
     908           0 :     DDS::StringSeq exprParams;
     909             : 
     910             :     Discovery_rch disco =
     911           0 :       TheServiceParticipant->get_discovery(domain_id_);
     912             : 
     913           0 :     DCPS::set_reader_effective_data_rep_qos(qos_.representation.value);
     914           0 :     if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
     915           0 :       return DDS::RETCODE_ERROR;
     916             :     }
     917             : 
     918           0 :     if (DCPS_debug_level >= 2) {
     919           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: add_subscription\n"));
     920             :     }
     921             : 
     922           0 :     XTypes::TypeInformation type_info;
     923             : 
     924             :     subscription_id_ =
     925           0 :       disco->add_subscription(domain_id_,
     926           0 :                               participant_servant_->get_id(),
     927           0 :                               topic_servant_->get_id(),
     928           0 :                               rchandle_from(this),
     929           0 :                               qos_,
     930             :                               trans_conf_info,
     931           0 :                               subqos_,
     932             :                               filterClassName,
     933             :                               filterExpression,
     934             :                               exprParams,
     935             :                               type_info);
     936             : 
     937           0 :     if (subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
     938           0 :       if (log_level >= LogLevel::Warning) {
     939           0 :         ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: "
     940             :           "add_subscription returned invalid id\n"));
     941             :       }
     942           0 :       return DDS::RETCODE_ERROR;
     943             :     }
     944           0 :   }
     945             : 
     946           0 :   return DDS::RETCODE_OK;
     947             : }
     948             : 
     949             : DDS::InstanceHandle_t
     950           0 : RecorderImpl::get_instance_handle()
     951             : {
     952           0 :   return get_entity_instance_handle(subscription_id_, rchandle_from(participant_servant_));
     953             : }
     954             : 
     955             : void
     956           0 : RecorderImpl::register_for_writer(const GUID_t& participant,
     957             :                                   const GUID_t& readerid,
     958             :                                   const GUID_t& writerid,
     959             :                                   const TransportLocatorSeq& locators,
     960             :                                   DiscoveryListener* listener)
     961             : {
     962           0 :   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
     963           0 : }
     964             : 
     965             : void
     966           0 : RecorderImpl::unregister_for_writer(const GUID_t& participant,
     967             :                                     const GUID_t& readerid,
     968             :                                     const GUID_t& writerid)
     969             : {
     970           0 :   TransportClient::unregister_for_writer(participant, readerid, writerid);
     971           0 : }
     972             : 
     973             : #if !defined (DDS_HAS_MINIMUM_BIT)
     974             : DDS::ReturnCode_t
     975           0 : RecorderImpl::repoid_to_bit_key(const GUID_t& id,
     976             :                                 DDS::BuiltinTopicKey_t& key)
     977             : {
     978           0 :   const DDS::InstanceHandle_t publication_handle = participant_servant_->lookup_handle(id);
     979             : 
     980           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     981             :                    guard,
     982             :                    publication_handle_lock_,
     983             :                    DDS::RETCODE_ERROR);
     984             : 
     985           0 :   DDS::PublicationBuiltinTopicDataSeq data;
     986             : 
     987           0 :   DDS::ReturnCode_t const ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
     988             :                             participant_servant_,
     989             :                             BUILT_IN_PUBLICATION_TOPIC,
     990             :                             publication_handle,
     991             :                             data);
     992             : 
     993           0 :   if (ret == DDS::RETCODE_OK) {
     994           0 :     key = data[0].key;
     995             :   }
     996             : 
     997           0 :   return ret;
     998           0 : }
     999             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
    1000             : 
    1001             : #ifndef OPENDDS_SAFETY_PROFILE
    1002           0 : DDS::DynamicData_ptr RecorderImpl::get_dynamic_data(const RawDataSample& sample)
    1003             : {
    1004           0 :   const Encoding enc(sample.encoding_kind_, sample.header_.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
    1005           0 :   const DynamicTypeByPubId::const_iterator dt_found = dt_map_.find(sample.publication_id_);
    1006           0 :   if (dt_found == dt_map_.end()) {
    1007           0 :     if (log_level >= LogLevel::Error) {
    1008           0 :       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RecorderImpl::get_dynamic_data: "
    1009             :         "failed to find GUID: %C in DynamicTypeByPubId.\n", LogGuid(sample.publication_id_).c_str()));
    1010             :     }
    1011           0 :     return 0;
    1012             :   }
    1013             : 
    1014           0 :   DDS::DynamicType_var dt = dt_found->second;
    1015           0 :   XTypes::DynamicDataXcdrReadImpl* dd = new XTypes::DynamicDataXcdrReadImpl(sample.sample_.get(), enc, dt);
    1016           0 :   DDS::DynamicData_var dd_var = dd;
    1017           0 :   if (!dd->check_xcdr1_mutable(dt)) {
    1018           0 :     if (log_level >= LogLevel::Notice) {
    1019           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::get_dynamic_data: "
    1020             :         "Encountered unsupported combination of XCDR1 encoding and mutable extensibility.\n"));
    1021             :     }
    1022           0 :     return 0;
    1023             :   }
    1024           0 :   return dd_var._retn();
    1025           0 : }
    1026             : #endif
    1027             : 
    1028             : } // namespace DCPS
    1029             : } // namespace
    1030             : 
    1031             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16