Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_RECORDERIMPL_H 9 : #define OPENDDS_DCPS_RECORDERIMPL_H 10 : 11 : #include "RcObject.h" 12 : #include "WriterInfo.h" 13 : #include "Definitions.h" 14 : #include "DataReaderCallbacks.h" 15 : #include "Recorder.h" 16 : #include "EntityImpl.h" 17 : #include "TopicImpl.h" 18 : #include "OwnershipManager.h" 19 : 20 : #include "transport/framework/TransportClient.h" 21 : #include "transport/framework/TransportDefs.h" 22 : #include "transport/framework/TransportReceiveListener.h" 23 : #include "transport/framework/ReceivedDataSample.h" 24 : 25 : #include <dds/DdsDcpsTopicC.h> 26 : #include <dds/DdsDcpsSubscriptionExtC.h> 27 : #include <dds/DdsDcpsDomainC.h> 28 : #include <dds/DdsDcpsTopicC.h> 29 : 30 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 31 : 32 : namespace OpenDDS { 33 : namespace DCPS { 34 : 35 : /** 36 : * @class RecorderImpl 37 : * 38 : * @brief Implementation of Recorder functionality 39 : * 40 : * This class is the implementation of the Recorder. 41 : * Inheritance is used to limit the applications access to 42 : * underlying system methods. 43 : */ 44 : 45 : class OpenDDS_Dcps_Export RecorderImpl 46 : : public TransportClient 47 : , public TransportReceiveListener 48 : , public DataReaderCallbacks 49 : , public Recorder 50 : , public EntityImpl 51 : , private WriterInfoListener 52 : { 53 : public: 54 : RecorderImpl(); 55 : 56 : virtual ~RecorderImpl(); 57 : 58 : /** 59 : * cleanup the DataWriter. 60 : */ 61 : DDS::ReturnCode_t cleanup(); 62 : 63 : void init( 64 : TopicDescriptionImpl* a_topic_desc, 65 : const DDS::DataReaderQos & qos, 66 : RecorderListener_rch a_listener, 67 : const DDS::StatusMask & mask, 68 : DomainParticipantImpl* participant, 69 : DDS::SubscriberQos subqos); 70 : 71 : DDS::ReturnCode_t enable(); 72 : 73 : // Implement TransportClient 74 : virtual bool check_transport_qos(const TransportInst& inst); 75 : virtual GUID_t get_guid() const; 76 0 : DDS::DomainId_t domain_id() const { return this->domain_id_; } 77 : virtual CORBA::Long get_priority_value(const AssociationData& data) const; 78 : 79 : //Implement TransportReceiveListener 80 : virtual void data_received(const ReceivedDataSample& sample); 81 : virtual void notify_subscription_disconnected(const WriterIdSeq& pubids); 82 : virtual void notify_subscription_reconnected(const WriterIdSeq& pubids); 83 : virtual void notify_subscription_lost(const WriterIdSeq& pubids); 84 : 85 : // Implement DataReaderCallbacks 86 : 87 : virtual void add_association(const GUID_t& yourId, 88 : const WriterAssociation& writer, 89 : bool active); 90 : 91 : virtual void remove_associations(const WriterIdSeq& writers, 92 : CORBA::Boolean callback); 93 : 94 : virtual void update_incompatible_qos(const IncompatibleQosStatus& status); 95 : 96 : virtual void signal_liveliness(const GUID_t& remote_participant); 97 : 98 : void remove_all_associations(); 99 : 100 : #ifndef OPENDDS_SAFETY_PROFILE 101 : void add_to_dynamic_type_map(const GUID_t& pub_id, const XTypes::TypeIdentifier& ti); 102 : #endif 103 : 104 : #if !defined (DDS_HAS_MINIMUM_BIT) 105 : // implement Recoder 106 : virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::GUID_t& id, 107 : DDS::BuiltinTopicKey_t& key); 108 : #endif 109 : /** 110 : * Set the Quality of Service settings for the Recorder. 111 : * 112 : */ 113 : DDS::ReturnCode_t set_qos (const DDS::SubscriberQos & subscriber_qos, 114 : const DDS::DataReaderQos & datareader_qos); 115 : 116 : /** 117 : * Get the Quality of Service settings for the Recorder. 118 : * 119 : */ 120 : DDS::ReturnCode_t get_qos (DDS::SubscriberQos & subscriber_qos, 121 : DDS::DataReaderQos & datareader_qos); 122 : 123 : 124 : DDS::ReturnCode_t set_listener(const RecorderListener_rch& a_listener, 125 : DDS::StatusMask mask); 126 : 127 : RecorderListener_rch get_listener(); 128 : 129 0 : DomainParticipantImpl* participant() { 130 0 : return participant_servant_; 131 : } 132 : 133 : virtual DDS::InstanceHandle_t get_instance_handle(); 134 : 135 : virtual void register_for_writer(const GUID_t& /*participant*/, 136 : const GUID_t& /*readerid*/, 137 : const GUID_t& /*writerid*/, 138 : const TransportLocatorSeq& /*locators*/, 139 : DiscoveryListener* /*listener*/); 140 : 141 : virtual void unregister_for_writer(const GUID_t& /*participant*/, 142 : const GUID_t& /*readerid*/, 143 : const GUID_t& /*writerid*/); 144 : 145 0 : virtual WeakRcHandle<ICE::Endpoint> get_ice_endpoint() { return WeakRcHandle<ICE::Endpoint>(); } 146 : 147 : protected: 148 : virtual void remove_associations_i(const WriterIdSeq& writers, bool callback); 149 : 150 : private: 151 : 152 : void notify_subscription_lost(const DDS::InstanceHandleSeq& handles); 153 : 154 : /// Lookup the instance handles by the publication repo ids 155 : void lookup_instance_handles(const WriterIdSeq& ids, 156 : DDS::InstanceHandleSeq& hdls); 157 : 158 : #ifndef OPENDDS_SAFETY_PROFILE 159 : DDS::DynamicData_ptr get_dynamic_data(const RawDataSample& sample); 160 : #endif 161 0 : void check_encap(bool b) { check_encap_ = b; } 162 0 : bool check_encap() const { return check_encap_; } 163 : 164 : DDS::DataReaderQos qos_; 165 : DDS::DataReaderQos passed_qos_; 166 : 167 : /// lock protecting sample container as well as statuses. 168 : ACE_Recursive_Thread_Mutex sample_lock_; 169 : 170 : DomainParticipantImpl* participant_servant_; 171 : TopicDescriptionPtr<TopicImpl> topic_servant_; 172 : 173 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 174 : bool is_exclusive_ownership_; 175 : 176 : OwnershipManager* owner_manager_; 177 : #endif 178 : 179 : DDS::SubscriberQos subqos_; 180 : 181 : friend class ::DDS_TEST; //allows tests to get at private data 182 : 183 : DDS::TopicDescription_var topic_desc_; 184 : DDS::StatusMask listener_mask_; 185 : RecorderListener_rch listener_; 186 : DDS::DomainId_t domain_id_; 187 : 188 : ACE_Recursive_Thread_Mutex publication_handle_lock_; 189 : 190 : typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap; 191 : RepoIdToHandleMap id_to_handle_map_; 192 : 193 : DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_; 194 : DDS::SubscriptionMatchedStatus subscription_match_status_; 195 : 196 : /// Flag indicates that this datareader is a builtin topic 197 : /// datareader. 198 : bool is_bit_; 199 : 200 : /// publications writing to this reader. 201 : typedef OPENDDS_MAP_CMP(GUID_t, RcHandle<WriterInfo>, 202 : GUID_tKeyLessThan) WriterMapType; 203 : WriterMapType writers_; 204 : 205 : /// RW lock for reading/writing publications. 206 : ACE_RW_Thread_Mutex writers_lock_; 207 : 208 : #ifndef OPENDDS_SAFETY_PROFILE 209 : typedef OPENDDS_MAP(GUID_t, DDS::DynamicType_var) DynamicTypeByPubId; 210 : DynamicTypeByPubId dt_map_; 211 : #endif 212 : bool check_encap_; 213 : 214 : TransportMessageBlockAllocator mb_alloc_; 215 : }; 216 : 217 : 218 : } // namespace DCPS 219 : } // namespace 220 : 221 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 222 : 223 : #endif /* end of include guard: OPENDDS_DCPS_RECORDERIMPL_H */