ReplayerImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_REPLAYERIMPL_H
00009 #define OPENDDS_DCPS_REPLAYERIMPL_H
00010 
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "WriteDataContainer.h"
00017 #include "Definitions.h"
00018 #include "DataSampleHeader.h"
00019 #include "TopicImpl.h"
00020 #include "Qos_Helper.h"
00021 #include "CoherentChangeControl.h"
00022 #include "GuidUtils.h"
00023 
00024 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00025 #include "FilterEvaluator.h"
00026 #endif
00027 
00028 #include "ace/Event_Handler.h"
00029 #include "ace/OS_NS_sys_time.h"
00030 #include "ace/Condition_T.h"
00031 #include "ace/Condition_Recursive_Thread_Mutex.h"
00032 
00033 #include <memory>
00034 
00035 #include "Replayer.h"
00036 
00037 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00038 #pragma once
00039 #endif /* ACE_LACKS_PRAGMA_ONCE */
00040 
00041 class DDS_TEST; // global-namespace test helper; declared a friend of ReplayerImpl below
00042 namespace OpenDDS {
00043 namespace DCPS {
00044 
00045 class SendStateDataSampleList; // forward declaration; not otherwise referenced in this header
00046 class DataSampleElement; // forward declaration; only used via pointer in data_delivered()/data_dropped()
00047 
00048 /**
00049  * @class ReplayerImpl
00050  *
00051  * @brief Implementation of Replayer functionality
00052  *
00053  * This class is the implementation of the Replayer interface.
00054  * Inheritance is used to limit the application's access to
00055  * underlying system methods.
00056  */
00057 
00058 class OpenDDS_Dcps_Export ReplayerImpl : public Replayer,
00059   public TransportClient,
00060   public TransportSendListener,
00061   public DataWriterCallbacks,
00062   public EntityImpl
00063 {
00064 public:
00065 
00066   ReplayerImpl();
00067   ~ReplayerImpl();
00068 
00069   /**
00070    * Clean up the DataWriter; the outcome is reported as a DDS::ReturnCode_t.
00071    */
00072   DDS::ReturnCode_t cleanup();
00073 
00074   /**
00075    * Initialize the data members (takes the topic and its servant, the writer and publisher QoS, the listener with its status mask, and the participant servant).
00076    */
00077   virtual void init(
00078     DDS::Topic_ptr                        topic,
00079     TopicImpl*                            topic_servant,
00080     const DDS::DataWriterQos &            qos,
00081     ReplayerListener_rch                  a_listener,
00082     const DDS::StatusMask &               mask,
00083     OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
00084     const DDS::PublisherQos&              publisher_qos);
00085 
00086 
00087   // Implement Replayer
00088 
00089   virtual DDS::ReturnCode_t write (const RawDataSample& sample );
00090   virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
00091                                              const RawDataSample&  sample );
00092   virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t    subscription,
00093                                              const RawDataSampleList& samples );
00094   virtual DDS::ReturnCode_t set_qos (const ::DDS::PublisherQos & publisher_qos,
00095                                      const DDS::DataWriterQos &  datawriter_qos);
00096   virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &  publisher_qos,
00097                                      DDS::DataWriterQos & datawriter_qos);
00098   virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch & a_listener,
00099                                           DDS::StatusMask              mask);
00100   virtual ReplayerListener_rch get_listener ();
00101 
00102   // Implement TransportClient
00103   virtual bool check_transport_qos(const TransportInst& inst);
00104   virtual const RepoId& get_repo_id() const;
00105   DDS::DomainId_t domain_id() const { return this->domain_id_; }
00106   virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00107 
00108   // Implement TransportSendListener
00109   virtual void data_delivered(const DataSampleElement* sample);
00110   virtual void data_dropped(const DataSampleElement* sample,
00111                             bool                         dropped_by_transport);
00112 
00113   virtual void control_delivered(ACE_Message_Block* sample);
00114   virtual void control_dropped(ACE_Message_Block* sample,
00115                                bool               dropped_by_transport);
00116 
00117   virtual void notify_publication_disconnected(const ReaderIdSeq& subids);
00118   virtual void notify_publication_reconnected(const ReaderIdSeq& subids);
00119   virtual void notify_publication_lost(const ReaderIdSeq& subids);
00120   virtual void notify_connection_deleted(const RepoId&);
00121 
00122   /// Statistics counters (public data members; presumably updated by data_dropped()/data_delivered() -- confirm in the .cpp).
00123   int data_dropped_count_;
00124   int data_delivered_count_;
00125 
00126 
00127   virtual void retrieve_inline_qos_data(InlineQosData& qos_data) const;
00128 
00129   // Implement DataWriterCallbacks
00130   virtual void add_association(const RepoId&            yourId,
00131                                const ReaderAssociation& reader,
00132                                bool                     active);
00133 
00134   virtual void association_complete(const RepoId& remote_id);
00135 
00136   virtual void remove_associations(const ReaderIdSeq& readers,
00137                                    CORBA::Boolean     callback);
00138 
00139   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00140 
00141   virtual void update_subscription_params(const RepoId&         readerId,
00142                                           const DDS::StringSeq& exprParams);
00143 
00144   virtual void inconsistent_topic();
00145 
00146   void remove_all_associations();
00147 
00148   virtual void register_for_reader(const RepoId& participant,
00149                                    const RepoId& writerid,
00150                                    const RepoId& readerid,
00151                                    const TransportLocatorSeq& locators,
00152                                    DiscoveryListener* listener);
00153 
00154   virtual void unregister_for_reader(const RepoId& participant,
00155                                      const RepoId& writerid,
00156                                      const RepoId& readerid);
00157 
00158   DDS::ReturnCode_t enable();
00159 
00160   DomainParticipantImpl*          participant() {
00161     return participant_servant_;
00162   }
00163 
00164   virtual DDS::InstanceHandle_t get_instance_handle();
00165 
00166 private:
00167 
00168   void _add_ref() { EntityImpl::_add_ref(); }
00169   void _remove_ref() { EntityImpl::_remove_ref(); }
00170 
00171   void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00172 
00173   DDS::ReturnCode_t write (const RawDataSample* sample_array, int array_size, DDS::InstanceHandle_t* reader);
00174 
00175   DDS::ReturnCode_t
00176   create_sample_data_message(DataSample*         data,
00177                              DataSampleHeader&   header_data,
00178                              ACE_Message_Block*& message,
00179                              const DDS::Time_t&  source_timestamp,
00180                              bool                content_filter);
00181   bool need_sequence_repair() const;
00182 
00183   /// Look up the instance handles (hdls) for the given subscription repo ids (ids).
00184   bool lookup_instance_handles(const ReaderIdSeq&      ids,
00185                                DDS::InstanceHandleSeq& hdls);
00186   /// The number of chunks for the cached allocator.
00187   size_t n_chunks_;
00188 
00189   /// The multiplier for allocators affected by associations.
00190   size_t association_chunk_multiplier_;
00191 
00192   /// The type name of the associated topic.
00193   CORBA::String_var type_name_;
00194 
00195   /// The QoS policy list of this datawriter.
00196   DDS::DataWriterQos qos_;
00197 
00198   /// The participant servant which creates the publisher that
00199   /// creates this datawriter.
00200   DomainParticipantImpl*          participant_servant_;
00201 
00202   struct ReaderInfo {
00203     SequenceNumber expected_sequence_;
00204     bool durable_;
00205     ReaderInfo(const char* filter, const DDS::StringSeq& params,
00206                DomainParticipantImpl* participant, bool durable);
00207     ~ReaderInfo();
00208   };
00209 
00210   typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00211   RepoIdToReaderInfoMap reader_info_;
00212 
00213   void association_complete_i(const RepoId& remote_id);
00214 
00215   friend class ::DDS_TEST; // allows tests to get at privates
00216 
00217   /// The name of the associated topic.
00218   CORBA::String_var topic_name_;
00219   /// The associated topic repository id.
00220   RepoId topic_id_;
00221   /// The object reference of the associated topic.
00222   DDS::Topic_var topic_objref_;
00223   /// The topic servant.
00224   TopicImpl*                      topic_servant_;
00225 
00226   /// The StatusKind bit mask indicates which status condition change
00227   /// can be notified by the listener of this entity.
00228   DDS::StatusMask listener_mask_;
00229   /// Used to notify the entity for relevant events.
00230   ReplayerListener_rch listener_;
00231   /// The domain id.
00232   DDS::DomainId_t domain_id_;
00233   /// The publisher servant which creates this datawriter.
00234   PublisherImpl*                  publisher_servant_;
00235   DDS::PublisherQos publisher_qos_;
00236 
00237   /// The repository id of this datawriter/publication.
00238   PublicationId publication_id_;
00239   /// The sequence number unique in DataWriter scope.
00240   SequenceNumber sequence_number_;
00241 
00242   // (disabled) The sample data container.
00243   // WriteDataContainer*             data_container_;
00244   /// The lock to protect the active subscriptions
00245   /// and status changes.
00246   ACE_Recursive_Thread_Mutex lock_;
00247 
00248   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00249 
00250   RepoIdToHandleMap id_to_handle_map_;
00251 
00252   RepoIdSet readers_;
00253 
00254   /// Status conditions.
00255   // DDS::LivelinessLostStatus liveliness_lost_status_;
00256   // DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_;
00257   DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_;
00258   DDS::PublicationMatchedStatus publication_match_status_;
00259 
00260   // (disabled) True if the writer failed to actively signal its liveliness within
00261   // its offered liveliness period.
00262   // bool liveliness_lost_;
00263 
00264   /**
00265    * @todo The publication_lost_status_ and
00266    *       publication_reconnecting_status_ are left here for
00267    *       future use when we add get_publication_lost_status()
00268    *       and get_publication_reconnecting_status() methods.
00269    */
00270   // Statistics of the lost publications due to lost connection.
00271   // PublicationLostStatus               publication_lost_status_;
00272   // Statistics of the publications that associate with a
00273   // reconnecting datalink.
00274   // PublicationReconnectingStatus       publication_reconnecting_status_;
00275 
00276   /// The message block allocator.
00277   std::auto_ptr<MessageBlockAllocator>     mb_allocator_;
00278   /// The data block allocator.
00279   std::auto_ptr<DataBlockAllocator>        db_allocator_;
00280   /// The header data allocator.
00281   std::auto_ptr<DataSampleHeaderAllocator> header_allocator_;
00282 
00283   /// The cached allocator to allocate DataSampleElement
00284   /// objects.
00285   std::auto_ptr<DataSampleElementAllocator> sample_list_element_allocator_;
00286 
00287   /// The allocator for TransportSendElement.
00288   /// The TransportSendElement allocator is put here because it
00289   /// needs the number of chunks information that WriteDataContainer
00290   /// has.
00291   std::auto_ptr<TransportSendElementAllocator>  transport_send_element_allocator_;
00292 
00293   std::auto_ptr<TransportCustomizedElementAllocator> transport_customized_element_allocator_;
00294 
00295   // (disabled) The orb's reactor to be used to register the liveliness
00296   // timer.
00297   // ACE_Reactor_Timer_Interface* reactor_;
00298   // (disabled) The time interval for sending liveliness message.
00299   // ACE_Time_Value             liveliness_check_interval_;
00300   // (disabled) Timestamp of last write/dispose/assert_liveliness.
00301   // ACE_Time_Value             last_liveliness_activity_time_;
00302   // (disabled) Total number of offered deadlines missed during last offered
00303   // deadline status check.
00304   // CORBA::Long last_deadline_missed_total_count_;
00305   // (disabled) Watchdog responsible for reporting missed offered
00306   // deadlines.
00307   // std::auto_ptr<OfferedDeadlineWatchdog> watchdog_;
00308   // (disabled) The flag indicates whether the liveliness timer is scheduled and
00309   // needs to be cancelled.
00310   // bool                       cancel_timer_;
00311 
00312   /// Flag indicates that this datawriter is a builtin topic
00313   /// datawriter.
00314   bool is_bit_;
00315 
00316   // (disabled) Flag indicates that the init() is called.
00317   // bool                       initialized_;
00318 
00319   typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan)
00320   RepoIdToSequenceMap;
00321 
00322   RepoIdToSequenceMap idToSequence_;
00323 
00324   RepoIdSet pending_readers_, assoc_complete_readers_;
00325 
00326   ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_;
00327   int pending_write_count_;
00328 };
00329 
00330 } // namespace DCPS
00331 } // namespace OpenDDS
00332 
00333 #endif /* end of include guard: OPENDDS_DCPS_REPLAYERIMPL_H */

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7