RecorderImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_RECORDERIMPL_H
00009 #define OPENDDS_DCPS_RECORDERIMPL_H
00010 
00011 #include "dds/DCPS/RcObject_T.h"
00012 #include "dds/DCPS/WriterInfo.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "Recorder.h"
00023 #include "RemoveAssociationSweeper.h"
00024 
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 /**
00030  * @class RecorderImpl
00031  *
00032  * @brief Implementation of Recorder functionality
00033  *
00034  * This class is the implmentation of the Recorder.
00035  * Inheritance is used to limit the applications access to
00036  * underlying system methods.
00037  */
00038 
00039 class OpenDDS_Dcps_Export RecorderImpl
00040   : public TransportClient
00041   , public TransportReceiveListener
00042   , public DataReaderCallbacks
00043   , public Recorder
00044   , public EntityImpl
00045   , private WriterInfoListener
00046 {
00047 public:
00048 
00049   //Constructor
00050   RecorderImpl();
00051 
00052   //Destructor
00053   virtual ~RecorderImpl();
00054 
00055   /**
00056    * cleanup the DataWriter.
00057    */
00058   DDS::ReturnCode_t cleanup();
00059 
00060   void init(
00061     TopicDescriptionImpl*      a_topic_desc,
00062     const DDS::DataReaderQos & qos,
00063     RecorderListener_rch       a_listener,
00064     const DDS::StatusMask &    mask,
00065     DomainParticipantImpl*     participant,
00066     DDS::SubscriberQos         subqos);
00067 
00068   DDS::ReturnCode_t enable();
00069 
00070   // Implement TransportClient
00071   virtual bool check_transport_qos(const TransportInst& inst);
00072   virtual const RepoId& get_repo_id() const;
00073   DDS::DomainId_t domain_id() const { return this->domain_id_; }
00074   virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00075 
00076   //Implement TransportReceiveListener
00077   virtual void data_received(const ReceivedDataSample& sample);
00078   virtual void notify_subscription_disconnected(const WriterIdSeq& pubids);
00079   virtual void notify_subscription_reconnected(const WriterIdSeq& pubids);
00080   virtual void notify_subscription_lost(const WriterIdSeq& pubids);
00081   virtual void notify_connection_deleted(const RepoId&);
00082 
00083   // Implement DataReaderCallbacks
00084 
00085   virtual void add_association(const RepoId&            yourId,
00086                                const WriterAssociation& writer,
00087                                bool                     active);
00088 
00089   virtual void association_complete(const RepoId& remote_id);
00090   virtual void remove_associations(const WriterIdSeq& writers,
00091                                    CORBA::Boolean     callback);
00092 
00093   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00094   virtual void inconsistent_topic();
00095 
00096   virtual void signal_liveliness(const RepoId& remote_participant);
00097 
00098   void remove_all_associations();
00099 
00100 #if !defined (DDS_HAS_MINIMUM_BIT)
00101   // implement Recoder
00102   virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::RepoId&     id,
00103                                               DDS::BuiltinTopicKey_t& key);
00104 #endif
00105   /**
00106    * Set the Quality of Service settings for the Recorder.
00107    *
00108    */
00109   DDS::ReturnCode_t set_qos (const ::DDS::SubscriberQos & subscriber_qos,
00110                              const DDS::DataReaderQos &   datareader_qos);
00111 
00112   /**
00113    * Get the Quality of Service settings for the Recorder.
00114    *
00115    */
00116   DDS::ReturnCode_t get_qos (DDS::SubscriberQos & subscriber_qos,
00117                              DDS::DataReaderQos & datareader_qos);
00118 
00119 
00120   DDS::ReturnCode_t set_listener(const RecorderListener_rch& a_listener,
00121                                  DDS::StatusMask             mask);
00122 
00123   RecorderListener_rch get_listener();
00124 
00125   DomainParticipantImpl*          participant() {
00126     return participant_servant_;
00127   }
00128 
00129   virtual DDS::InstanceHandle_t get_instance_handle();
00130 
00131   virtual void register_for_writer(const RepoId& /*participant*/,
00132                                    const RepoId& /*readerid*/,
00133                                    const RepoId& /*writerid*/,
00134                                    const TransportLocatorSeq& /*locators*/,
00135                                    DiscoveryListener* /*listener*/);
00136 
00137   virtual void unregister_for_writer(const RepoId& /*participant*/,
00138                                      const RepoId& /*readerid*/,
00139                                      const RepoId& /*writerid*/);
00140 
00141 protected:
00142   virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00143   void remove_or_reschedule(const PublicationId& pub_id);
00144 
00145 private:
00146 
00147   void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
00148 
00149   /// Lookup the instance handles by the publication repo ids
00150   bool lookup_instance_handles(const WriterIdSeq&      ids,
00151                                DDS::InstanceHandleSeq& hdls);
00152 
00153   void listener_add_ref() { EntityImpl::_add_ref(); }
00154   void listener_remove_ref() { EntityImpl::_remove_ref(); }
00155   void _add_ref() { EntityImpl::_add_ref(); }
00156   void _remove_ref() { EntityImpl::_remove_ref(); }
00157 
00158   DDS::DataReaderQos qos_;
00159 
00160   /// lock protecting sample container as well as statuses.
00161   ACE_Recursive_Thread_Mutex sample_lock_;
00162 
00163   DomainParticipantImpl*       participant_servant_;
00164   TopicImpl*                   topic_servant_;
00165 
00166 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00167   bool is_exclusive_ownership_;
00168 
00169   OwnershipManager* owner_manager_;
00170 #endif
00171 
00172   DDS::SubscriberQos subqos_;
00173 
00174   friend class RemoveAssociationSweeper<RecorderImpl>;
00175 
00176   friend class ::DDS_TEST; //allows tests to get at private data
00177 
00178   DDS::TopicDescription_var topic_desc_;
00179   DDS::StatusMask listener_mask_;
00180   RecorderListener_rch listener_;
00181   DDS::DomainId_t domain_id_;
00182   RemoveAssociationSweeper<RecorderImpl>* remove_association_sweeper_;
00183 
00184   ACE_Recursive_Thread_Mutex publication_handle_lock_;
00185 
00186   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00187   RepoIdToHandleMap id_to_handle_map_;
00188 
00189   DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_;
00190   DDS::SubscriptionMatchedStatus subscription_match_status_;
00191 
00192   /// Flag indicates that this datareader is a builtin topic
00193   /// datareader.
00194   bool is_bit_;
00195 
00196   /// publications writing to this reader.
00197   typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00198                    GUID_tKeyLessThan) WriterMapType;
00199   WriterMapType writers_;
00200 
00201   /// RW lock for reading/writing publications.
00202   ACE_RW_Thread_Mutex writers_lock_;
00203 };
00204 
00205 
00206 } // namespace DCPS
00207 } // namespace
00208 
00209 #endif /* end of include guard: OPENDDS_DCPS_RECORDERIMPL_H */

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7