OpenDDS  Snapshot(2023/04/28-20:55)
RecorderImpl.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_RECORDERIMPL_H
9 #define OPENDDS_DCPS_RECORDERIMPL_H
10 
11 #include "RcObject.h"
12 #include "WriterInfo.h"
13 #include "Definitions.h"
14 #include "DataReaderCallbacks.h"
15 #include "Recorder.h"
16 #include "EntityImpl.h"
17 #include "TopicImpl.h"
18 #include "OwnershipManager.h"
19 
24 
25 #include <dds/DdsDcpsTopicC.h>
26 #include <dds/DdsDcpsSubscriptionExtC.h>
27 #include <dds/DdsDcpsDomainC.h>
28 #include <dds/DdsDcpsTopicC.h>
29 
31 
32 namespace OpenDDS {
33 namespace DCPS {
34 
35 /**
36  * @class RecorderImpl
37  *
38  * @brief Implementation of Recorder functionality
39  *
40  * This class is the implementation of the Recorder.
41  * Inheritance is used to limit the application's access to
42  * underlying system methods.
43  */
44 
46  : public TransportClient
48  , public DataReaderCallbacks
49  , public Recorder
50  , public EntityImpl
51  , private WriterInfoListener
52 {
53 public:
54  RecorderImpl();
55 
56  virtual ~RecorderImpl();
57 
58  /**
59  * Clean up the Recorder.
60  */
61  DDS::ReturnCode_t cleanup();
62 
63  void init(
64  TopicDescriptionImpl* a_topic_desc,
65  const DDS::DataReaderQos & qos,
66  RecorderListener_rch a_listener,
67  const DDS::StatusMask & mask,
68  DomainParticipantImpl* participant,
69  DDS::SubscriberQos subqos);
70 
71  DDS::ReturnCode_t enable();
72 
73  // Implement TransportClient
74  virtual bool check_transport_qos(const TransportInst& inst);
75  virtual GUID_t get_guid() const;
76  DDS::DomainId_t domain_id() const { return this->domain_id_; }
77  virtual CORBA::Long get_priority_value(const AssociationData& data) const;
78 
79  // Implement TransportReceiveListener
80  virtual void data_received(const ReceivedDataSample& sample);
81  virtual void notify_subscription_disconnected(const WriterIdSeq& pubids);
82  virtual void notify_subscription_reconnected(const WriterIdSeq& pubids);
83  virtual void notify_subscription_lost(const WriterIdSeq& pubids);
84 
85  // Implement DataReaderCallbacks
86 
87  virtual void add_association(const GUID_t& yourId,
88  const WriterAssociation& writer,
89  bool active);
90 
91  virtual void remove_associations(const WriterIdSeq& writers,
92  CORBA::Boolean callback);
93 
94  virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
95 
96  virtual void signal_liveliness(const GUID_t& remote_participant);
97 
98  void remove_all_associations();
99 
100 #ifndef OPENDDS_SAFETY_PROFILE
101  void add_to_dynamic_type_map(const GUID_t& pub_id, const XTypes::TypeIdentifier& ti);
102 #endif
103 
104 #if !defined (DDS_HAS_MINIMUM_BIT)
105  // implement Recorder
106  virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::GUID_t& id,
108 #endif
109  /**
110  * Set the Quality of Service settings for the Recorder.
111  *
112  */
113  DDS::ReturnCode_t set_qos (const DDS::SubscriberQos & subscriber_qos,
114  const DDS::DataReaderQos & datareader_qos);
115 
116  /**
117  * Get the Quality of Service settings for the Recorder.
118  *
119  */
120  DDS::ReturnCode_t get_qos (DDS::SubscriberQos & subscriber_qos,
121  DDS::DataReaderQos & datareader_qos);
122 
123 
124  DDS::ReturnCode_t set_listener(const RecorderListener_rch& a_listener,
125  DDS::StatusMask mask);
126 
127  RecorderListener_rch get_listener();
128 
130  return participant_servant_;
131  }
132 
133  virtual DDS::InstanceHandle_t get_instance_handle();
134 
135  virtual void register_for_writer(const GUID_t& /*participant*/,
136  const GUID_t& /*readerid*/,
137  const GUID_t& /*writerid*/,
138  const TransportLocatorSeq& /*locators*/,
139  DiscoveryListener* /*listener*/);
140 
141  virtual void unregister_for_writer(const GUID_t& /*participant*/,
142  const GUID_t& /*readerid*/,
143  const GUID_t& /*writerid*/);
144 
146 
147 protected:
148  virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
149 
150 private:
151 
152  void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
153 
154  /// Look up the instance handles by the publication repo IDs
155  void lookup_instance_handles(const WriterIdSeq& ids,
156  DDS::InstanceHandleSeq& hdls);
157 
158 #ifndef OPENDDS_SAFETY_PROFILE
159  DDS::DynamicData_ptr get_dynamic_data(const RawDataSample& sample);
160 #endif
161  void check_encap(bool b) { check_encap_ = b; }
162  bool check_encap() const { return check_encap_; }
163 
166 
167  /// lock protecting sample container as well as statuses.
169 
172 
173 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
175 
177 #endif
178 
180 
181  friend class ::DDS_TEST; //allows tests to get at private data
182 
183  DDS::TopicDescription_var topic_desc_;
187 
189 
190  typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
191  RepoIdToHandleMap id_to_handle_map_;
192 
195 
196  /// Flag indicates that this datareader is a builtin topic
197  /// datareader.
198  bool is_bit_;
199 
200  /// publications writing to this reader.
202  GUID_tKeyLessThan) WriterMapType;
203  WriterMapType writers_;
204 
205  /// RW lock for reading/writing publications.
207 
208 #ifndef OPENDDS_SAFETY_PROFILE
209  typedef OPENDDS_MAP(GUID_t, DDS::DynamicType_var) DynamicTypeByPubId;
210  DynamicTypeByPubId dt_map_;
211 #endif
213 
215 };
216 
217 
218 } // namespace DCPS
219 } // namespace OpenDDS
220 
222 
223 #endif /* end of include guard: OPENDDS_DCPS_RECORDERIMPL_H */
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: RecorderImpl.h:145
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
Implements the OpenDDS::DCPS::Entity interfaces.
Definition: EntityImpl.h:37
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
DDS::StatusMask listener_mask_
Definition: RecorderImpl.h:184
RecorderListener_rch listener_
Definition: RecorderImpl.h:185
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
DDS::DataReaderQos qos_
Definition: RecorderImpl.h:164
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
sequence< octet > key
Defines the interface for Discovery callbacks into the DataReader.
Implementation of Recorder functionality.
Definition: RecorderImpl.h:45
DDS::TopicDescription_var topic_desc_
Definition: RecorderImpl.h:183
DDS::SubscriberQos subqos_
Definition: RecorderImpl.h:179
sequence< TransportLocator > TransportLocatorSeq
DomainParticipantImpl * participant()
Definition: RecorderImpl.h:129
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
Definition: RecorderImpl.h:206
DOMAINID_TYPE_NATIVE DomainId_t
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
TransportMessageBlockAllocator mb_alloc_
Definition: RecorderImpl.h:214
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
Definition: RecorderImpl.h:193
ACE_CDR::Boolean Boolean
Holds a data sample received by the transport.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
RepoIdToHandleMap id_to_handle_map_
Definition: RecorderImpl.h:191
int init(void)
DDS::DataReaderQos passed_qos_
Definition: RecorderImpl.h:165
sequence< GUID_t > WriterIdSeq
OwnershipManager * owner_manager_
Definition: RecorderImpl.h:176
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
Definition: RecorderImpl.h:168
ACE_Recursive_Thread_Mutex publication_handle_lock_
Definition: RecorderImpl.h:188
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
Mix-in class for DDS entities which directly use the transport layer.
unsigned long StatusMask
DDS::DomainId_t domain_id() const
Definition: RecorderImpl.h:76
DDS::SubscriptionMatchedStatus subscription_match_status_
Definition: RecorderImpl.h:194
Implements the DDS::TopicDescription interface.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DynamicTypeByPubId dt_map_
Definition: RecorderImpl.h:210
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
TopicDescriptionPtr< TopicImpl > topic_servant_
Definition: RecorderImpl.h:171
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.