RecorderImpl.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_RECORDERIMPL_H
00009 #define OPENDDS_DCPS_RECORDERIMPL_H
00010
00011 #include "dds/DCPS/RcObject.h"
00012 #include "dds/DCPS/WriterInfo.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "Recorder.h"
00023 #include "RemoveAssociationSweeper.h"
00024
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 namespace OpenDDS {
00028 namespace DCPS {
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 class OpenDDS_Dcps_Export RecorderImpl
00041 : public TransportClient
00042 , public TransportReceiveListener
00043 , public DataReaderCallbacks
00044 , public Recorder
00045 , public EntityImpl
00046 , private WriterInfoListener
00047 {
00048 public:
00049 RecorderImpl();
00050
00051 virtual ~RecorderImpl();
00052
00053
00054
00055
00056 DDS::ReturnCode_t cleanup();
00057
00058 void init(
00059 TopicDescriptionImpl* a_topic_desc,
00060 const DDS::DataReaderQos & qos,
00061 RecorderListener_rch a_listener,
00062 const DDS::StatusMask & mask,
00063 DomainParticipantImpl* participant,
00064 DDS::SubscriberQos subqos);
00065
00066 DDS::ReturnCode_t enable();
00067
00068
00069 virtual bool check_transport_qos(const TransportInst& inst);
00070 virtual const RepoId& get_repo_id() const;
00071 DDS::DomainId_t domain_id() const { return this->domain_id_; }
00072 virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00073
00074
00075 virtual void data_received(const ReceivedDataSample& sample);
00076 virtual void notify_subscription_disconnected(const WriterIdSeq& pubids);
00077 virtual void notify_subscription_reconnected(const WriterIdSeq& pubids);
00078 virtual void notify_subscription_lost(const WriterIdSeq& pubids);
00079
00080
00081
00082 virtual void add_association(const RepoId& yourId,
00083 const WriterAssociation& writer,
00084 bool active);
00085
00086 virtual void association_complete(const RepoId& remote_id);
00087 virtual void remove_associations(const WriterIdSeq& writers,
00088 CORBA::Boolean callback);
00089
00090 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00091 virtual void inconsistent_topic();
00092
00093 virtual void signal_liveliness(const RepoId& remote_participant);
00094
00095 void remove_all_associations();
00096
00097 #if !defined (DDS_HAS_MINIMUM_BIT)
00098
00099 virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::RepoId& id,
00100 DDS::BuiltinTopicKey_t& key);
00101 #endif
00102
00103
00104
00105
00106 DDS::ReturnCode_t set_qos (const DDS::SubscriberQos & subscriber_qos,
00107 const DDS::DataReaderQos & datareader_qos);
00108
00109
00110
00111
00112
00113 DDS::ReturnCode_t get_qos (DDS::SubscriberQos & subscriber_qos,
00114 DDS::DataReaderQos & datareader_qos);
00115
00116
00117 DDS::ReturnCode_t set_listener(const RecorderListener_rch& a_listener,
00118 DDS::StatusMask mask);
00119
00120 RecorderListener_rch get_listener();
00121
00122 DomainParticipantImpl* participant() {
00123 return participant_servant_;
00124 }
00125
00126 virtual DDS::InstanceHandle_t get_instance_handle();
00127
00128 virtual void register_for_writer(const RepoId& ,
00129 const RepoId& ,
00130 const RepoId& ,
00131 const TransportLocatorSeq& ,
00132 DiscoveryListener* );
00133
00134 virtual void unregister_for_writer(const RepoId& ,
00135 const RepoId& ,
00136 const RepoId& );
00137
00138 protected:
00139 virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00140 void remove_publication(const PublicationId& pub_id);
00141
00142 private:
00143
00144 void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
00145
00146
00147 void lookup_instance_handles(const WriterIdSeq& ids,
00148 DDS::InstanceHandleSeq& hdls);
00149
00150 DDS::DataReaderQos qos_;
00151
00152
00153 ACE_Recursive_Thread_Mutex sample_lock_;
00154
00155 DomainParticipantImpl* participant_servant_;
00156 TopicDescriptionPtr<TopicImpl> topic_servant_;
00157
00158 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00159 bool is_exclusive_ownership_;
00160
00161 OwnershipManager* owner_manager_;
00162 #endif
00163
00164 DDS::SubscriberQos subqos_;
00165
00166 friend class RemoveAssociationSweeper<RecorderImpl>;
00167
00168 friend class ::DDS_TEST;
00169
00170 DDS::TopicDescription_var topic_desc_;
00171 DDS::StatusMask listener_mask_;
00172 RecorderListener_rch listener_;
00173 DDS::DomainId_t domain_id_;
00174 RcHandle<RemoveAssociationSweeper<RecorderImpl> > remove_association_sweeper_;
00175
00176 ACE_Recursive_Thread_Mutex publication_handle_lock_;
00177
00178 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00179 RepoIdToHandleMap id_to_handle_map_;
00180
00181 DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_;
00182 DDS::SubscriptionMatchedStatus subscription_match_status_;
00183
00184
00185
00186 bool is_bit_;
00187
00188
00189 typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00190 GUID_tKeyLessThan) WriterMapType;
00191 WriterMapType writers_;
00192
00193
00194 ACE_RW_Thread_Mutex writers_lock_;
00195 };
00196
00197
00198 }
00199 }
00200
00201 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00202
00203 #endif