Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_REPLAYERIMPL_H 9 : #define OPENDDS_DCPS_REPLAYERIMPL_H 10 : 11 : #include "Replayer.h" 12 : #include "DataWriterCallbacks.h" 13 : #include "WriteDataContainer.h" 14 : #include "Definitions.h" 15 : #include "DataSampleHeader.h" 16 : #include "TopicImpl.h" 17 : #include "Time_Helper.h" 18 : #include "CoherentChangeControl.h" 19 : #include "GuidUtils.h" 20 : #include "unique_ptr.h" 21 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 22 : # include "FilterEvaluator.h" 23 : #endif 24 : #include "ConditionVariable.h" 25 : #include "transport/framework/TransportSendListener.h" 26 : #include "transport/framework/TransportClient.h" 27 : 28 : #include <dds/DdsDcpsDomainC.h> 29 : #include <dds/DdsDcpsTopicC.h> 30 : 31 : #include <ace/Event_Handler.h> 32 : #include <ace/OS_NS_sys_time.h> 33 : #include <ace/Recursive_Thread_Mutex.h> 34 : 35 : #include <memory> 36 : 37 : #if !defined (ACE_LACKS_PRAGMA_ONCE) 38 : #pragma once 39 : #endif /* ACE_LACKS_PRAGMA_ONCE */ 40 : 41 : class DDS_TEST; 42 : 43 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 44 : 45 : namespace OpenDDS { 46 : namespace DCPS { 47 : 48 : class SendStateDataSampleList; 49 : class DataSampleElement; 50 : 51 : /** 52 : * @class ReplayerImpl 53 : * 54 : * @brief Implementation of Replayer functionality 55 : * 56 : * This class is the implementation of the Replayer. 57 : * Inheritance is used to limit the applications access to 58 : * underlying system methods. 59 : */ 60 : 61 : class OpenDDS_Dcps_Export ReplayerImpl : public Replayer, 62 : public TransportClient, 63 : public TransportSendListener, 64 : public DataWriterCallbacks, 65 : public EntityImpl 66 : { 67 : public: 68 : ReplayerImpl(); 69 : ~ReplayerImpl(); 70 : 71 : /** 72 : * cleanup the DataWriter. 73 : */ 74 : DDS::ReturnCode_t cleanup(); 75 : 76 : /** 77 : * Initialize the data members. 78 : */ 79 : virtual void init( 80 : DDS::Topic_ptr topic, 81 : TopicImpl* topic_servant, 82 : const DDS::DataWriterQos & qos, 83 : ReplayerListener_rch a_listener, 84 : const DDS::StatusMask & mask, 85 : OpenDDS::DCPS::DomainParticipantImpl* participant_servant, 86 : const DDS::PublisherQos& publisher_qos); 87 : 88 : // Implement Replayer 89 : virtual DDS::ReturnCode_t write (const RawDataSample& sample ); 90 : virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, 91 : const RawDataSample& sample ); 92 : virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, 93 : const RawDataSampleList& samples ); 94 : virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos & publisher_qos, 95 : const DDS::DataWriterQos & datawriter_qos); 96 : virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos & publisher_qos, 97 : DDS::DataWriterQos & datawriter_qos); 98 : virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch & a_listener, 99 : DDS::StatusMask mask); 100 : virtual ReplayerListener_rch get_listener (); 101 : 102 : // Implement TransportClient 103 : virtual bool check_transport_qos(const TransportInst& inst); 104 : virtual GUID_t get_guid() const; 105 0 : DDS::DomainId_t domain_id() const { return this->domain_id_; } 106 : virtual CORBA::Long get_priority_value(const AssociationData& data) const; 107 0 : SequenceNumber get_max_sn() const { return sequence_number_; } 108 : 109 : 110 : // Implement TransportSendListener 111 : virtual void data_delivered(const DataSampleElement* sample); 112 : virtual void data_dropped(const DataSampleElement* sample, 113 : bool dropped_by_transport); 114 : 115 : virtual void control_delivered(const Message_Block_Ptr& sample); 116 : virtual void control_dropped(const Message_Block_Ptr& sample, 117 : bool dropped_by_transport); 118 : 119 : virtual void notify_publication_disconnected(const ReaderIdSeq& subids); 120 : virtual void notify_publication_reconnected(const ReaderIdSeq& subids); 121 : virtual void notify_publication_lost(const ReaderIdSeq& subids); 122 : 123 : /// Statistics counter. 124 : int data_dropped_count_; 125 : int data_delivered_count_; 126 : 127 : 128 : virtual void retrieve_inline_qos_data(InlineQosData& qos_data) const; 129 : 130 : // implement DataWriterCallbacks 131 : virtual void add_association(const GUID_t& yourId, 132 : const ReaderAssociation& reader, 133 : bool active); 134 : 135 : virtual void remove_associations(const ReaderIdSeq& readers, 136 : CORBA::Boolean callback); 137 : 138 0 : virtual void replay_durable_data_for(const GUID_t&) {} 139 : 140 : virtual void update_incompatible_qos(const IncompatibleQosStatus& status); 141 : 142 : virtual void update_subscription_params(const GUID_t& readerId, 143 : const DDS::StringSeq& exprParams); 144 : 145 : void remove_all_associations(); 146 : 147 : virtual void register_for_reader(const GUID_t& participant, 148 : const GUID_t& writerid, 149 : const GUID_t& readerid, 150 : const TransportLocatorSeq& locators, 151 : DiscoveryListener* listener); 152 : 153 : virtual void unregister_for_reader(const GUID_t& participant, 154 : const GUID_t& writerid, 155 : const GUID_t& readerid); 156 : 157 0 : virtual DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint() { return DCPS::WeakRcHandle<ICE::Endpoint>(); } 158 : 159 : DDS::ReturnCode_t enable(); 160 : 161 0 : DomainParticipantImpl* participant() { 162 0 : return participant_servant_; 163 : } 164 : 165 : virtual DDS::InstanceHandle_t get_instance_handle(); 166 : 167 : private: 168 : void notify_publication_lost(const DDS::InstanceHandleSeq& handles); 169 : 170 : DDS::ReturnCode_t write (const RawDataSample* sample_array, int array_size, DDS::InstanceHandle_t* reader); 171 : 172 : DDS::ReturnCode_t 173 : create_sample_data_message(Message_Block_Ptr data, 174 : DataSampleHeader& header_data, 175 : Message_Block_Ptr& message, 176 : const DDS::Time_t& source_timestamp, 177 : bool content_filter); 178 : bool need_sequence_repair() const; 179 : 180 : /// Lookup the instance handles by the subscription repo ids 181 : void lookup_instance_handles(const ReaderIdSeq& ids, 182 : DDS::InstanceHandleSeq& hdls); 183 : /// The number of chunks for the cached allocator. 184 : size_t n_chunks_; 185 : 186 : /// The multiplier for allocators affected by associations 187 : size_t association_chunk_multiplier_; 188 : 189 : /// The type name of associated topic. 190 : CORBA::String_var type_name_; 191 : 192 : /// The qos policy list of this datawriter. 193 : DDS::DataWriterQos qos_; 194 : /// The qos policy passed in by the user. 195 : /// Differs from qos_ because representation has been interpreted. 196 : DDS::DataWriterQos passed_qos_; 197 : 198 : /// The participant servant which creats the publisher that 199 : /// creates this datawriter. 200 : DomainParticipantImpl* participant_servant_; 201 : 202 : struct ReaderInfo { 203 : SequenceNumber expected_sequence_; 204 : bool durable_; 205 : ReaderInfo(const char* filter, const DDS::StringSeq& params, 206 : DomainParticipantImpl* participant, bool durable); 207 : ~ReaderInfo(); 208 : }; 209 : 210 : typedef OPENDDS_MAP_CMP(GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap; 211 : RepoIdToReaderInfoMap reader_info_; 212 : 213 : void association_complete_i(const GUID_t& remote_id); 214 : 215 : friend class ::DDS_TEST; // allows tests to get at privates 216 : 217 : /// The name of associated topic. 218 : CORBA::String_var topic_name_; 219 : /// The associated topic repository id. 220 : GUID_t topic_id_; 221 : /// The object reference of the associated topic. 222 : DDS::Topic_var topic_objref_; 223 : /// The topic servant. 224 : TopicDescriptionPtr<TopicImpl> topic_servant_; 225 : 226 : /// The StatusKind bit mask indicates which status condition change 227 : /// can be notified by the listener of this entity. 228 : DDS::StatusMask listener_mask_; 229 : /// Used to notify the entity for relevant events. 230 : ReplayerListener_rch listener_; 231 : /// The domain id. 232 : DDS::DomainId_t domain_id_; 233 : /// The publisher servant which creates this datawriter. 234 : PublisherImpl* publisher_servant_; 235 : DDS::PublisherQos publisher_qos_; 236 : 237 : /// The repository id of this datawriter/publication. 238 : GUID_t publication_id_; 239 : /// The sequence number unique in DataWriter scope. 240 : SequenceNumber sequence_number_; 241 : 242 : /// The sample data container. 243 : // WriteDataContainer* data_container_; 244 : /// The lock to protect the activate subscriptions 245 : /// and status changes. 246 : ACE_Recursive_Thread_Mutex lock_; 247 : 248 : typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap; 249 : 250 : RepoIdToHandleMap id_to_handle_map_; 251 : 252 : RepoIdSet readers_; 253 : 254 : /// Status conditions. 255 : // DDS::LivelinessLostStatus liveliness_lost_status_; 256 : // DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_; 257 : DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_; 258 : DDS::PublicationMatchedStatus publication_match_status_; 259 : 260 : /// True if the writer failed to actively signal its liveliness within 261 : /// its offered liveliness period. 262 : // bool liveliness_lost_; 263 : 264 : /** 265 : * @todo The publication_lost_status_ and 266 : * publication_reconnecting_status_ are left here for 267 : * future use when we add get_publication_lost_status() 268 : * and get_publication_reconnecting_status() methods. 269 : */ 270 : // Statistics of the lost publications due to lost connection. 271 : // PublicationLostStatus publication_lost_status_; 272 : // Statistics of the publications that associates with a 273 : // reconnecting datalink. 274 : // PublicationReconnectingStatus publication_reconnecting_status_; 275 : 276 : // The message block allocator. 277 : unique_ptr<MessageBlockAllocator> mb_allocator_; 278 : // The data block allocator. 279 : unique_ptr<DataBlockAllocator> db_allocator_; 280 : // The header data allocator. 281 : unique_ptr<DataSampleHeaderAllocator> header_allocator_; 282 : 283 : /// The cached allocator to allocate DataSampleElement 284 : /// objects. 285 : unique_ptr<DataSampleElementAllocator> sample_list_element_allocator_; 286 : 287 : /// The orb's reactor to be used to register the liveliness 288 : /// timer. 289 : // ACE_Reactor_Timer_Interface* reactor_; 290 : /// The time interval for sending liveliness message. 291 : // ACE_Time_Value liveliness_check_interval_; 292 : /// Timestamp of last write/dispose/assert_liveliness. 293 : // ACE_Time_Value last_liveliness_activity_time_; 294 : /// Total number of offered deadlines missed during last offered 295 : /// deadline status check. 296 : // CORBA::Long last_deadline_missed_total_count_; 297 : /// Watchdog responsible for reporting missed offered 298 : /// deadlines. 299 : // unique_ptr<OfferedDeadlineWatchdog> watchdog_; 300 : /// The flag indicates whether the liveliness timer is scheduled and 301 : /// needs be cancelled. 302 : // bool cancel_timer_; 303 : 304 : /// Flag indicates that this datawriter is a builtin topic 305 : /// datawriter. 306 : bool is_bit_; 307 : 308 : typedef OPENDDS_MAP_CMP(GUID_t, SequenceNumber, GUID_tKeyLessThan) 309 : RepoIdToSequenceMap; 310 : 311 : RepoIdToSequenceMap idToSequence_; 312 : 313 : ConditionVariable<ACE_Recursive_Thread_Mutex> empty_condition_; 314 : int pending_write_count_; 315 : }; 316 : 317 : } // namespace DCPS 318 : } // namespace 319 : 320 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 321 : 322 : #endif /* end of include guard: OPENDDS_DCPS_REPLAYERIMPL_H */