ReplayerImpl.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_REPLAYERIMPL_H
00009 #define OPENDDS_DCPS_REPLAYERIMPL_H
00010
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "WriteDataContainer.h"
00017 #include "Definitions.h"
00018 #include "DataSampleHeader.h"
00019 #include "TopicImpl.h"
00020 #include "Time_Helper.h"
00021 #include "CoherentChangeControl.h"
00022 #include "GuidUtils.h"
00023 #include "unique_ptr.h"
00024
00025 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00026 #include "FilterEvaluator.h"
00027 #endif
00028
00029 #include "ace/Event_Handler.h"
00030 #include "ace/OS_NS_sys_time.h"
00031 #include "ace/Condition_Recursive_Thread_Mutex.h"
00032
00033 #include <memory>
00034
00035 #include "Replayer.h"
00036
00037 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00038 #pragma once
00039 #endif
00040
00041 class DDS_TEST;
00042
00043 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00044
00045 namespace OpenDDS {
00046 namespace DCPS {
00047
00048 class SendStateDataSampleList;
00049 class DataSampleElement;
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061 class OpenDDS_Dcps_Export ReplayerImpl : public Replayer,
00062 public TransportClient,
00063 public TransportSendListener,
00064 public DataWriterCallbacks,
00065 public EntityImpl
00066 {
00067 public:
00068
00069 ReplayerImpl();
00070 ~ReplayerImpl();
00071
00072
00073
00074
00075 DDS::ReturnCode_t cleanup();
00076
00077
00078
00079
00080 virtual void init(
00081 DDS::Topic_ptr topic,
00082 TopicImpl* topic_servant,
00083 const DDS::DataWriterQos & qos,
00084 ReplayerListener_rch a_listener,
00085 const DDS::StatusMask & mask,
00086 OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
00087 const DDS::PublisherQos& publisher_qos);
00088
00089
00090
00091
00092 virtual DDS::ReturnCode_t write (const RawDataSample& sample );
00093 virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
00094 const RawDataSample& sample );
00095 virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
00096 const RawDataSampleList& samples );
00097 virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos & publisher_qos,
00098 const DDS::DataWriterQos & datawriter_qos);
00099 virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos & publisher_qos,
00100 DDS::DataWriterQos & datawriter_qos);
00101 virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch & a_listener,
00102 DDS::StatusMask mask);
00103 virtual ReplayerListener_rch get_listener ();
00104
00105
00106 virtual bool check_transport_qos(const TransportInst& inst);
00107 virtual const RepoId& get_repo_id() const;
00108 DDS::DomainId_t domain_id() const { return this->domain_id_; }
00109 virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00110
00111
00112 virtual void data_delivered(const DataSampleElement* sample);
00113 virtual void data_dropped(const DataSampleElement* sample,
00114 bool dropped_by_transport);
00115
00116 virtual void control_delivered(const Message_Block_Ptr& sample);
00117 virtual void control_dropped(const Message_Block_Ptr& sample,
00118 bool dropped_by_transport);
00119
00120 virtual void notify_publication_disconnected(const ReaderIdSeq& subids);
00121 virtual void notify_publication_reconnected(const ReaderIdSeq& subids);
00122 virtual void notify_publication_lost(const ReaderIdSeq& subids);
00123
00124
00125 int data_dropped_count_;
00126 int data_delivered_count_;
00127
00128
00129 virtual void retrieve_inline_qos_data(InlineQosData& qos_data) const;
00130
00131
00132 virtual void add_association(const RepoId& yourId,
00133 const ReaderAssociation& reader,
00134 bool active);
00135
00136 virtual void association_complete(const RepoId& remote_id);
00137
00138 virtual void remove_associations(const ReaderIdSeq& readers,
00139 CORBA::Boolean callback);
00140
00141 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00142
00143 virtual void update_subscription_params(const RepoId& readerId,
00144 const DDS::StringSeq& exprParams);
00145
00146 virtual void inconsistent_topic();
00147
00148 void remove_all_associations();
00149
00150 virtual void register_for_reader(const RepoId& participant,
00151 const RepoId& writerid,
00152 const RepoId& readerid,
00153 const TransportLocatorSeq& locators,
00154 DiscoveryListener* listener);
00155
00156 virtual void unregister_for_reader(const RepoId& participant,
00157 const RepoId& writerid,
00158 const RepoId& readerid);
00159
00160 DDS::ReturnCode_t enable();
00161
00162 DomainParticipantImpl* participant() {
00163 return participant_servant_;
00164 }
00165
00166 virtual DDS::InstanceHandle_t get_instance_handle();
00167
00168 private:
00169
00170 void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00171
00172 DDS::ReturnCode_t write (const RawDataSample* sample_array, int array_size, DDS::InstanceHandle_t* reader);
00173
00174 DDS::ReturnCode_t
00175 create_sample_data_message(Message_Block_Ptr data,
00176 DataSampleHeader& header_data,
00177 Message_Block_Ptr& message,
00178 const DDS::Time_t& source_timestamp,
00179 bool content_filter);
00180 bool need_sequence_repair() const;
00181
00182
00183 void lookup_instance_handles(const ReaderIdSeq& ids,
00184 DDS::InstanceHandleSeq& hdls);
00185
00186 size_t n_chunks_;
00187
00188
00189 size_t association_chunk_multiplier_;
00190
00191
00192 CORBA::String_var type_name_;
00193
00194
00195 DDS::DataWriterQos qos_;
00196
00197
00198
00199 DomainParticipantImpl* participant_servant_;
00200
00201 struct ReaderInfo {
00202 SequenceNumber expected_sequence_;
00203 bool durable_;
00204 ReaderInfo(const char* filter, const DDS::StringSeq& params,
00205 DomainParticipantImpl* participant, bool durable);
00206 ~ReaderInfo();
00207 };
00208
00209 typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00210 RepoIdToReaderInfoMap reader_info_;
00211
00212 void association_complete_i(const RepoId& remote_id);
00213
00214 friend class ::DDS_TEST;
00215
00216
00217 CORBA::String_var topic_name_;
00218
00219 RepoId topic_id_;
00220
00221 DDS::Topic_var topic_objref_;
00222
00223 TopicDescriptionPtr<TopicImpl> topic_servant_;
00224
00225
00226
00227 DDS::StatusMask listener_mask_;
00228
00229 ReplayerListener_rch listener_;
00230
00231 DDS::DomainId_t domain_id_;
00232
00233 PublisherImpl* publisher_servant_;
00234 DDS::PublisherQos publisher_qos_;
00235
00236
00237 PublicationId publication_id_;
00238
00239 SequenceNumber sequence_number_;
00240
00241
00242
00243
00244
00245 ACE_Recursive_Thread_Mutex lock_;
00246
00247 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00248
00249 RepoIdToHandleMap id_to_handle_map_;
00250
00251 RepoIdSet readers_;
00252
00253
00254
00255
00256 DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_;
00257 DDS::PublicationMatchedStatus publication_match_status_;
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276 unique_ptr<MessageBlockAllocator> mb_allocator_;
00277
00278 unique_ptr<DataBlockAllocator> db_allocator_;
00279
00280 unique_ptr<DataSampleHeaderAllocator> header_allocator_;
00281
00282
00283
00284 unique_ptr<DataSampleElementAllocator> sample_list_element_allocator_;
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305 bool is_bit_;
00306
00307 typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan)
00308 RepoIdToSequenceMap;
00309
00310 RepoIdToSequenceMap idToSequence_;
00311
00312 RepoIdSet pending_readers_, assoc_complete_readers_;
00313
00314 ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_;
00315 int pending_write_count_;
00316 };
00317
00318 }
00319 }
00320
00321 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00322
00323 #endif