00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_REPLAYERIMPL_H
00009 #define OPENDDS_DCPS_REPLAYERIMPL_H
00010
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "WriteDataContainer.h"
00017 #include "Definitions.h"
00018 #include "DataSampleHeader.h"
00019 #include "TopicImpl.h"
00020 #include "Qos_Helper.h"
00021 #include "CoherentChangeControl.h"
00022 #include "GuidUtils.h"
00023
00024 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00025 #include "FilterEvaluator.h"
00026 #endif
00027
00028 #include "ace/Event_Handler.h"
00029 #include "ace/OS_NS_sys_time.h"
00030 #include "ace/Condition_T.h"
00031 #include "ace/Condition_Recursive_Thread_Mutex.h"
00032
00033 #include <memory>
00034
00035 #include "Replayer.h"
00036
00037 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00038 #pragma once
00039 #endif
00040
00041 class DDS_TEST;
00042 namespace OpenDDS {
00043 namespace DCPS {
00044
00045 class SendStateDataSampleList;
00046 class DataSampleElement;
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 class OpenDDS_Dcps_Export ReplayerImpl : public Replayer,
00059 public TransportClient,
00060 public TransportSendListener,
00061 public DataWriterCallbacks,
00062 public EntityImpl
00063 {
00064 public:
00065
00066 ReplayerImpl();
00067 ~ReplayerImpl();
00068
00069
00070
00071
00072 DDS::ReturnCode_t cleanup();
00073
00074
00075
00076
00077 virtual void init(
00078 DDS::Topic_ptr topic,
00079 TopicImpl* topic_servant,
00080 const DDS::DataWriterQos & qos,
00081 ReplayerListener_rch a_listener,
00082 const DDS::StatusMask & mask,
00083 OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
00084 const DDS::PublisherQos& publisher_qos);
00085
00086
00087
00088
00089 virtual DDS::ReturnCode_t write (const RawDataSample& sample );
00090 virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
00091 const RawDataSample& sample );
00092 virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
00093 const RawDataSampleList& samples );
00094 virtual DDS::ReturnCode_t set_qos (const ::DDS::PublisherQos & publisher_qos,
00095 const DDS::DataWriterQos & datawriter_qos);
00096 virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos & publisher_qos,
00097 DDS::DataWriterQos & datawriter_qos);
00098 virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch & a_listener,
00099 DDS::StatusMask mask);
00100 virtual ReplayerListener_rch get_listener ();
00101
00102
00103 virtual bool check_transport_qos(const TransportInst& inst);
00104 virtual const RepoId& get_repo_id() const;
00105 DDS::DomainId_t domain_id() const { return this->domain_id_; }
00106 virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00107
00108
00109 virtual void data_delivered(const DataSampleElement* sample);
00110 virtual void data_dropped(const DataSampleElement* sample,
00111 bool dropped_by_transport);
00112
00113 virtual void control_delivered(ACE_Message_Block* sample);
00114 virtual void control_dropped(ACE_Message_Block* sample,
00115 bool dropped_by_transport);
00116
00117 virtual void notify_publication_disconnected(const ReaderIdSeq& subids);
00118 virtual void notify_publication_reconnected(const ReaderIdSeq& subids);
00119 virtual void notify_publication_lost(const ReaderIdSeq& subids);
00120 virtual void notify_connection_deleted(const RepoId&);
00121
00122
00123 int data_dropped_count_;
00124 int data_delivered_count_;
00125
00126
00127 virtual void retrieve_inline_qos_data(InlineQosData& qos_data) const;
00128
00129
00130 virtual void add_association(const RepoId& yourId,
00131 const ReaderAssociation& reader,
00132 bool active);
00133
00134 virtual void association_complete(const RepoId& remote_id);
00135
00136 virtual void remove_associations(const ReaderIdSeq& readers,
00137 CORBA::Boolean callback);
00138
00139 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00140
00141 virtual void update_subscription_params(const RepoId& readerId,
00142 const DDS::StringSeq& exprParams);
00143
00144 virtual void inconsistent_topic();
00145
00146 void remove_all_associations();
00147
00148 virtual void register_for_reader(const RepoId& participant,
00149 const RepoId& writerid,
00150 const RepoId& readerid,
00151 const TransportLocatorSeq& locators,
00152 DiscoveryListener* listener);
00153
00154 virtual void unregister_for_reader(const RepoId& participant,
00155 const RepoId& writerid,
00156 const RepoId& readerid);
00157
00158 DDS::ReturnCode_t enable();
00159
00160 DomainParticipantImpl* participant() {
00161 return participant_servant_;
00162 }
00163
00164 virtual DDS::InstanceHandle_t get_instance_handle();
00165
00166 private:
00167
00168 void _add_ref() { EntityImpl::_add_ref(); }
00169 void _remove_ref() { EntityImpl::_remove_ref(); }
00170
00171 void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00172
00173 DDS::ReturnCode_t write (const RawDataSample* sample_array, int array_size, DDS::InstanceHandle_t* reader);
00174
00175 DDS::ReturnCode_t
00176 create_sample_data_message(DataSample* data,
00177 DataSampleHeader& header_data,
00178 ACE_Message_Block*& message,
00179 const DDS::Time_t& source_timestamp,
00180 bool content_filter);
00181 bool need_sequence_repair() const;
00182
00183
00184 bool lookup_instance_handles(const ReaderIdSeq& ids,
00185 DDS::InstanceHandleSeq& hdls);
00186
00187 size_t n_chunks_;
00188
00189
00190 size_t association_chunk_multiplier_;
00191
00192
00193 CORBA::String_var type_name_;
00194
00195
00196 DDS::DataWriterQos qos_;
00197
00198
00199
00200 DomainParticipantImpl* participant_servant_;
00201
00202 struct ReaderInfo {
00203 SequenceNumber expected_sequence_;
00204 bool durable_;
00205 ReaderInfo(const char* filter, const DDS::StringSeq& params,
00206 DomainParticipantImpl* participant, bool durable);
00207 ~ReaderInfo();
00208 };
00209
00210 typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00211 RepoIdToReaderInfoMap reader_info_;
00212
00213 void association_complete_i(const RepoId& remote_id);
00214
00215 friend class ::DDS_TEST;
00216
00217
00218 CORBA::String_var topic_name_;
00219
00220 RepoId topic_id_;
00221
00222 DDS::Topic_var topic_objref_;
00223
00224 TopicImpl* topic_servant_;
00225
00226
00227
00228 DDS::StatusMask listener_mask_;
00229
00230 ReplayerListener_rch listener_;
00231
00232 DDS::DomainId_t domain_id_;
00233
00234 PublisherImpl* publisher_servant_;
00235 DDS::PublisherQos publisher_qos_;
00236
00237
00238 PublicationId publication_id_;
00239
00240 SequenceNumber sequence_number_;
00241
00242
00243
00244
00245
00246 ACE_Recursive_Thread_Mutex lock_;
00247
00248 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00249
00250 RepoIdToHandleMap id_to_handle_map_;
00251
00252 RepoIdSet readers_;
00253
00254
00255
00256
00257 DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_;
00258 DDS::PublicationMatchedStatus publication_match_status_;
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277 std::auto_ptr<MessageBlockAllocator> mb_allocator_;
00278
00279 std::auto_ptr<DataBlockAllocator> db_allocator_;
00280
00281 std::auto_ptr<DataSampleHeaderAllocator> header_allocator_;
00282
00283
00284
00285 std::auto_ptr<DataSampleElementAllocator> sample_list_element_allocator_;
00286
00287
00288
00289
00290
00291 std::auto_ptr<TransportSendElementAllocator> transport_send_element_allocator_;
00292
00293 std::auto_ptr<TransportCustomizedElementAllocator> transport_customized_element_allocator_;
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314 bool is_bit_;
00315
00316
00317
00318
00319 typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan)
00320 RepoIdToSequenceMap;
00321
00322 RepoIdToSequenceMap idToSequence_;
00323
00324 RepoIdSet pending_readers_, assoc_complete_readers_;
00325
00326 ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_;
00327 int pending_write_count_;
00328 };
00329
00330 }
00331 }
00332
00333 #endif