OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::RecorderImpl Class Reference

Implementation of Recorder functionality. More...

#include <RecorderImpl.h>

Inheritance diagram for OpenDDS::DCPS::RecorderImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RecorderImpl:
Collaboration graph
[legend]

Public Member Functions

 RecorderImpl ()
 
virtual ~RecorderImpl ()
 
DDS::ReturnCode_t cleanup ()
 
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, RecorderListener_rch a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, DDS::SubscriberQos subqos)
 
DDS::ReturnCode_t enable ()
 
virtual bool check_transport_qos (const TransportInst &inst)
 
virtual GUID_t get_guid () const
 
DDS::DomainId_t domain_id () const
 
virtual CORBA::Long get_priority_value (const AssociationData &data) const
 
virtual void data_received (const ReceivedDataSample &sample)
 
virtual void notify_subscription_disconnected (const WriterIdSeq &pubids)
 
virtual void notify_subscription_reconnected (const WriterIdSeq &pubids)
 
virtual void notify_subscription_lost (const WriterIdSeq &pubids)
 
virtual void add_association (const GUID_t &yourId, const WriterAssociation &writer, bool active)
 
virtual void remove_associations (const WriterIdSeq &writers, CORBA::Boolean callback)
 
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
 
virtual void signal_liveliness (const GUID_t &remote_participant)
 
void remove_all_associations ()
 
void add_to_dynamic_type_map (const GUID_t &pub_id, const XTypes::TypeIdentifier &ti)
 
virtual DDS::ReturnCode_t repoid_to_bit_key (const DCPS::GUID_t &id, DDS::BuiltinTopicKey_t &key)
 
DDS::ReturnCode_t set_qos (const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos)
 
DDS::ReturnCode_t get_qos (DDS::SubscriberQos &subscriber_qos, DDS::DataReaderQos &datareader_qos)
 
DDS::ReturnCode_t set_listener (const RecorderListener_rch &a_listener, DDS::StatusMask mask)
 
RecorderListener_rch get_listener ()
 
DomainParticipantImpl * participant ()
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
virtual void register_for_writer (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
 
virtual void unregister_for_writer (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportClient
void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeq & connection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
virtual RcHandle< BitSubscriber > get_builtin_subscriber_proxy () const
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
virtual ~TransportReceiveListener ()
 
virtual void transport_discovery_change ()
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderCallbacks
 DataReaderCallbacks ()
 
virtual ~DataReaderCallbacks ()
 
virtual void update_locators (const GUID_t &, const TransportLocatorSeq &)
 
- Public Member Functions inherited from OpenDDS::DCPS::Recorder
virtual ~Recorder ()
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey * _key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase * _servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub * _stubobj (void) const
 
virtual TAO_Stub * _stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Core * orb_core (void) const
 
IOP::IOR * steal_ior (void)
 
const IOP::IOR & ior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
virtual RcHandle< EntityImpl > parent () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 

Protected Member Functions

virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportClient
void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
 TransportReceiveListener ()
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Broker * proxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 

Private Member Functions

void notify_subscription_lost (const DDS::InstanceHandleSeq &handles)
 
void lookup_instance_handles (const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the publication repo ids. More...
 
DDS::DynamicData_ptr get_dynamic_data (const RawDataSample &sample)
 
void check_encap (bool b)
 
bool check_encap () const
 
typedef OPENDDS_MAP_CMP (GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RcHandle< WriterInfo >, GUID_tKeyLessThan) WriterMapType
 publications writing to this reader. More...
 
typedef OPENDDS_MAP (GUID_t, DDS::DynamicType_var) DynamicTypeByPubId
 
- Private Member Functions inherited from OpenDDS::DCPS::WriterInfoListener
 WriterInfoListener ()
 
virtual ~WriterInfoListener ()
 
virtual void writer_became_alive (WriterInfo &info, const MonotonicTimePoint &when)
 
virtual void writer_became_dead (WriterInfo &info)
 
virtual void writer_removed (WriterInfo &info)
 
- Private Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
 RcObject ()
 

Private Attributes

DDS::DataReaderQos qos_
 
DDS::DataReaderQos passed_qos_
 
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses. More...
 
DomainParticipantImpl * participant_servant_
 
TopicDescriptionPtr< TopicImpl > topic_servant_
 
bool is_exclusive_ownership_
 
OwnershipManager * owner_manager_
 
DDS::SubscriberQos subqos_
 
DDS::TopicDescription_var topic_desc_
 
DDS::StatusMask listener_mask_
 
RecorderListener_rch listener_
 
DDS::DomainId_t domain_id_
 
ACE_Recursive_Thread_Mutex publication_handle_lock_
 
RepoIdToHandleMap id_to_handle_map_
 
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
 
DDS::SubscriptionMatchedStatus subscription_match_status_
 
bool is_bit_
 
WriterMapType writers_
 
ACE_RW_Thread_Mutex writers_lock_
 RW lock for reading/writing publications. More...
 
DynamicTypeByPubId dt_map_
 
bool check_encap_
 
TransportMessageBlockAllocator mb_alloc_
 
- Private Attributes inherited from OpenDDS::DCPS::WriterInfoListener
GUID_t subscription_id_
 
TimeDuration liveliness_lease_duration_
 

Friends

class ::DDS_TEST
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::TransportClient
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 
- Public Types inherited from OpenDDS::DCPS::Recorder
typedef Recorder_ptr _ptr_type
 
typedef Recorder_var _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Static Public Member Functions inherited from OpenDDS::DCPS::Recorder
static Recorder_ptr _duplicate (Recorder_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 

Detailed Description

Implementation of Recorder functionality.

This class is the implementation of the Recorder. Inheritance is used to limit the application's access to underlying system methods.

Definition at line 45 of file RecorderImpl.h.

Constructor & Destructor Documentation

◆ RecorderImpl()

OpenDDS::DCPS::RecorderImpl::RecorderImpl ( )

Definition at line 52 of file RecorderImpl.cpp.

References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::RequestedIncompatibleQosStatus::policies, requested_incompatible_qos_status_, subscription_match_status_, DDS::RequestedIncompatibleQosStatus::total_count, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count_change, and DDS::SubscriptionMatchedStatus::total_count_change.

53  : qos_(TheServiceParticipant->initial_DataReaderQos())
55  , topic_servant_(0)
56 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
58 #endif
59 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
60  , owner_manager_(0)
61 #endif
62  , subqos_(TheServiceParticipant->initial_SubscriberQos())
63  , topic_desc_(0)
65  , domain_id_(0)
66  , is_bit_(false)
67  , check_encap_(true)
69 {
74 
80 }
DDS::SubscriptionMatchedStatus subscription_match_status_
Definition: RecorderImpl.h:194
TopicDescriptionPtr< TopicImpl > topic_servant_
Definition: RecorderImpl.h:171
const InstanceHandle_t HANDLE_NIL
const DDS::StatusMask DEFAULT_STATUS_MASK
DDS::StatusMask listener_mask_
Definition: RecorderImpl.h:184
TransportMessageBlockAllocator mb_alloc_
Definition: RecorderImpl.h:214
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
Definition: RecorderImpl.h:193
DDS::TopicDescription_var topic_desc_
Definition: RecorderImpl.h:183
OwnershipManager * owner_manager_
Definition: RecorderImpl.h:176
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186
DDS::DataReaderQos qos_
Definition: RecorderImpl.h:164
#define TheServiceParticipant
DDS::SubscriberQos subqos_
Definition: RecorderImpl.h:179

◆ ~RecorderImpl()

OpenDDS::DCPS::RecorderImpl::~RecorderImpl ( )
virtual

Definition at line 84 of file RecorderImpl.cpp.

References DBG_ENTRY_LVL.

85 {
86  DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
87 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ add_association()

void OpenDDS::DCPS::RecorderImpl::add_association ( const GUID_t & yourId,
const WriterAssociation & writer,
bool  active 
)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 248 of file RecorderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), ACE_WRITE_GUARD, OpenDDS::DCPS::DomainParticipantImpl::assign_handle(), OpenDDS::DCPS::TransportClient::associate(), OpenDDS::DCPS::LogGuid::c_str(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::AssociationData::discovery_locator_, DDS::DataWriterQos::durability, OpenDDS::DCPS::GUID_UNKNOWN, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::DCPS::RecorderListener::on_recorder_matched(), participant_servant_, publication_handle_lock_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::AssociationData::remote_transport_context_, sample_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, DDS::DataWriterQos::transport_priority, OpenDDS::DCPS::WriterAssociation::transportContext, DDS::TransportPriorityQosPolicy::value, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::LogLevel::Warning, OpenDDS::DCPS::WriterAssociation::writerDiscInfo, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

251 {
252  if (DCPS_debug_level >= 4) {
253  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::add_association: "
254  "bit %d local %C remote %C\n",
255  is_bit_,
256  LogGuid(yourId).c_str(),
257  LogGuid(writer.writerId).c_str()));
258  }
259 
260  //
261  // This block prevents adding associations to deleted readers.
262  // Presumably this is a "good thing(tm)".
263  //
264  // if (entity_deleted_) {
265  // if (DCPS_debug_level >= 1)
266  // ACE_DEBUG((LM_DEBUG,
267  // ACE_TEXT("(%P|%t) RecorderImpl::add_association")
268  // ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
269  //
270  // return;
271  // }
272 
273  //
274  // We are being called back from the repository before we are done
275  // processing after our call to the repository that caused this call
276  // (from the repository) to be made.
277  //
279  // add_associations was invoked before DCPSInfoRepo::add_subscription() returned.
280  subscription_id_ = yourId;
281  }
282 
283  //
284  // We do the following while holding the publication_handle_lock_.
285  //
286  {
288 
289  //
290  // For each writer in the list of writers to associate with, we
291  // create a WriterInfo and a WriterStats object and store them in
292  // our internal maps.
293  //
294  {
296 
297  const GUID_t& writer_id = writer.writerId;
298  RcHandle<WriterInfo> info ( make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos));
299  /*std::pair<WriterMapType::iterator, bool> bpair =*/
300  writers_.insert(
301  // This insertion is idempotent.
302  WriterMapType::value_type(
303  writer_id,
304  info));
305  // statistics_.insert(
306  // StatsMapType::value_type(
307  // writer_id,
308  // WriterStats(
309  // raw_latency_buffer_size_,
310  // raw_latency_buffer_type_)));
311 
312  // if (DCPS_debug_level > 4) {
313  // GuidConverter converter(writer_id);
314  // ACE_DEBUG((LM_DEBUG,
315  // "(%P|%t) RecorderImpl::add_association: "
316  // "inserted writer %C.return %d\n",
317  // OPENDDS_STRING(converter).c_str(), bpair.second));
318  //
319  // WriterMapType::iterator iter = writers_.find(writer_id);
320  // if (iter != writers_.end()) {
321  // // This may not be an error since it could happen that the sample
322  // // is delivered to the datareader after the write is dis-associated
323  // // with this datareader.
324  // GuidConverter reader_converter(subscription_id_);
325  // GuidConverter writer_converter(writer_id);
326  // ACE_DEBUG((LM_DEBUG,
327  // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
328  // ACE_TEXT("reader %C is associated with writer %C.\n"),
329  // OPENDDS_STRING(reader_converter).c_str(),
330  // OPENDDS_STRING(writer_converter).c_str()));
331  // }
332  // }
333  }
334 
335  //
336  // Propagate the add_associations processing down into the Transport
337  // layer here. This will establish the transport support and reserve
338  // usage of an existing connection or initiate creation of a new
339  // connection if no suitable connection is available.
340  //
341  AssociationData data;
342  data.remote_id_ = writer.writerId;
343  data.remote_data_ = writer.writerTransInfo;
344  data.discovery_locator_ = writer.writerDiscInfo;
345  data.remote_transport_context_ = writer.transportContext;
346  data.publication_transport_priority_ =
347  writer.writerQos.transport_priority.value;
348  data.remote_reliable_ =
349  (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
350  data.remote_durable_ =
351  (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
352 
353  if (!associate(data, active)) {
354  if (log_level >= LogLevel::Warning) {
355  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::add_association: "
356  "transport layer failed to associate\n"));
357  }
358  return;
359  }
360 
361  // Check if any publications have already sent a REQUEST_ACK message.
362  // {
363  // ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
364  //
365  // WriterMapType::iterator where = writers_.find(writer.writerId);
366  //
367  // if (where != writers_.end()) {
368  // const MonotonicTimePoint now = MonotonicTimePoint::now();
369  //
370  // ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
371  //
372  // if (where->second->should_ack(now)) {
373  // const SequenceNumber sequence = where->second->ack_sequence();
374  // if (send_sample_ack(writer.writerId, sequence, now.to_dds_time())) {
375  // where->second->clear_acks(sequence);
376  // }
377  // }
378  // }
379  // }
380 
381  //
382  // LIVELINESS policy timers are managed here.
383  //
384  // if (liveliness_lease_duration_ != TimeDuration::zero) {
385  // // this call will start the timer if it is not already set
386  // const MonotonicTimePoint now = MonotonicTimePoint::now();
387  //
388  // if (DCPS_debug_level >= 5) {
389  // GuidConverter converter(subscription_id_);
390  // ACE_DEBUG((LM_DEBUG,
391  // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
392  // ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
393  // OPENDDS_STRING(converter).c_str()));
394  // }
395  //
396  // handle_timeout(now, this);
397  // }
398 
399  // else - no timer needed when LIVELINESS.lease_duration is INFINITE
400 
401  }
402  //
403  // We no longer hold the publication_handle_lock_.
404  //
405 
406  //
407  // We only do the following processing for readers that are *not*
408  // readers of Builtin Topics.
409  //
410  if (!is_bit_) {
411 
412  const DDS::InstanceHandle_t handle = participant_servant_->assign_handle(writer.writerId);
413 
414  //
415  // We acquire the publication_handle_lock_ for the remainder of our
416  // processing.
417  //
418  {
420 
421  // This insertion is idempotent.
422  id_to_handle_map_.insert(
423  RepoIdToHandleMap::value_type(writer.writerId, handle));
424 
425  if (DCPS_debug_level > 4) {
426  ACE_DEBUG((LM_DEBUG,
427  ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
428  ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
429  LogGuid(writer.writerId).c_str(),
430  handle));
431  }
432 
433  // We need to adjust these after the insertions have all completed
434  // since insertions are not guaranteed to increase the number of
435  // currently matched publications.
436  int matchedPublications = static_cast<int>(id_to_handle_map_.size());
438  = matchedPublications - subscription_match_status_.current_count;
439  subscription_match_status_.current_count = matchedPublications;
440 
443 
445 
446  // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
447 
448 
449  if (listener_.in()) {
451  this,
453 
454  // TBD - why does the spec say to change this but not change
455  // the ChangeFlagStatus after a listener call?
456 
457  // Client will look at it so next time it looks the change should be 0
460  }
461 
462  // notify_status_condition();
463  }
464 
465  {
468 
469  if (!writers_.count(writer.writerId)) {
470  return;
471  }
472 
473  writers_[writer.writerId]->handle(handle);
474  }
475  }
476 
477  // if (monitor_) {
478  // monitor_->report();
479  // }
480 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
DDS::SubscriptionMatchedStatus subscription_match_status_
Definition: RecorderImpl.h:194
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
Definition: RecorderImpl.h:168
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RecorderListener_rch listener_
Definition: RecorderImpl.h:185
virtual void on_recorder_matched(Recorder *recorder, const DDS::SubscriptionMatchedStatus &status)=0
ACE_Recursive_Thread_Mutex publication_handle_lock_
Definition: RecorderImpl.h:188
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
RepoIdToHandleMap id_to_handle_map_
Definition: RecorderImpl.h:191
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
Definition: RecorderImpl.h:206
bool associate(const AssociationData &peer, bool active)

◆ add_to_dynamic_type_map()

void OpenDDS::DCPS::RecorderImpl::add_to_dynamic_type_map ( const GUID_t & pub_id,
const XTypes::TypeIdentifier & ti 
)

Definition at line 234 of file RecorderImpl.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, dt_map_, OpenDDS::DCPS::DomainParticipantImpl::get_type_lookup_service(), LM_DEBUG, and participant_servant_.

Referenced by OpenDDS::RTPS::Sedp::match_continue().

235 {
237  DDS::DynamicType_var dt = tls->type_identifier_to_dynamic(ti, pub_id);
238  if (DCPS_debug_level >= 4) {
239  ACE_DEBUG((LM_DEBUG,
240  "(%P|%t) RecorderImpl::add_association: "
241  "DynamicType added to map with guid: %C\n", LogGuid(pub_id).c_str()));
242  }
243  dt_map_.insert(std::make_pair(pub_id, dt));
244 }
#define ACE_DEBUG(X)
DynamicTypeByPubId dt_map_
Definition: RecorderImpl.h:210
DCPS::RcHandle< TypeLookupService > TypeLookupService_rch
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
XTypes::TypeLookupService_rch get_type_lookup_service()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ check_encap() [1/2]

void OpenDDS::DCPS::RecorderImpl::check_encap ( bool  b)
inline, private, virtual

Implements OpenDDS::DCPS::Recorder.

Definition at line 161 of file RecorderImpl.h.

◆ check_encap() [2/2]

bool OpenDDS::DCPS::RecorderImpl::check_encap ( ) const
inline, private, virtual

Implements OpenDDS::DCPS::Recorder.

Definition at line 162 of file RecorderImpl.h.

162 { return check_encap_; }

◆ check_transport_qos()

bool OpenDDS::DCPS::RecorderImpl::check_transport_qos ( const TransportInst & inst )
virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 156 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TransportInst::is_reliable(), DDS::ReliabilityQosPolicy::kind, qos_, DDS::DataReaderQos::reliability, and DDS::RELIABLE_RELIABILITY_QOS.

157 {
159  return ti.is_reliable();
160  }
161  return true;
162 }
ReliabilityQosPolicyKind kind
ReliabilityQosPolicy reliability
DDS::DataReaderQos qos_
Definition: RecorderImpl.h:164

◆ cleanup()

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::cleanup ( void  )

Clean up the Recorder.

Definition at line 91 of file RecorderImpl.cpp.

References ACE_ERROR, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), LM_NOTICE, OpenDDS::DCPS::log_level, OpenDDS::DCPS::LogLevel::Notice, participant_servant_, remove_all_associations(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and TheServiceParticipant.

Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder(), and OpenDDS::DCPS::DomainParticipantImpl::handle_exception().

92 {
93 
94  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
95  if (!disco || !disco->remove_subscription(domain_id_,
98  if (log_level >= LogLevel::Notice) {
99  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::cleanup: "
100  "could not remove subscription from discovery\n"));
101  }
102  return DDS::RETCODE_ERROR;
103  }
104 
105  // Call remove association before unregistering the datareader from the transport,
106  // otherwise some callbacks resulting from remove_association may be lost.
107 
109 
110  return DDS::RETCODE_OK;
111 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
const ReturnCode_t RETCODE_ERROR
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186
#define TheServiceParticipant

◆ data_received()

void OpenDDS::DCPS::RecorderImpl::data_received ( const ReceivedDataSample & sample)
virtual

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 175 of file RecorderImpl.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, check_encap_, OpenDDS::DCPS::ReceivedDataSample::data(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::Encoding::kind(), OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, listener_, LM_DEBUG, mb_alloc_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::RecorderListener::on_sample_data_received(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::SAMPLE_DATA, sample_lock_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::to_string().

176 {
177  DBG_ENTRY_LVL("RecorderImpl","data_received",6);
178 
179  // Ensure some other thread is not changing the sample container
180  // or statuses related to samples.
182 
183  if (DCPS_debug_level >= 8) {
184  ACE_DEBUG((LM_DEBUG,
185  "(%P|%t) RecorderImpl::data_received: "
186  "%C received sample: %C\n",
187  LogGuid(subscription_id_).c_str(),
188  to_string(sample.header_).c_str()));
189  }
190 
191  // we only support SAMPLE_DATA messages
192  if (sample.header_.message_id_ == SAMPLE_DATA && listener_.in()) {
193  Message_Block_Ptr payload(sample.data(&mb_alloc_));
195  if (sample.header_.cdr_encapsulation_ && check_encap_) {
196  Encoding enc;
197  Serializer ser(payload.get(), enc);
198  EncapsulationHeader encap;
199  if (ser >> encap && encap.to_any_encoding(enc)) {
200  kind = enc.kind();
201  }
202  }
203  RawDataSample rawSample(sample.header_,
204  static_cast<MessageId> (sample.header_.message_id_),
205  sample.header_.source_timestamp_sec_,
206  sample.header_.source_timestamp_nanosec_,
207  sample.header_.publication_id_,
208  sample.header_.byte_order_,
209  payload.get(),
210  kind);
211  listener_->on_sample_data_received(this, rawSample);
212  }
213 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
Definition: RecorderImpl.h:168
RecorderListener_rch listener_
Definition: RecorderImpl.h:185
TransportMessageBlockAllocator mb_alloc_
Definition: RecorderImpl.h:214
const char * to_string(MessageId value)
virtual void on_sample_data_received(Recorder *recorder, const RawDataSample &sample)=0
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ domain_id()

DDS::DomainId_t OpenDDS::DCPS::RecorderImpl::domain_id ( ) const
inlinevirtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 76 of file RecorderImpl.h.

76 { return this->domain_id_; }
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186

◆ enable()

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::enable ( )

Implements DDS::Entity.

Definition at line 870 of file RecorderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::TransportClient::connection_info(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::DataReaderQos::durability, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::log_level, participant_servant_, qos_, OpenDDS::DCPS::rchandle_from(), DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::DataReaderQos::representation, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_enabled(), OpenDDS::DCPS::set_reader_effective_data_rep_qos(), subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, DDS::DataRepresentationQosPolicy::value, DDS::VOLATILE_DURABILITY_QOS, and OpenDDS::DCPS::LogLevel::Warning.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

871 {
872  if (DCPS_debug_level >= 2) {
873  ACE_DEBUG((LM_DEBUG,
874  ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
875  }
876  // According to the spec:
877  // - Calling enable on an already enabled Entity returns OK and has no
878  // effect.
879  // - Calling enable on an Entity whose factory is not enabled will fail
880  // and return PRECONDITION_NOT_MET.
881 
882  if (is_enabled()) {
883  return DDS::RETCODE_OK;
884  }
885 
886  set_enabled();
887 
888  // if (topic_servant_ && !transport_disabled_) {
889  if (topic_servant_) {
890  if (DCPS_debug_level >= 2) {
891  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: enable_transport\n"));
892  }
893 
894  try {
897  } catch (const Transport::Exception&) {
898  if (log_level >= LogLevel::Warning) {
899  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: Transport Exception\n"));
900  }
901  return DDS::RETCODE_ERROR;
902  }
903 
904  const TransportLocatorSeq& trans_conf_info = connection_info();
905 
906  CORBA::String_var filterClassName = "";
907  CORBA::String_var filterExpression = "";
908  DDS::StringSeq exprParams;
909 
910  Discovery_rch disco =
911  TheServiceParticipant->get_discovery(domain_id_);
912 
914  if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
915  return DDS::RETCODE_ERROR;
916  }
917 
918  if (DCPS_debug_level >= 2) {
919  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: add_subscription\n"));
920  }
921 
922  XTypes::TypeInformation type_info;
923 
925  disco->add_subscription(domain_id_,
927  topic_servant_->get_id(),
928  rchandle_from(this),
929  qos_,
930  trans_conf_info,
931  subqos_,
932  filterClassName,
933  filterExpression,
934  exprParams,
935  type_info);
936 
938  if (log_level >= LogLevel::Warning) {
939  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: "
940  "add_subscription returned invalid id\n"));
941  }
942  return DDS::RETCODE_ERROR;
943  }
944  }
945 
946  return DDS::RETCODE_OK;
947 }
void enable_transport(bool reliable, bool durable)
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
TopicDescriptionPtr< TopicImpl > topic_servant_
Definition: RecorderImpl.h:171
const ReturnCode_t RETCODE_OK
ReliabilityQosPolicyKind kind
sequence< TransportLocator > TransportLocatorSeq
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
const TransportLocatorSeq & connection_info() const
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
DataRepresentationQosPolicy representation
ReliabilityQosPolicy reliability
DurabilityQosPolicy durability
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
const ReturnCode_t RETCODE_ERROR
DataRepresentationIdSeq value
DurabilityQosPolicyKind kind
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
Definition: DCPS_Utils.cpp:517
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186
DDS::DataReaderQos qos_
Definition: RecorderImpl.h:164
#define TheServiceParticipant
DDS::SubscriberQos subqos_
Definition: RecorderImpl.h:179

◆ get_dynamic_data()

DDS::DynamicData_ptr OpenDDS::DCPS::RecorderImpl::get_dynamic_data ( const RawDataSample sample)
privatevirtual

Implements OpenDDS::DCPS::Recorder.

Definition at line 1002 of file RecorderImpl.cpp.

References ACE_ERROR, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::XTypes::DynamicDataXcdrReadImpl::check_xcdr1_mutable(), dt_map_, OpenDDS::DCPS::RawDataSample::encoding_kind_, OpenDDS::DCPS::ENDIAN_BIG, OpenDDS::DCPS::ENDIAN_LITTLE, OpenDDS::DCPS::LogLevel::Error, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::RawDataSample::header_, LM_ERROR, LM_NOTICE, OpenDDS::DCPS::log_level, OpenDDS::DCPS::LogLevel::Notice, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OpenDDS::DCPS::RawDataSample::publication_id_, and OpenDDS::DCPS::RawDataSample::sample_.

1003 {
1004  const Encoding enc(sample.encoding_kind_, sample.header_.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
1005  const DynamicTypeByPubId::const_iterator dt_found = dt_map_.find(sample.publication_id_);
1006  if (dt_found == dt_map_.end()) {
1007  if (log_level >= LogLevel::Error) {
1008  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RecorderImpl::get_dynamic_data: "
1009  "failed to find GUID: %C in DynamicTypeByPubId.\n", LogGuid(sample.publication_id_).c_str()));
1010  }
1011  return 0;
1012  }
1013 
1014  DDS::DynamicType_var dt = dt_found->second;
1015  XTypes::DynamicDataXcdrReadImpl* dd = new XTypes::DynamicDataXcdrReadImpl(sample.sample_.get(), enc, dt);
1016  DDS::DynamicData_var dd_var = dd;
1017  if (!dd->check_xcdr1_mutable(dt)) {
1018  if (log_level >= LogLevel::Notice) {
1019  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::get_dynamic_data: "
1020  "Encountered unsupported combination of XCDR1 encoding and mutable extensibility.\n"));
1021  }
1022  return 0;
1023  }
1024  return dd_var._retn();
1025 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_ERROR(X)
DynamicTypeByPubId dt_map_
Definition: RecorderImpl.h:210

◆ get_guid()

GUID_t OpenDDS::DCPS::RecorderImpl::get_guid ( ) const
virtual

◆ get_ice_endpoint()

virtual WeakRcHandle<ICE::Endpoint> OpenDDS::DCPS::RecorderImpl::get_ice_endpoint ( )
inlinevirtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 145 of file RecorderImpl.h.

145 { return WeakRcHandle<ICE::Endpoint>(); }

◆ get_instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::RecorderImpl::get_instance_handle ( )
virtual

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 950 of file RecorderImpl.cpp.

References OpenDDS::DCPS::EntityImpl::get_entity_instance_handle(), participant_servant_, OpenDDS::DCPS::rchandle_from(), and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

951 {
953 }
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ get_listener()

RecorderListener_rch OpenDDS::DCPS::RecorderImpl::get_listener ( )
virtual

Get the listener for this Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 836 of file RecorderImpl.cpp.

References listener_.

837 {
838  return listener_;
839 }
RecorderListener_rch listener_
Definition: RecorderImpl.h:185

◆ get_priority_value()

CORBA::Long OpenDDS::DCPS::RecorderImpl::get_priority_value ( const AssociationData & data) const
virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 169 of file RecorderImpl.cpp.

References OpenDDS::DCPS::AssociationData::publication_transport_priority_.

170 {
171  return data.publication_transport_priority_;
172 }

◆ get_qos()

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::get_qos ( DDS::SubscriberQos & subscriber_qos,
DDS::DataReaderQos & datareader_qos 
)
virtual

Get the Quality of Service settings for the Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 816 of file RecorderImpl.cpp.

References passed_qos_, DDS::RETCODE_OK, and subqos_.

819 {
820  qos = passed_qos_;
821  subscriber_qos = subqos_;
822  return DDS::RETCODE_OK;
823 }
const ReturnCode_t RETCODE_OK
DDS::DataReaderQos passed_qos_
Definition: RecorderImpl.h:165
DDS::SubscriberQos subqos_
Definition: RecorderImpl.h:179

◆ init()

void OpenDDS::DCPS::RecorderImpl::init ( TopicDescriptionImpl * a_topic_desc,
const DDS::DataReaderQos & qos,
RecorderListener_rch  a_listener,
const DDS::StatusMask & mask,
DomainParticipantImpl * participant,
DDS::SubscriberQos  subqos 
)

Definition at line 113 of file RecorderImpl.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), is_exclusive_ownership_, DDS::OwnershipQosPolicy::kind, listener_, listener_mask_, LM_DEBUG, owner_manager_, DDS::DataReaderQos::ownership, OpenDDS::DCPS::DomainParticipantImpl::ownership_manager(), participant(), participant_servant_, passed_qos_, qos_, subqos_, topic_desc_, and topic_servant_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

120 {
121  if (DCPS_debug_level >= 2) {
122  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::init\n"));
123  }
124 
125 
126  topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
127  if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
128  topic_servant_ = a_topic;
129  }
130 
131  CORBA::String_var topic_name = a_topic_desc->get_name();
132  qos_ = qos;
133  passed_qos_ = qos;
134 
135 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
137 #endif
138 
139  listener_ = a_listener;
140  listener_mask_ = mask;
141 
142  // Only store the participant pointer, since it is our "grand"
143  // parent, we will exist as long as it does
145 
146 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
149  }
150 #endif
151 
153  subqos_ = subqos;
154 }
#define ACE_DEBUG(X)
DomainParticipantImpl * participant()
Definition: RecorderImpl.h:129
TopicDescriptionPtr< TopicImpl > topic_servant_
Definition: RecorderImpl.h:171
RecorderListener_rch listener_
Definition: RecorderImpl.h:185
DDS::StatusMask listener_mask_
Definition: RecorderImpl.h:184
DDS::DataReaderQos passed_qos_
Definition: RecorderImpl.h:165
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
OwnershipQosPolicy ownership
DDS::TopicDescription_var topic_desc_
Definition: RecorderImpl.h:183
OwnershipManager * owner_manager_
Definition: RecorderImpl.h:176
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186
DDS::DataReaderQos qos_
Definition: RecorderImpl.h:164
DDS::SubscriberQos subqos_
Definition: RecorderImpl.h:179
OwnershipQosPolicyKind kind

◆ lookup_instance_handles()

void OpenDDS::DCPS::RecorderImpl::lookup_instance_handles ( const WriterIdSeq & ids,
DDS::InstanceHandleSeq & hdls 
)
private

Look up the instance handles by the publication repo ids.

Definition at line 842 of file RecorderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::conv_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::DCPS::DomainParticipantImpl::lookup_handle(), OPENDDS_STRING, and participant_servant_.

Referenced by remove_associations_i().

844 {
845  CORBA::ULong const num_wrts = ids.length();
846 
847  if (DCPS_debug_level > 9) {
848  OPENDDS_STRING separator = "";
849  OPENDDS_STRING buffer;
850 
851  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
852  buffer += separator + LogGuid(ids[i]).conv_;
853  separator = ", ";
854  }
855 
856  ACE_DEBUG((LM_DEBUG,
857  ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
858  ACE_TEXT("searching for handles for writer Ids: %C.\n"),
859  buffer.c_str()));
860  }
861 
862  hdls.length(num_wrts);
863 
864  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
865  hdls[i] = participant_servant_->lookup_handle(ids[i]);
866  }
867 }
#define ACE_DEBUG(X)
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define OPENDDS_STRING
ACE_CDR::ULong ULong
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ notify_subscription_disconnected()

void OpenDDS::DCPS::RecorderImpl::notify_subscription_disconnected ( const WriterIdSeq & pubids)
virtual

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 215 of file RecorderImpl.cpp.

216 {
217 }

◆ notify_subscription_lost() [1/2]

void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost ( const WriterIdSeq & pubids)
virtual

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 228 of file RecorderImpl.cpp.

Referenced by remove_associations_i().

229 {
230 }

◆ notify_subscription_lost() [2/2]

void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost ( const DDS::InstanceHandleSeq handles)
private

Definition at line 224 of file RecorderImpl.cpp.

225 {
226 }

◆ notify_subscription_reconnected()

void OpenDDS::DCPS::RecorderImpl::notify_subscription_reconnected ( const WriterIdSeq & pubids)
virtual

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 219 of file RecorderImpl.cpp.

220 {
221 }

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP ( GUID_t  ,
DDS::DynamicType_var   
)
private

◆ OPENDDS_MAP_CMP() [1/2]

typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP ( GUID_t  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [2/2]

typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP ( GUID_t  ,
RcHandle< WriterInfo > ,
GUID_tKeyLessThan   
)
private

publications writing to this reader.

◆ participant()

DomainParticipantImpl* OpenDDS::DCPS::RecorderImpl::participant ( )
inline

Definition at line 129 of file RecorderImpl.h.

Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder(), and init().

129  {
130  return participant_servant_;
131  }
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170

◆ register_for_writer()

void OpenDDS::DCPS::RecorderImpl::register_for_writer ( const GUID_t & participant,
const GUID_t & readerid,
const GUID_t & writerid,
const TransportLocatorSeq & locators,
DiscoveryListener * listener 
)
virtual

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 956 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TransportClient::register_for_writer().

961 {
962  TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
963 }
DomainParticipantImpl * participant()
Definition: RecorderImpl.h:129
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)

◆ remove_all_associations()

void OpenDDS::DCPS::RecorderImpl::remove_all_associations ( )

Definition at line 650 of file RecorderImpl.cpp.

References ACE_GUARD, ACE_READ_GUARD, DBG_ENTRY_LVL, publication_handle_lock_, remove_associations(), OpenDDS::DCPS::TransportClient::transport_stop(), writers_, and writers_lock_.

Referenced by cleanup().

651 {
652  DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
653 
655  int size;
656 
658 
659  {
661 
662  size = static_cast<int>(writers_.size());
663  writers.length(size);
664 
665  WriterMapType::iterator curr_writer = writers_.begin();
666  WriterMapType::iterator end_writer = writers_.end();
667 
668  int i = 0;
669 
670  while (curr_writer != end_writer) {
671  writers[i++] = curr_writer->first;
672  ++curr_writer;
673  }
674  }
675 
676  try {
677  CORBA::Boolean dont_notify_lost = false;
678 
679  if (0 < size) {
680  remove_associations(writers, dont_notify_lost);
681  }
682 
683  } catch (const CORBA::Exception&) {
684  }
685 
686  transport_stop();
687 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
virtual void remove_associations(const WriterIdSeq &writers, CORBA::Boolean callback)
ACE_Recursive_Thread_Mutex publication_handle_lock_
Definition: RecorderImpl.h:188
ACE_CDR::Boolean Boolean
sequence< GUID_t > WriterIdSeq
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
Definition: RecorderImpl.h:206

◆ remove_associations()

void OpenDDS::DCPS::RecorderImpl::remove_associations ( const WriterIdSeq & writers,
CORBA::Boolean  callback 
)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 483 of file RecorderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::get_deleted(), is_bit_, LM_DEBUG, remove_associations_i(), OpenDDS::DCPS::TransportClient::stop_associating(), and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

Referenced by remove_all_associations().

485 {
486  DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
487  if (writers.length() == 0) {
488  return;
489  }
490 
491  if (DCPS_debug_level >= 4) {
492  ACE_DEBUG((LM_DEBUG,
493  ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
494  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
495  is_bit_,
496  LogGuid(subscription_id_).c_str(),
497  LogGuid(writers[0]).c_str(),
498  writers.length()));
499  }
500  if (!get_deleted()) {
501  // stop pending associations for these writer ids
502  stop_associating(writers.get_buffer(), writers.length());
503  }
504 
505  remove_associations_i(writers, notify_lost);
506 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual void remove_associations_i(const WriterIdSeq &writers, bool callback)

◆ remove_associations_i()

void OpenDDS::DCPS::RecorderImpl::remove_associations_i ( const WriterIdSeq & writers,
bool  callback 
)
protectedvirtual

Section 7.1.4.1: total_count will not decrement.

Todo: Reconcile this with the verbiage in section 7.1.4.1

Definition at line 509 of file RecorderImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), ACE_WRITE_GUARD, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), dt_map_, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, LM_DEBUG, lookup_instance_handles(), notify_subscription_lost(), OpenDDS::DCPS::RecorderListener::on_recorder_matched(), participant_servant_, publication_handle_lock_, OpenDDS::DCPS::push_back(), OpenDDS::DCPS::DomainParticipantImpl::return_handle(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count_change, writers_, and writers_lock_.

Referenced by remove_associations().

511 {
512  DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
513 
514  if (writers.length() == 0) {
515  return;
516  }
517 
518  if (DCPS_debug_level >= 4) {
519  ACE_DEBUG((LM_DEBUG,
520  ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
521  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
522  is_bit_,
523  LogGuid(subscription_id_).c_str(),
524  LogGuid(writers[0]).c_str(),
525  writers.length()));
526  }
527  DDS::InstanceHandleSeq handles;
528 
530 
531  // This is used to hold the list of writers which were actually
532  // removed, which is a proper subset of the writers which were
533  // requested to be removed.
534  WriterIdSeq updated_writers;
535 
536  CORBA::ULong wr_len;
537 
538  //Remove the writers from writer list. If the supplied writer
539  //is not in the cached writers list then it is already removed.
540  //We just need remove the writers in the list that have not been
541  //removed.
542  {
544 
545  wr_len = writers.length();
546 
547  for (CORBA::ULong i = 0; i < wr_len; i++) {
548  GUID_t writer_id = writers[i];
549 
550 #ifndef OPENDDS_SAFETY_PROFILE
551  if (dt_map_.erase(writer_id) == 0) {
552  if (DCPS_debug_level >= 4) {
553  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::remove_associations_i: -"
554  "failed to find writer_id in the DynamicTypeByPubId map.\n"));
555  }
556  }
557 #endif
558 
559  WriterMapType::iterator it = writers_.find(writer_id);
560  if (it != writers_.end()) {
561  it->second->removed();
562  }
563 
564  if (writers_.erase(writer_id) == 0) {
565  if (DCPS_debug_level >= 4) {
566  ACE_DEBUG((LM_DEBUG,
567  ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
568  ACE_TEXT("the writer local %C was already removed.\n"),
569  LogGuid(writer_id).c_str()));
570  }
571 
572  } else {
573  push_back(updated_writers, writer_id);
574  }
575  }
576  }
577 
578  wr_len = updated_writers.length();
579 
580  // Return now if the supplied writers have been removed already.
581  if (wr_len == 0) {
582  return;
583  }
584 
585  if (!is_bit_) {
586  // The writer should be in the id_to_handle map at this time. Note
587  // it if it is not there.
588  lookup_instance_handles(updated_writers, handles);
589 
590  for (CORBA::ULong i = 0; i < wr_len; ++i) {
591  id_to_handle_map_.erase(updated_writers[i]);
592  }
593  }
594  for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
595  disassociate(updated_writers[i]);
596  }
597 
598  // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
599  if (!is_bit_) {
600  // Derive the change in the number of publications writing to this reader.
601  int matchedPublications = static_cast<int>(id_to_handle_map_.size());
603  = matchedPublications - subscription_match_status_.current_count;
604 
605  // Only process status if the number of publications has changed.
607  subscription_match_status_.current_count = matchedPublications;
608  /// Section 7.1.4.1: total_count will not decrement.
609 
610  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
612  = handles[ wr_len - 1];
613 
614  // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
615 
616  // DDS::DataReaderListener_var listener
617  // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
618 
619  if (listener_.in()) {
621  this,
623 
624  // Client will look at it so next time it looks the change should be 0
627  }
628 
629  // notify_status_condition();
630  }
631  }
632 
633  // If this remove_association is invoked when the InfoRepo
634  // detects a lost writer then make a callback to notify
635  // subscription lost.
636  if (notify_lost) {
637  notify_subscription_lost(handles);
638  }
639 
640  // if (monitor_) {
641  // monitor_->report();
642  // }
643 
644  for (unsigned int i = 0; i < handles.length(); ++i) {
645  participant_servant_->return_handle(handles[i]);
646  }
647 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
DDS::SubscriptionMatchedStatus subscription_match_status_
Definition: RecorderImpl.h:194
void return_handle(DDS::InstanceHandle_t handle)
void disassociate(const GUID_t &peerId)
RecorderListener_rch listener_
Definition: RecorderImpl.h:185
virtual void on_recorder_matched(Recorder *recorder, const DDS::SubscriptionMatchedStatus &status)=0
ACE_Recursive_Thread_Mutex publication_handle_lock_
Definition: RecorderImpl.h:188
DynamicTypeByPubId dt_map_
Definition: RecorderImpl.h:210
ACE_CDR::ULong ULong
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
virtual void notify_subscription_lost(const WriterIdSeq &pubids)
RepoIdToHandleMap id_to_handle_map_
Definition: RecorderImpl.h:191
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
sequence< GUID_t > WriterIdSeq
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Look up the instance handles by the publication repo ids.
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
Definition: RecorderImpl.h:206

◆ repoid_to_bit_key()

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::repoid_to_bit_key ( const DCPS::GUID_t & id,
DDS::BuiltinTopicKey_t & key 
)
virtual

Find the bit key for a given repo id.

Implements OpenDDS::DCPS::Recorder.

Definition at line 975 of file RecorderImpl.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::DomainParticipantImpl::lookup_handle(), participant_servant_, publication_handle_lock_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

977 {
978  const DDS::InstanceHandle_t publication_handle = participant_servant_->lookup_handle(id);
979 
981  guard,
984 
985  DDS::PublicationBuiltinTopicDataSeq data;
986 
987  DDS::ReturnCode_t const ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
990  publication_handle,
991  data);
992 
993  if (ret == DDS::RETCODE_OK) {
994  key = data[0].key;
995  }
996 
997  return ret;
998 }
const ReturnCode_t RETCODE_OK
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
ACE_Recursive_Thread_Mutex publication_handle_lock_
Definition: RecorderImpl.h:188
const char *const BUILT_IN_PUBLICATION_TOPIC
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51

◆ set_listener()

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_listener ( const RecorderListener_rch a_listener,
DDS::StatusMask  mask 
)
virtual

Change the listener for this Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 826 of file RecorderImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

828 {
829  listener_mask_ = mask;
830  //note: OK to duplicate a nil object ref
831  listener_ = a_listener;
832  return DDS::RETCODE_OK;
833 }
const ReturnCode_t RETCODE_OK
RecorderListener_rch listener_
Definition: RecorderImpl.h:185
DDS::StatusMask listener_mask_
Definition: RecorderImpl.h:184

◆ set_qos()

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_qos ( const DDS::SubscriberQos & subscriber_qos,
const DDS::DataReaderQos & datareader_qos 
)
virtual

Set the Quality of Service settings for the Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 757 of file RecorderImpl.cpp.

References ACE_ERROR, OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::EntityImpl::is_enabled(), LM_NOTICE, OpenDDS::DCPS::log_level, OpenDDS::DCPS::LogLevel::Notice, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, participant_servant_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

760 {
762 
763  if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
764  if (subqos_ != subscriber_qos) {
765  // for the not changeable qos, it can be changed before enable
766  if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_) {
768 
769  } else {
770  subqos_ = subscriber_qos;
771  }
772  }
773  } else {
775  }
776 
780 
781  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
782  if (qos_ == qos)
783  return DDS::RETCODE_OK;
784 
785  if (!Qos_Helper::changeable(qos_, qos) && is_enabled()) {
787 
788  } else {
789  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
790  const bool status =
791  disco->update_subscription_qos(
795  qos,
796  subscriber_qos);
797  if (!status) {
798  if (log_level >= LogLevel::Notice) {
799  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::set_qos: qos not updated\n"));
800  }
801  return DDS::RETCODE_ERROR;
802  }
803  }
804 
805  qos_ = qos;
806  subqos_ = subscriber_qos;
807 
808  return DDS::RETCODE_OK;
809 
810  } else {
812  }
813 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
const ReturnCode_t RETCODE_ERROR
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186
const ReturnCode_t RETCODE_UNSUPPORTED
DDS::DataReaderQos qos_
Definition: RecorderImpl.h:164
#define TheServiceParticipant
DDS::SubscriberQos subqos_
Definition: RecorderImpl.h:179
AtomicBool enabled_
Flag indicating whether the entity is enabled.
Definition: EntityImpl.h:82
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)

◆ signal_liveliness()

void OpenDDS::DCPS::RecorderImpl::signal_liveliness ( const GUID_t remote_participant)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 728 of file RecorderImpl.cpp.

References ACE_GUARD, ACE_READ_GUARD, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR(), sample_lock_, writers_, and writers_lock_.

729 {
730  GUID_t prefix = remote_participant;
731  prefix.entityId = EntityId_t();
732 
734 
735  typedef std::pair<GUID_t, RcHandle<WriterInfo> > WriterSetElement;
736  typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
737  WriterSet writers;
738 
739  {
741  for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
742  limit = writers_.end();
743  pos != limit && equal_guid_prefixes(pos->first, prefix);
744  ++pos) {
745  writers.push_back(std::make_pair(pos->first, pos->second));
746  }
747  }
748 
750  for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
751  pos != limit;
752  ++pos) {
753  pos->second->received_activity(now);
754  }
755 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_VECTOR(WeakRcHandle< TransportImpl >) ImplsType
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
ACE_Recursive_Thread_Mutex sample_lock_
Lock protecting the sample container as well as the statuses.
Definition: RecorderImpl.h:168
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
Definition: RecorderImpl.h:206

◆ unregister_for_writer()

void OpenDDS::DCPS::RecorderImpl::unregister_for_writer ( const GUID_t participant,
const GUID_t readerid,
const GUID_t writerid 
)
virtual

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 966 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TransportClient::unregister_for_writer().

969 {
971 }
DomainParticipantImpl * participant()
Definition: RecorderImpl.h:129
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)

◆ update_incompatible_qos()

void OpenDDS::DCPS::RecorderImpl::update_incompatible_qos ( const IncompatibleQosStatus status)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 690 of file RecorderImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, publication_handle_lock_, requested_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.

691 {
693  guard,
695 
696  if (requested_incompatible_qos_status_.total_count == status.total_count) {
697  // This test should make the method idempotent.
698  return;
699  }
700 
701  // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
702  // true);
703 
704  // copy status and increment change
705  requested_incompatible_qos_status_.total_count = status.total_count;
707  status.count_since_last_send;
709  status.last_policy_id;
711 
712  // if (!CORBA::is_nil(listener.in())) {
713  // listener->on_requested_incompatible_qos(this,
714  // requested_incompatible_qos_status_);
715  //
716  // // TBD - why does the spec say to change total_count_change but not
717  // // change the ChangeFlagStatus after a listener call?
718  //
719  // // client just looked at it so next time it looks the
720  // // change should be 0
721  // requested_incompatible_qos_status_.total_count_change = 0;
722  // }
723  //
724  // notify_status_condition();
725 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex publication_handle_lock_
Definition: RecorderImpl.h:188
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
Definition: RecorderImpl.h:193

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 181 of file RecorderImpl.h.

Member Data Documentation

◆ check_encap_

bool OpenDDS::DCPS::RecorderImpl::check_encap_
private

Definition at line 212 of file RecorderImpl.h.

Referenced by data_received().

◆ domain_id_

DDS::DomainId_t OpenDDS::DCPS::RecorderImpl::domain_id_
private

Definition at line 186 of file RecorderImpl.h.

Referenced by cleanup(), enable(), init(), and set_qos().

◆ dt_map_

DynamicTypeByPubId OpenDDS::DCPS::RecorderImpl::dt_map_
private

◆ id_to_handle_map_

RepoIdToHandleMap OpenDDS::DCPS::RecorderImpl::id_to_handle_map_
private

Definition at line 191 of file RecorderImpl.h.

Referenced by add_association(), and remove_associations_i().

◆ is_bit_

bool OpenDDS::DCPS::RecorderImpl::is_bit_
private

Flag indicating that this data reader is a built-in topic data reader.

Definition at line 198 of file RecorderImpl.h.

Referenced by add_association(), remove_associations(), and remove_associations_i().

◆ is_exclusive_ownership_

bool OpenDDS::DCPS::RecorderImpl::is_exclusive_ownership_
private

Definition at line 174 of file RecorderImpl.h.

Referenced by init().

◆ listener_

RecorderListener_rch OpenDDS::DCPS::RecorderImpl::listener_
private

◆ listener_mask_

DDS::StatusMask OpenDDS::DCPS::RecorderImpl::listener_mask_
private

Definition at line 184 of file RecorderImpl.h.

Referenced by init(), and set_listener().

◆ mb_alloc_

TransportMessageBlockAllocator OpenDDS::DCPS::RecorderImpl::mb_alloc_
private

Definition at line 214 of file RecorderImpl.h.

Referenced by data_received().

◆ owner_manager_

OwnershipManager* OpenDDS::DCPS::RecorderImpl::owner_manager_
private

Definition at line 176 of file RecorderImpl.h.

Referenced by init().

◆ participant_servant_

DomainParticipantImpl* OpenDDS::DCPS::RecorderImpl::participant_servant_
private

◆ passed_qos_

DDS::DataReaderQos OpenDDS::DCPS::RecorderImpl::passed_qos_
private

Definition at line 165 of file RecorderImpl.h.

Referenced by get_qos(), and init().

◆ publication_handle_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::RecorderImpl::publication_handle_lock_
private

◆ qos_

DDS::DataReaderQos OpenDDS::DCPS::RecorderImpl::qos_
private

Definition at line 164 of file RecorderImpl.h.

Referenced by check_transport_qos(), enable(), init(), and set_qos().

◆ requested_incompatible_qos_status_

DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::RecorderImpl::requested_incompatible_qos_status_
private

Definition at line 193 of file RecorderImpl.h.

Referenced by RecorderImpl(), and update_incompatible_qos().

◆ sample_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::RecorderImpl::sample_lock_
private

Lock protecting the sample container as well as the statuses.

Definition at line 168 of file RecorderImpl.h.

Referenced by add_association(), data_received(), and signal_liveliness().

◆ subqos_

DDS::SubscriberQos OpenDDS::DCPS::RecorderImpl::subqos_
private

Definition at line 179 of file RecorderImpl.h.

Referenced by enable(), get_qos(), init(), and set_qos().

◆ subscription_match_status_

DDS::SubscriptionMatchedStatus OpenDDS::DCPS::RecorderImpl::subscription_match_status_
private

Definition at line 194 of file RecorderImpl.h.

Referenced by add_association(), RecorderImpl(), and remove_associations_i().

◆ topic_desc_

DDS::TopicDescription_var OpenDDS::DCPS::RecorderImpl::topic_desc_
private

Definition at line 183 of file RecorderImpl.h.

Referenced by init().

◆ topic_servant_

TopicDescriptionPtr<TopicImpl> OpenDDS::DCPS::RecorderImpl::topic_servant_
private

Definition at line 171 of file RecorderImpl.h.

Referenced by enable(), and init().

◆ writers_

WriterMapType OpenDDS::DCPS::RecorderImpl::writers_
private

◆ writers_lock_

ACE_RW_Thread_Mutex OpenDDS::DCPS::RecorderImpl::writers_lock_
private

RW lock for reading/writing publications.

Definition at line 206 of file RecorderImpl.h.

Referenced by add_association(), remove_all_associations(), remove_associations_i(), and signal_liveliness().


The documentation for this class was generated from the following files: