OpenDDS  Snapshot(2023/04/28-20:55)
ReplayerImpl.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_REPLAYERIMPL_H
9 #define OPENDDS_DCPS_REPLAYERIMPL_H
10 
11 #include "Replayer.h"
12 #include "DataWriterCallbacks.h"
13 #include "WriteDataContainer.h"
14 #include "Definitions.h"
15 #include "DataSampleHeader.h"
16 #include "TopicImpl.h"
17 #include "Time_Helper.h"
18 #include "CoherentChangeControl.h"
19 #include "GuidUtils.h"
20 #include "unique_ptr.h"
21 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
22 # include "FilterEvaluator.h"
23 #endif
24 #include "ConditionVariable.h"
27 
28 #include <dds/DdsDcpsDomainC.h>
29 #include <dds/DdsDcpsTopicC.h>
30 
31 #include <ace/Event_Handler.h>
32 #include <ace/OS_NS_sys_time.h>
34 
35 #include <memory>
36 
37 #if !defined (ACE_LACKS_PRAGMA_ONCE)
38 #pragma once
39 #endif /* ACE_LACKS_PRAGMA_ONCE */
40 
41 class DDS_TEST;
42 
44 
45 namespace OpenDDS {
46 namespace DCPS {
47 
48 class SendStateDataSampleList;
49 class DataSampleElement;
50 
51 /**
52  * @class ReplayerImpl
53  *
54  * @brief Implementation of Replayer functionality
55  *
56  * This class is the implementation of the Replayer.
57  * Inheritance is used to limit the applications access to
58  * underlying system methods.
59  */
60 
62  public TransportClient,
63  public TransportSendListener,
64  public DataWriterCallbacks,
65  public EntityImpl
66 {
67 public:
68  ReplayerImpl();
69  ~ReplayerImpl();
70 
71  /**
72  * cleanup the DataWriter.
73  */
74  DDS::ReturnCode_t cleanup();
75 
76  /**
77  * Initialize the data members.
78  */
79  virtual void init(
80  DDS::Topic_ptr topic,
81  TopicImpl* topic_servant,
82  const DDS::DataWriterQos & qos,
83  ReplayerListener_rch a_listener,
84  const DDS::StatusMask & mask,
85  OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
86  const DDS::PublisherQos& publisher_qos);
87 
88  // Implement Replayer
89  virtual DDS::ReturnCode_t write (const RawDataSample& sample );
90  virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
91  const RawDataSample& sample );
92  virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
93  const RawDataSampleList& samples );
94  virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos & publisher_qos,
95  const DDS::DataWriterQos & datawriter_qos);
96  virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos & publisher_qos,
97  DDS::DataWriterQos & datawriter_qos);
98  virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch & a_listener,
99  DDS::StatusMask mask);
100  virtual ReplayerListener_rch get_listener ();
101 
102  // Implement TransportClient
103  virtual bool check_transport_qos(const TransportInst& inst);
104  virtual GUID_t get_guid() const;
105  DDS::DomainId_t domain_id() const { return this->domain_id_; }
106  virtual CORBA::Long get_priority_value(const AssociationData& data) const;
107  SequenceNumber get_max_sn() const { return sequence_number_; }
108 
109 
110  // Implement TransportSendListener
111  virtual void data_delivered(const DataSampleElement* sample);
112  virtual void data_dropped(const DataSampleElement* sample,
113  bool dropped_by_transport);
114 
115  virtual void control_delivered(const Message_Block_Ptr& sample);
116  virtual void control_dropped(const Message_Block_Ptr& sample,
117  bool dropped_by_transport);
118 
119  virtual void notify_publication_disconnected(const ReaderIdSeq& subids);
120  virtual void notify_publication_reconnected(const ReaderIdSeq& subids);
121  virtual void notify_publication_lost(const ReaderIdSeq& subids);
122 
123  /// Statistics counter.
126 
127 
128  virtual void retrieve_inline_qos_data(InlineQosData& qos_data) const;
129 
130  // implement DataWriterCallbacks
131  virtual void add_association(const GUID_t& yourId,
132  const ReaderAssociation& reader,
133  bool active);
134 
135  virtual void remove_associations(const ReaderIdSeq& readers,
136  CORBA::Boolean callback);
137 
138  virtual void replay_durable_data_for(const GUID_t&) {}
139 
140  virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
141 
142  virtual void update_subscription_params(const GUID_t& readerId,
143  const DDS::StringSeq& exprParams);
144 
145  void remove_all_associations();
146 
147  virtual void register_for_reader(const GUID_t& participant,
148  const GUID_t& writerid,
149  const GUID_t& readerid,
150  const TransportLocatorSeq& locators,
151  DiscoveryListener* listener);
152 
153  virtual void unregister_for_reader(const GUID_t& participant,
154  const GUID_t& writerid,
155  const GUID_t& readerid);
156 
158 
159  DDS::ReturnCode_t enable();
160 
162  return participant_servant_;
163  }
164 
165  virtual DDS::InstanceHandle_t get_instance_handle();
166 
167 private:
168  void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
169 
170  DDS::ReturnCode_t write (const RawDataSample* sample_array, int array_size, DDS::InstanceHandle_t* reader);
171 
173  create_sample_data_message(Message_Block_Ptr data,
174  DataSampleHeader& header_data,
175  Message_Block_Ptr& message,
176  const DDS::Time_t& source_timestamp,
177  bool content_filter);
178  bool need_sequence_repair() const;
179 
180  /// Lookup the instance handles by the subscription repo ids
181  void lookup_instance_handles(const ReaderIdSeq& ids,
182  DDS::InstanceHandleSeq& hdls);
183  /// The number of chunks for the cached allocator.
184  size_t n_chunks_;
185 
186  /// The multiplier for allocators affected by associations
188 
189  /// The type name of associated topic.
191 
192  /// The qos policy list of this datawriter.
194  /// The qos policy passed in by the user.
195  /// Differs from qos_ because representation has been interpreted.
197 
198  /// The participant servant which creats the publisher that
199  /// creates this datawriter.
201 
202  struct ReaderInfo {
204  bool durable_;
205  ReaderInfo(const char* filter, const DDS::StringSeq& params,
206  DomainParticipantImpl* participant, bool durable);
207  ~ReaderInfo();
208  };
209 
210  typedef OPENDDS_MAP_CMP(GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
211  RepoIdToReaderInfoMap reader_info_;
212 
213  void association_complete_i(const GUID_t& remote_id);
214 
215  friend class ::DDS_TEST; // allows tests to get at privates
216 
217  /// The name of associated topic.
219  /// The associated topic repository id.
221  /// The object reference of the associated topic.
222  DDS::Topic_var topic_objref_;
223  /// The topic servant.
225 
226  /// The StatusKind bit mask indicates which status condition change
227  /// can be notified by the listener of this entity.
229  /// Used to notify the entity for relevant events.
231  /// The domain id.
233  /// The publisher servant which creates this datawriter.
236 
237  /// The repository id of this datawriter/publication.
239  /// The sequence number unique in DataWriter scope.
241 
242  /// The sample data container.
243  // WriteDataContainer* data_container_;
244  /// The lock to protect the activate subscriptions
245  /// and status changes.
247 
248  typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
249 
250  RepoIdToHandleMap id_to_handle_map_;
251 
253 
254  /// Status conditions.
255  // DDS::LivelinessLostStatus liveliness_lost_status_;
256  // DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_;
259 
260  /// True if the writer failed to actively signal its liveliness within
261  /// its offered liveliness period.
262  // bool liveliness_lost_;
263 
264  /**
265  * @todo The publication_lost_status_ and
266  * publication_reconnecting_status_ are left here for
267  * future use when we add get_publication_lost_status()
268  * and get_publication_reconnecting_status() methods.
269  */
270  // Statistics of the lost publications due to lost connection.
271  // PublicationLostStatus publication_lost_status_;
272  // Statistics of the publications that associates with a
273  // reconnecting datalink.
274  // PublicationReconnectingStatus publication_reconnecting_status_;
275 
276  // The message block allocator.
278  // The data block allocator.
280  // The header data allocator.
282 
283  /// The cached allocator to allocate DataSampleElement
284  /// objects.
286 
287  /// The orb's reactor to be used to register the liveliness
288  /// timer.
289  // ACE_Reactor_Timer_Interface* reactor_;
290  /// The time interval for sending liveliness message.
291  // ACE_Time_Value liveliness_check_interval_;
292  /// Timestamp of last write/dispose/assert_liveliness.
293  // ACE_Time_Value last_liveliness_activity_time_;
294  /// Total number of offered deadlines missed during last offered
295  /// deadline status check.
296  // CORBA::Long last_deadline_missed_total_count_;
297  /// Watchdog responsible for reporting missed offered
298  /// deadlines.
299  // unique_ptr<OfferedDeadlineWatchdog> watchdog_;
300  /// The flag indicates whether the liveliness timer is scheduled and
301  /// needs be cancelled.
302  // bool cancel_timer_;
303 
304  /// Flag indicates that this datawriter is a builtin topic
305  /// datawriter.
306  bool is_bit_;
307 
309  RepoIdToSequenceMap;
310 
311  RepoIdToSequenceMap idToSequence_;
312 
315 };
316 
317 } // namespace DCPS
318 } // namespace
319 
321 
322 #endif /* end of include guard: OPENDDS_DCPS_REPLAYERIMPL_H */
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
Defines the interface for Discovery callbacks into the DataWriter.
Implements the OpenDDS::DCPS::Entity interfaces.
Definition: EntityImpl.h:37
DDS::DomainId_t domain_id() const
Definition: ReplayerImpl.h:105
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
Send raw data samples in the system.
Definition: Replayer.h:60
unique_ptr< DataBlockAllocator > db_allocator_
Definition: ReplayerImpl.h:279
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
RepoIdToHandleMap id_to_handle_map_
Definition: ReplayerImpl.h:250
GuidSet RepoIdSet
Definition: GuidUtils.h:113
unique_ptr< DataSampleElementAllocator > sample_list_element_allocator_
Definition: ReplayerImpl.h:285
PublisherImpl * publisher_servant_
The publisher servant which creates this datawriter.
Definition: ReplayerImpl.h:234
sequence< TransportLocator > TransportLocatorSeq
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246
unique_ptr< DataSampleHeaderAllocator > header_allocator_
Definition: ReplayerImpl.h:281
RepoIdToSequenceMap idToSequence_
Definition: ReplayerImpl.h:311
Implementation of Replayer functionality.
Definition: ReplayerImpl.h:61
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
Definition: ReplayerImpl.h:187
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
Definition: ReplayerImpl.h:224
Implements the OpenDDS::DCPS::Publisher interfaces.
Definition: PublisherImpl.h:38
DomainParticipantImpl * participant()
Definition: ReplayerImpl.h:161
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
sequence< GUID_t > ReaderIdSeq
RepoIdToReaderInfoMap reader_info_
Definition: ReplayerImpl.h:211
DOMAINID_TYPE_NATIVE DomainId_t
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: ReplayerImpl.h:277
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
DDS::PublicationMatchedStatus publication_match_status_
Definition: ReplayerImpl.h:258
SequenceNumber get_max_sn() const
Definition: ReplayerImpl.h:107
ACE_CDR::Boolean Boolean
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
CORBA::String_var type_name_
The type name of associated topic.
Definition: ReplayerImpl.h:190
int init(void)
DDS::DataWriterQos passed_qos_
Definition: ReplayerImpl.h:196
int data_dropped_count_
Statistics counter.
Definition: ReplayerImpl.h:124
size_t n_chunks_
The number of chunks for the cached allocator.
Definition: ReplayerImpl.h:184
GUID_t topic_id_
The associated topic repository id.
Definition: ReplayerImpl.h:220
DDS::StatusMask listener_mask_
Definition: ReplayerImpl.h:228
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
Mix-in class for DDS entities which directly use the transport layer.
unsigned long StatusMask
virtual void replay_durable_data_for(const GUID_t &)
Definition: ReplayerImpl.h:138
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
Definition: ReplayerImpl.h:240
DDS::PublisherQos publisher_qos_
Definition: ReplayerImpl.h:235
Sequence number abstraction. Only allows positive 64 bit values.
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: ReplayerImpl.h:157
DDS::Topic_var topic_objref_
The object reference of the associated topic.
Definition: ReplayerImpl.h:222
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
::DDS::ReturnCode_t write(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle)
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
Definition: ReplayerImpl.h:230
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232
ConditionVariable< ACE_Recursive_Thread_Mutex > empty_condition_
Definition: ReplayerImpl.h:313
bool is_bit_
The time interval for sending liveliness message.
Definition: ReplayerImpl.h:306
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
CORBA::String_var topic_name_
The name of associated topic.
Definition: ReplayerImpl.h:218
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
Status conditions.
Definition: ReplayerImpl.h:257
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50