ReplayerImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_REPLAYERIMPL_H
00009 #define OPENDDS_DCPS_REPLAYERIMPL_H
00010 
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "WriteDataContainer.h"
00017 #include "Definitions.h"
00018 #include "DataSampleHeader.h"
00019 #include "TopicImpl.h"
00020 #include "Time_Helper.h"
00021 #include "CoherentChangeControl.h"
00022 #include "GuidUtils.h"
00023 #include "unique_ptr.h"
00024 
00025 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00026 #include "FilterEvaluator.h"
00027 #endif
00028 
00029 #include "ace/Event_Handler.h"
00030 #include "ace/OS_NS_sys_time.h"
00031 #include "ace/Condition_Recursive_Thread_Mutex.h"
00032 
00033 #include <memory>
00034 
00035 #include "Replayer.h"
00036 
00037 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00038 #pragma once
00039 #endif /* ACE_LACKS_PRAGMA_ONCE */
00040 
00041 class DDS_TEST;
00042 
00043 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00044 
00045 namespace OpenDDS {
00046 namespace DCPS {
00047 
00048 class SendStateDataSampleList;
00049 class DataSampleElement;
00050 
00051 /**
00052  * @class ReplayerImpl
00053  *
00054  * @brief Implementation of Replayer functionality
00055  *
00056  * This class is the implementation of the Replayer.
00057  * Inheritance is used to limit the applications access to
00058  * underlying system methods.
00059  */
00060 
00061 class OpenDDS_Dcps_Export ReplayerImpl : public Replayer,
00062   public TransportClient,
00063   public TransportSendListener,
00064   public DataWriterCallbacks,
00065   public EntityImpl
00066 {
00067 public:
00068 
00069   ReplayerImpl();
00070   ~ReplayerImpl();
00071 
00072   /**
00073    * cleanup the DataWriter.
00074    */
00075   DDS::ReturnCode_t cleanup();
00076 
00077   /**
00078    * Initialize the data members.
00079    */
00080   virtual void init(
00081     DDS::Topic_ptr                        topic,
00082     TopicImpl*                            topic_servant,
00083     const DDS::DataWriterQos &            qos,
00084     ReplayerListener_rch                  a_listener,
00085     const DDS::StatusMask &               mask,
00086     OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
00087     const DDS::PublisherQos&              publisher_qos);
00088 
00089 
00090   // implement Replayer
00091 
00092   virtual DDS::ReturnCode_t write (const RawDataSample& sample );
00093   virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
00094                                              const RawDataSample&  sample );
00095   virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t    subscription,
00096                                              const RawDataSampleList& samples );
00097   virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos & publisher_qos,
00098                                      const DDS::DataWriterQos &  datawriter_qos);
00099   virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &  publisher_qos,
00100                                      DDS::DataWriterQos & datawriter_qos);
00101   virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch & a_listener,
00102                                           DDS::StatusMask              mask);
00103   virtual ReplayerListener_rch get_listener ();
00104 
00105   // Implement TransportClient
00106   virtual bool check_transport_qos(const TransportInst& inst);
00107   virtual const RepoId& get_repo_id() const;
00108   DDS::DomainId_t domain_id() const { return this->domain_id_; }
00109   virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00110 
00111   // Implement TransportSendListener
00112   virtual void data_delivered(const DataSampleElement* sample);
00113   virtual void data_dropped(const DataSampleElement* sample,
00114                             bool                         dropped_by_transport);
00115 
00116   virtual void control_delivered(const Message_Block_Ptr& sample);
00117   virtual void control_dropped(const Message_Block_Ptr& sample,
00118                                bool               dropped_by_transport);
00119 
00120   virtual void notify_publication_disconnected(const ReaderIdSeq& subids);
00121   virtual void notify_publication_reconnected(const ReaderIdSeq& subids);
00122   virtual void notify_publication_lost(const ReaderIdSeq& subids);
00123 
00124   /// Statistics counter.
00125   int data_dropped_count_;
00126   int data_delivered_count_;
00127 
00128 
00129   virtual void retrieve_inline_qos_data(InlineQosData& qos_data) const;
00130 
00131   // implement DataWriterCallbacks
00132   virtual void add_association(const RepoId&            yourId,
00133                                const ReaderAssociation& reader,
00134                                bool                     active);
00135 
00136   virtual void association_complete(const RepoId& remote_id);
00137 
00138   virtual void remove_associations(const ReaderIdSeq& readers,
00139                                    CORBA::Boolean     callback);
00140 
00141   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00142 
00143   virtual void update_subscription_params(const RepoId&         readerId,
00144                                           const DDS::StringSeq& exprParams);
00145 
00146   virtual void inconsistent_topic();
00147 
00148   void remove_all_associations();
00149 
00150   virtual void register_for_reader(const RepoId& participant,
00151                                    const RepoId& writerid,
00152                                    const RepoId& readerid,
00153                                    const TransportLocatorSeq& locators,
00154                                    DiscoveryListener* listener);
00155 
00156   virtual void unregister_for_reader(const RepoId& participant,
00157                                      const RepoId& writerid,
00158                                      const RepoId& readerid);
00159 
00160   DDS::ReturnCode_t enable();
00161 
00162   DomainParticipantImpl*          participant() {
00163     return participant_servant_;
00164   }
00165 
00166   virtual DDS::InstanceHandle_t get_instance_handle();
00167 
00168 private:
00169 
00170   void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00171 
00172   DDS::ReturnCode_t write (const RawDataSample* sample_array, int array_size, DDS::InstanceHandle_t* reader);
00173 
00174   DDS::ReturnCode_t
00175   create_sample_data_message(Message_Block_Ptr   data,
00176                              DataSampleHeader&   header_data,
00177                              Message_Block_Ptr&  message,
00178                              const DDS::Time_t&  source_timestamp,
00179                              bool                content_filter);
00180   bool need_sequence_repair() const;
00181 
00182   /// Lookup the instance handles by the subscription repo ids
00183   void lookup_instance_handles(const ReaderIdSeq&      ids,
00184                                DDS::InstanceHandleSeq& hdls);
00185   /// The number of chunks for the cached allocator.
00186   size_t n_chunks_;
00187 
00188   /// The multiplier for allocators affected by associations
00189   size_t association_chunk_multiplier_;
00190 
00191   /// The type name of associated topic.
00192   CORBA::String_var type_name_;
00193 
00194   /// The qos policy list of this datawriter.
00195   DDS::DataWriterQos qos_;
00196 
00197   /// The participant servant which creats the publisher that
00198   /// creates this datawriter.
00199   DomainParticipantImpl*          participant_servant_;
00200 
00201   struct ReaderInfo {
00202     SequenceNumber expected_sequence_;
00203     bool durable_;
00204     ReaderInfo(const char* filter, const DDS::StringSeq& params,
00205                DomainParticipantImpl* participant, bool durable);
00206     ~ReaderInfo();
00207   };
00208 
00209   typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00210   RepoIdToReaderInfoMap reader_info_;
00211 
00212   void association_complete_i(const RepoId& remote_id);
00213 
00214   friend class ::DDS_TEST; // allows tests to get at privates
00215 
00216   /// The name of associated topic.
00217   CORBA::String_var topic_name_;
00218   /// The associated topic repository id.
00219   RepoId topic_id_;
00220   /// The object reference of the associated topic.
00221   DDS::Topic_var topic_objref_;
00222   /// The topic servant.
00223   TopicDescriptionPtr<TopicImpl> topic_servant_;
00224 
00225   /// The StatusKind bit mask indicates which status condition change
00226   /// can be notified by the listener of this entity.
00227   DDS::StatusMask listener_mask_;
00228   /// Used to notify the entity for relevant events.
00229   ReplayerListener_rch listener_;
00230   /// The domain id.
00231   DDS::DomainId_t domain_id_;
00232   /// The publisher servant which creates this datawriter.
00233   PublisherImpl*                  publisher_servant_;
00234   DDS::PublisherQos publisher_qos_;
00235 
00236   /// The repository id of this datawriter/publication.
00237   PublicationId publication_id_;
00238   /// The sequence number unique in DataWriter scope.
00239   SequenceNumber sequence_number_;
00240 
00241   /// The sample data container.
00242   // WriteDataContainer*             data_container_;
00243   /// The lock to protect the activate subscriptions
00244   /// and status changes.
00245   ACE_Recursive_Thread_Mutex lock_;
00246 
00247   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00248 
00249   RepoIdToHandleMap id_to_handle_map_;
00250 
00251   RepoIdSet readers_;
00252 
00253   /// Status conditions.
00254   // DDS::LivelinessLostStatus liveliness_lost_status_;
00255   // DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_;
00256   DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_;
00257   DDS::PublicationMatchedStatus publication_match_status_;
00258 
00259   /// True if the writer failed to actively signal its liveliness within
00260   /// its offered liveliness period.
00261   // bool liveliness_lost_;
00262 
00263   /**
00264    * @todo The publication_lost_status_ and
00265    *       publication_reconnecting_status_ are left here for
00266    *       future use when we add get_publication_lost_status()
00267    *       and get_publication_reconnecting_status() methods.
00268    */
00269   // Statistics of the lost publications due to lost connection.
00270   // PublicationLostStatus               publication_lost_status_;
00271   // Statistics of the publications that associates with a
00272   // reconnecting datalink.
00273   // PublicationReconnectingStatus       publication_reconnecting_status_;
00274 
00275   // The message block allocator.
00276   unique_ptr<MessageBlockAllocator>     mb_allocator_;
00277   // The data block allocator.
00278   unique_ptr<DataBlockAllocator>        db_allocator_;
00279   // The header data allocator.
00280   unique_ptr<DataSampleHeaderAllocator> header_allocator_;
00281 
00282   /// The cached allocator to allocate DataSampleElement
00283   /// objects.
00284   unique_ptr<DataSampleElementAllocator> sample_list_element_allocator_;
00285 
00286   /// The orb's reactor to be used to register the liveliness
00287   /// timer.
00288   // ACE_Reactor_Timer_Interface* reactor_;
00289   /// The time interval for sending liveliness message.
00290   // ACE_Time_Value             liveliness_check_interval_;
00291   /// Timestamp of last write/dispose/assert_liveliness.
00292   // ACE_Time_Value             last_liveliness_activity_time_;
00293   /// Total number of offered deadlines missed during last offered
00294   /// deadline status check.
00295   // CORBA::Long last_deadline_missed_total_count_;
00296   /// Watchdog responsible for reporting missed offered
00297   /// deadlines.
00298   // unique_ptr<OfferedDeadlineWatchdog> watchdog_;
00299   /// The flag indicates whether the liveliness timer is scheduled and
00300   /// needs be cancelled.
00301   // bool                       cancel_timer_;
00302 
00303   /// Flag indicates that this datawriter is a builtin topic
00304   /// datawriter.
00305   bool is_bit_;
00306 
00307   typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan)
00308   RepoIdToSequenceMap;
00309 
00310   RepoIdToSequenceMap idToSequence_;
00311 
00312   RepoIdSet pending_readers_, assoc_complete_readers_;
00313 
00314   ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_;
00315   int pending_write_count_;
00316 };
00317 
00318 } // namespace DCPS
00319 } // namespace
00320 
00321 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00322 
00323 #endif /* end of include guard: OPENDDS_DCPS_REPLAYERIMPL_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1