00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_RECORDERIMPL_H
00009 #define OPENDDS_DCPS_RECORDERIMPL_H
00010
00011 #include "dds/DCPS/RcObject_T.h"
00012 #include "dds/DCPS/WriterInfo.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "Recorder.h"
00023 #include "RemoveAssociationSweeper.h"
00024
00025
00026 namespace OpenDDS {
00027 namespace DCPS {
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 class OpenDDS_Dcps_Export RecorderImpl
00040 : public TransportClient
00041 , public TransportReceiveListener
00042 , public DataReaderCallbacks
00043 , public Recorder
00044 , public EntityImpl
00045 , private WriterInfoListener
00046 {
00047 public:
00048
00049
00050 RecorderImpl();
00051
00052
00053 virtual ~RecorderImpl();
00054
00055
00056
00057
00058 DDS::ReturnCode_t cleanup();
00059
00060 void init(
00061 TopicDescriptionImpl* a_topic_desc,
00062 const DDS::DataReaderQos & qos,
00063 RecorderListener_rch a_listener,
00064 const DDS::StatusMask & mask,
00065 DomainParticipantImpl* participant,
00066 DDS::SubscriberQos subqos);
00067
00068 DDS::ReturnCode_t enable();
00069
00070
00071 virtual bool check_transport_qos(const TransportInst& inst);
00072 virtual const RepoId& get_repo_id() const;
00073 DDS::DomainId_t domain_id() const { return this->domain_id_; }
00074 virtual CORBA::Long get_priority_value(const AssociationData& data) const;
00075
00076
00077 virtual void data_received(const ReceivedDataSample& sample);
00078 virtual void notify_subscription_disconnected(const WriterIdSeq& pubids);
00079 virtual void notify_subscription_reconnected(const WriterIdSeq& pubids);
00080 virtual void notify_subscription_lost(const WriterIdSeq& pubids);
00081 virtual void notify_connection_deleted(const RepoId&);
00082
00083
00084
00085 virtual void add_association(const RepoId& yourId,
00086 const WriterAssociation& writer,
00087 bool active);
00088
00089 virtual void association_complete(const RepoId& remote_id);
00090 virtual void remove_associations(const WriterIdSeq& writers,
00091 CORBA::Boolean callback);
00092
00093 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00094 virtual void inconsistent_topic();
00095
00096 virtual void signal_liveliness(const RepoId& remote_participant);
00097
00098 void remove_all_associations();
00099
00100 #if !defined (DDS_HAS_MINIMUM_BIT)
00101
00102 virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::RepoId& id,
00103 DDS::BuiltinTopicKey_t& key);
00104 #endif
00105
00106
00107
00108
00109 DDS::ReturnCode_t set_qos (const ::DDS::SubscriberQos & subscriber_qos,
00110 const DDS::DataReaderQos & datareader_qos);
00111
00112
00113
00114
00115
00116 DDS::ReturnCode_t get_qos (DDS::SubscriberQos & subscriber_qos,
00117 DDS::DataReaderQos & datareader_qos);
00118
00119
00120 DDS::ReturnCode_t set_listener(const RecorderListener_rch& a_listener,
00121 DDS::StatusMask mask);
00122
00123 RecorderListener_rch get_listener();
00124
00125 DomainParticipantImpl* participant() {
00126 return participant_servant_;
00127 }
00128
00129 virtual DDS::InstanceHandle_t get_instance_handle();
00130
00131 virtual void register_for_writer(const RepoId& ,
00132 const RepoId& ,
00133 const RepoId& ,
00134 const TransportLocatorSeq& ,
00135 DiscoveryListener* );
00136
00137 virtual void unregister_for_writer(const RepoId& ,
00138 const RepoId& ,
00139 const RepoId& );
00140
00141 protected:
00142 virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00143 void remove_or_reschedule(const PublicationId& pub_id);
00144
00145 private:
00146
00147 void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
00148
00149
00150 bool lookup_instance_handles(const WriterIdSeq& ids,
00151 DDS::InstanceHandleSeq& hdls);
00152
00153 void listener_add_ref() { EntityImpl::_add_ref(); }
00154 void listener_remove_ref() { EntityImpl::_remove_ref(); }
00155 void _add_ref() { EntityImpl::_add_ref(); }
00156 void _remove_ref() { EntityImpl::_remove_ref(); }
00157
00158 DDS::DataReaderQos qos_;
00159
00160
00161 ACE_Recursive_Thread_Mutex sample_lock_;
00162
00163 DomainParticipantImpl* participant_servant_;
00164 TopicImpl* topic_servant_;
00165
00166 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00167 bool is_exclusive_ownership_;
00168
00169 OwnershipManager* owner_manager_;
00170 #endif
00171
00172 DDS::SubscriberQos subqos_;
00173
00174 friend class RemoveAssociationSweeper<RecorderImpl>;
00175
00176 friend class ::DDS_TEST;
00177
00178 DDS::TopicDescription_var topic_desc_;
00179 DDS::StatusMask listener_mask_;
00180 RecorderListener_rch listener_;
00181 DDS::DomainId_t domain_id_;
00182 RemoveAssociationSweeper<RecorderImpl>* remove_association_sweeper_;
00183
00184 ACE_Recursive_Thread_Mutex publication_handle_lock_;
00185
00186 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00187 RepoIdToHandleMap id_to_handle_map_;
00188
00189 DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_;
00190 DDS::SubscriptionMatchedStatus subscription_match_status_;
00191
00192
00193
00194 bool is_bit_;
00195
00196
00197 typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00198 GUID_tKeyLessThan) WriterMapType;
00199 WriterMapType writers_;
00200
00201
00202 ACE_RW_Thread_Mutex writers_lock_;
00203 };
00204
00205
00206 }
00207 }
00208
00209 #endif