RecorderImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_RECORDERIMPL_H
00009 #define OPENDDS_DCPS_RECORDERIMPL_H
00010 
00011 #include "dds/DCPS/RcObject.h"
00012 #include "dds/DCPS/WriterInfo.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "Recorder.h"
00023 #include "RemoveAssociationSweeper.h"
00024 
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 namespace OpenDDS {
00028 namespace DCPS {
00029 
00030 /**
00031  * @class RecorderImpl
00032  *
00033  * @brief Implementation of Recorder functionality
00034  *
00035  * This class is the implementation of the Recorder.
00036  * Inheritance is used to limit the application's access to
00037  * underlying system methods.
00038  */
00039 
00040 class OpenDDS_Dcps_Export RecorderImpl
00041   : public TransportClient
00042   , public TransportReceiveListener
00043   , public DataReaderCallbacks
00044   , public Recorder
00045   , public EntityImpl
00046   , private WriterInfoListener
00047 {
00048 public:
00049   RecorderImpl();
00050 
00051   virtual ~RecorderImpl();
00052 
00053   /**
00054    * Clean up this object. NOTE(review): the original comment said "the DataWriter"; presumably the Recorder is meant - confirm.
00055    */
00056   DDS::ReturnCode_t cleanup();
00057 
00058   void init(
00059     TopicDescriptionImpl*      a_topic_desc,
00060     const DDS::DataReaderQos & qos,
00061     RecorderListener_rch       a_listener,
00062     const DDS::StatusMask &    mask,
00063     DomainParticipantImpl*     participant,
00064     DDS::SubscriberQos         subqos); // note: subqos is taken by value, unlike qos
00065 
00066   DDS::ReturnCode_t enable();
00067 
00068   // Implement TransportClient
00069   virtual bool check_transport_qos(const TransportInst& inst);
00070   virtual const RepoId& get_repo_id() const;
00071   DDS::DomainId_t domain_id() const { return this->domain_id_; } // returns the stored domain_id_ member
00072   virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00073 
00074   // Implement TransportReceiveListener
00075   virtual void data_received(const ReceivedDataSample& sample);
00076   virtual void notify_subscription_disconnected(const WriterIdSeq& pubids);
00077   virtual void notify_subscription_reconnected(const WriterIdSeq& pubids);
00078   virtual void notify_subscription_lost(const WriterIdSeq& pubids);
00079 
00080   // Implement DataReaderCallbacks
00081 
00082   virtual void add_association(const RepoId&            yourId,
00083                                const WriterAssociation& writer,
00084                                bool                     active);
00085 
00086   virtual void association_complete(const RepoId& remote_id);
00087   virtual void remove_associations(const WriterIdSeq& writers,
00088                                    CORBA::Boolean     callback);
00089 
00090   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00091   virtual void inconsistent_topic();
00092 
00093   virtual void signal_liveliness(const RepoId& remote_participant);
00094 
00095   void remove_all_associations();
00096 
00097 #if !defined (DDS_HAS_MINIMUM_BIT)
00098   // Implement Recorder (only declared when DDS_HAS_MINIMUM_BIT is not defined)
00099   virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::RepoId&     id,
00100                                               DDS::BuiltinTopicKey_t& key);
00101 #endif
00102   /**
00103    * Set the Quality of Service settings for the Recorder.
00104    * Takes both a DDS::SubscriberQos and a DDS::DataReaderQos, by const reference.
00105    */
00106   DDS::ReturnCode_t set_qos (const DDS::SubscriberQos & subscriber_qos,
00107                              const DDS::DataReaderQos & datareader_qos);
00108 
00109   /**
00110    * Get the Quality of Service settings for the Recorder.
00111    * Both structures are passed by non-const reference, presumably as outputs.
00112    */
00113   DDS::ReturnCode_t get_qos (DDS::SubscriberQos & subscriber_qos,
00114                              DDS::DataReaderQos & datareader_qos);
00115 
00116 
00117   DDS::ReturnCode_t set_listener(const RecorderListener_rch& a_listener,
00118                                  DDS::StatusMask             mask);
00119 
00120   RecorderListener_rch get_listener();
00121 
00122   DomainParticipantImpl*          participant() { // accessor for the raw participant_servant_ pointer
00123     return participant_servant_;
00124   }
00125 
00126   virtual DDS::InstanceHandle_t get_instance_handle();
00127 
00128   virtual void register_for_writer(const RepoId& /*participant*/,
00129                                    const RepoId& /*readerid*/,
00130                                    const RepoId& /*writerid*/,
00131                                    const TransportLocatorSeq& /*locators*/,
00132                                    DiscoveryListener* /*listener*/);
00133 
00134   virtual void unregister_for_writer(const RepoId& /*participant*/,
00135                                      const RepoId& /*readerid*/,
00136                                      const RepoId& /*writerid*/);
00137 
00138 protected:
00139   virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00140   void remove_publication(const PublicationId& pub_id);
00141 
00142 private:
00143 
00144   void notify_subscription_lost(const DDS::InstanceHandleSeq& handles); // overload taking instance handles instead of writer ids
00145 
00146   /// Look up the instance handles by the publication repo ids
00147   void lookup_instance_handles(const WriterIdSeq&      ids,
00148                                DDS::InstanceHandleSeq& hdls);
00149 
00150   DDS::DataReaderQos qos_;
00151 
00152   /// Lock protecting the sample container as well as statuses.
00153   ACE_Recursive_Thread_Mutex sample_lock_;
00154 
00155   DomainParticipantImpl*       participant_servant_; // raw pointer; returned as-is by participant()
00156   TopicDescriptionPtr<TopicImpl>              topic_servant_;
00157 
00158 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00159   bool is_exclusive_ownership_;
00160 
00161   OwnershipManager* owner_manager_;
00162 #endif
00163 
00164   DDS::SubscriberQos subqos_;
00165 
00166   friend class RemoveAssociationSweeper<RecorderImpl>; // gives the sweeper access to non-public members
00167 
00168   friend class ::DDS_TEST; // allows tests to get at private data
00169 
00170   DDS::TopicDescription_var topic_desc_;
00171   DDS::StatusMask listener_mask_;
00172   RecorderListener_rch listener_;
00173   DDS::DomainId_t domain_id_; // value returned by domain_id()
00174   RcHandle<RemoveAssociationSweeper<RecorderImpl> > remove_association_sweeper_;
00175 
00176   ACE_Recursive_Thread_Mutex publication_handle_lock_; // NOTE(review): presumably guards id_to_handle_map_ - confirm in the .cpp
00177 
00178   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap; // RepoId -> instance handle, compared with GUID_tKeyLessThan
00179   RepoIdToHandleMap id_to_handle_map_;
00180 
00181   DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_;
00182   DDS::SubscriptionMatchedStatus subscription_match_status_;
00183 
00184   /// Flag indicating that this datareader is a builtin topic
00185   /// datareader.
00186   bool is_bit_;
00187 
00188   /// Publications writing to this reader: PublicationId -> RcHandle<WriterInfo>.
00189   typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00190                    GUID_tKeyLessThan) WriterMapType;
00191   WriterMapType writers_;
00192 
00193   /// RW lock for reading/writing publications.
00194   ACE_RW_Thread_Mutex writers_lock_;
00195 };
00196 
00197 
00198 } // namespace DCPS
00199 } // namespace
00200 
00201 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00202 
00203 #endif /* end of include guard: OPENDDS_DCPS_RECORDERIMPL_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1