00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DATAREADER_H
00009 #define OPENDDS_DCPS_DATAREADER_H
00010
00011 #include "dcps_export.h"
00012 #include "EntityImpl.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "DisjointSequence.h"
00023 #include "SubscriptionInstance.h"
00024 #include "InstanceState.h"
00025 #include "Cached_Allocator_With_Overflow_T.h"
00026 #include "ZeroCopyInfoSeq_T.h"
00027 #include "Stats_T.h"
00028 #include "OwnershipManager.h"
00029 #include "ContentFilteredTopicImpl.h"
00030 #include "GroupRakeData.h"
00031 #include "CoherentChangeControl.h"
00032 #include "AssociationData.h"
00033 #include "dds/DdsDcpsInfrastructureC.h"
00034 #include "RcHandle_T.h"
00035 #include "RcObject.h"
00036 #include "WriterInfo.h"
00037 #include "ReactorInterceptor.h"
00038 #include "Service_Participant.h"
00039 #include "PoolAllocator.h"
00040 #include "RemoveAssociationSweeper.h"
00041 #include "RcEventHandler.h"
00042 #include "TopicImpl.h"
00043 #include "DomainParticipantImpl.h"
00044
00045 #include "ace/String_Base.h"
00046 #include "ace/Reverse_Lock_T.h"
00047 #include "ace/Atomic_Op.h"
00048 #include "ace/Reactor.h"
00049
00050 #include "dds/DCPS/PoolAllocator.h"
00051 #include <memory>
00052
00053 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00054 #pragma once
00055 #endif
00056
00057 class DDS_TEST;
00058
00059 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00060
00061 namespace OpenDDS {
00062 namespace DCPS {
00063
// Forward declarations of types that this header refers to by pointer,
// reference, handle, or friend declaration (listed alphabetically).
class DataReaderImpl;
class DomainParticipantImpl;
class FilterEvaluator;
class Monitor;
class RequestedDeadlineWatchdog;
class SubscriberImpl;
class SubscriptionInstance;
class TopicDescriptionImpl;
class TopicImpl;
00073
00074 typedef Cached_Allocator_With_Overflow<OpenDDS::DCPS::ReceivedDataElementMemoryBlock, ACE_Null_Mutex>
00075 ReceivedDataAllocator;
00076
/// Selects how a received sample is to be demarshaled; passed to
/// DataReaderImpl::dds_demarshal().
enum MarshalingType {
  FULL_MARSHALING = 0,     ///< Name suggests the complete sample is marshaled.
  KEY_ONLY_MARSHALING = 1  ///< Name suggests only the key fields are marshaled.
};
00081
00082
00083 class OpenDDS_Dcps_Export WriterStats {
00084 public:
00085
00086 WriterStats(
00087 int amount = 0,
00088 DataCollector<double>::OnFull type = DataCollector<double>::KeepOldest);
00089
00090
00091 void add_stat(const ACE_Time_Value& delay);
00092
00093
00094 LatencyStatistics get_stats() const;
00095
00096
00097 void reset_stats();
00098
00099 #ifndef OPENDDS_SAFETY_PROFILE
00100
00101 std::ostream& raw_data(std::ostream& str) const;
00102 #endif
00103
00104 private:
00105
00106 Stats<double> stats_;
00107 };
00108
00109 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00110
00111 class OpenDDS_Dcps_Export AbstractSamples
00112 {
00113 public:
00114 virtual ~AbstractSamples(){}
00115 virtual void reserve(CORBA::ULong size)=0;
00116 virtual void push_back(const DDS::SampleInfo& info, const void* sample)=0;
00117 };
00118
00119 #endif
00120
00121
00122
00123 class EndHistoricSamplesMissedSweeper : public ReactorInterceptor {
00124 public:
00125 EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
00126 ACE_thread_t owner,
00127 DataReaderImpl* reader);
00128
00129 void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00130 void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00131
00132
00133 int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00134
00135 virtual bool reactor_is_shut_down() const
00136 {
00137 return TheServiceParticipant->is_shut_down();
00138 }
00139
00140 private:
00141 ~EndHistoricSamplesMissedSweeper();
00142
00143 WeakRcHandle<DataReaderImpl> reader_;
00144 OPENDDS_SET(RcHandle<OpenDDS::DCPS::WriterInfo>) info_set_;
00145
00146 class CommandBase : public Command {
00147 public:
00148 CommandBase(EndHistoricSamplesMissedSweeper* sweeper,
00149 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00150 : sweeper_ (sweeper)
00151 , info_(info)
00152 { }
00153
00154 protected:
00155 EndHistoricSamplesMissedSweeper* sweeper_;
00156 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00157 };
00158
00159 class ScheduleCommand : public CommandBase {
00160 public:
00161 ScheduleCommand(EndHistoricSamplesMissedSweeper* sweeper,
00162 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00163 : CommandBase(sweeper, info)
00164 { }
00165 virtual void execute();
00166 };
00167
00168 class CancelCommand : public CommandBase {
00169 public:
00170 CancelCommand(EndHistoricSamplesMissedSweeper* sweeper,
00171 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00172 : CommandBase(sweeper, info)
00173 { }
00174 virtual void execute();
00175 };
00176 };
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190 class OpenDDS_Dcps_Export DataReaderImpl
00191 : public virtual LocalObject<DataReaderEx>,
00192 public virtual DataReaderCallbacks,
00193 public virtual EntityImpl,
00194 public virtual TransportClient,
00195 public virtual TransportReceiveListener,
00196 private WriterInfoListener {
00197 public:
00198 friend class RequestedDeadlineWatchdog;
00199 friend class QueryConditionImpl;
00200 friend class SubscriberImpl;
00201
00202 typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType;
00203
00204
00205 typedef OPENDDS_MAP_CMP(PublicationId, WriterStats, GUID_tKeyLessThan) StatsMapType;
00206
00207 DataReaderImpl();
00208
00209 virtual ~DataReaderImpl();
00210
00211 virtual DDS::InstanceHandle_t get_instance_handle();
00212
00213 virtual void add_association(const RepoId& yourId,
00214 const WriterAssociation& writer,
00215 bool active);
00216
00217 virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00218
00219 virtual void association_complete(const RepoId& remote_id);
00220
00221 virtual void remove_associations(const WriterIdSeq& writers, bool callback);
00222
00223 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00224
00225 virtual void inconsistent_topic();
00226
00227 virtual void signal_liveliness(const RepoId& remote_participant);
00228
00229
00230
00231
00232
00233
00234
00235
00236 DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind);
00237
00238
00239
00240
00241 void writer_became_alive(WriterInfo& info,
00242 const ACE_Time_Value& when);
00243
00244
00245
00246
00247 void writer_became_dead(WriterInfo& info,
00248 const ACE_Time_Value& when);
00249
00250
00251
00252 void writer_removed(WriterInfo& info);
00253
00254 virtual void cleanup();
00255
00256 void init(
00257 TopicDescriptionImpl* a_topic_desc,
00258 const DDS::DataReaderQos & qos,
00259 DDS::DataReaderListener_ptr a_listener,
00260 const DDS::StatusMask & mask,
00261 DomainParticipantImpl* participant,
00262 SubscriberImpl* subscriber);
00263
00264 virtual DDS::ReadCondition_ptr create_readcondition(
00265 DDS::SampleStateMask sample_states,
00266 DDS::ViewStateMask view_states,
00267 DDS::InstanceStateMask instance_states);
00268
00269 #ifndef OPENDDS_NO_QUERY_CONDITION
00270 virtual DDS::QueryCondition_ptr create_querycondition(
00271 DDS::SampleStateMask sample_states,
00272 DDS::ViewStateMask view_states,
00273 DDS::InstanceStateMask instance_states,
00274 const char * query_expression,
00275 const DDS::StringSeq & query_parameters);
00276 #endif
00277
00278 virtual DDS::ReturnCode_t delete_readcondition(
00279 DDS::ReadCondition_ptr a_condition);
00280
00281 virtual DDS::ReturnCode_t delete_contained_entities();
00282
00283 virtual DDS::ReturnCode_t set_qos(
00284 const DDS::DataReaderQos & qos);
00285
00286 virtual DDS::ReturnCode_t get_qos(
00287 DDS::DataReaderQos & qos);
00288
00289 virtual DDS::ReturnCode_t set_listener(
00290 DDS::DataReaderListener_ptr a_listener,
00291 DDS::StatusMask mask);
00292
00293 virtual DDS::DataReaderListener_ptr get_listener();
00294
00295 virtual DDS::TopicDescription_ptr get_topicdescription();
00296
00297 virtual DDS::Subscriber_ptr get_subscriber();
00298
00299 virtual DDS::ReturnCode_t get_sample_rejected_status(
00300 DDS::SampleRejectedStatus & status);
00301
00302 virtual DDS::ReturnCode_t get_liveliness_changed_status(
00303 DDS::LivelinessChangedStatus & status);
00304
00305 virtual DDS::ReturnCode_t get_requested_deadline_missed_status(
00306 DDS::RequestedDeadlineMissedStatus & status);
00307
00308 virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(
00309 DDS::RequestedIncompatibleQosStatus & status);
00310
00311 virtual DDS::ReturnCode_t get_subscription_matched_status(
00312 DDS::SubscriptionMatchedStatus & status);
00313
00314 virtual DDS::ReturnCode_t get_sample_lost_status(
00315 DDS::SampleLostStatus & status);
00316
00317 virtual DDS::ReturnCode_t wait_for_historical_data(
00318 const DDS::Duration_t & max_wait);
00319
00320 virtual DDS::ReturnCode_t get_matched_publications(
00321 DDS::InstanceHandleSeq & publication_handles);
00322
00323 #if !defined (DDS_HAS_MINIMUM_BIT)
00324 virtual DDS::ReturnCode_t get_matched_publication_data(
00325 DDS::PublicationBuiltinTopicData & publication_data,
00326 DDS::InstanceHandle_t publication_handle);
00327 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00328
00329 virtual DDS::ReturnCode_t enable();
00330
00331 #ifndef OPENDDS_SAFETY_PROFILE
00332 virtual void get_latency_stats(
00333 OpenDDS::DCPS::LatencyStatisticsSeq & stats);
00334 #endif
00335
00336 virtual void reset_latency_stats();
00337
00338 virtual CORBA::Boolean statistics_enabled();
00339
00340 virtual void statistics_enabled(
00341 CORBA::Boolean statistics_enabled);
00342
00343
00344
00345
00346
00347 const StatsMapType& raw_latency_statistics() const;
00348
00349
00350 unsigned int& raw_latency_buffer_size();
00351
00352
00353 DataCollector<double>::OnFull& raw_latency_buffer_type();
00354
00355
00356
00357
00358 void writer_activity(const DataSampleHeader& header);
00359
00360
00361 virtual void data_received(const ReceivedDataSample& sample);
00362
00363 virtual bool check_transport_qos(const TransportInst& inst);
00364
00365 RepoId get_subscription_id() const;
00366
00367 bool have_sample_states(DDS::SampleStateMask sample_states) const;
00368 bool have_view_states(DDS::ViewStateMask view_states) const;
00369 bool have_instance_states(DDS::InstanceStateMask instance_states) const;
00370 bool contains_sample(DDS::SampleStateMask sample_states,
00371 DDS::ViewStateMask view_states,
00372 DDS::InstanceStateMask instance_states);
00373
00374 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00375 virtual bool contains_sample_filtered(DDS::SampleStateMask sample_states,
00376 DDS::ViewStateMask view_states,
00377 DDS::InstanceStateMask instance_states,
00378 const FilterEvaluator& evaluator,
00379 const DDS::StringSeq& params) = 0;
00380 #endif
00381
00382 virtual void dds_demarshal(const ReceivedDataSample& sample,
00383 SubscriptionInstance_rch& instance,
00384 bool & is_new_instance,
00385 bool & filtered,
00386 MarshalingType marshaling_type)= 0;
00387
00388 virtual void dispose_unregister(const ReceivedDataSample& sample,
00389 SubscriptionInstance_rch& instance);
00390
00391 void process_latency(const ReceivedDataSample& sample);
00392 void notify_latency(PublicationId writer);
00393
00394 CORBA::Long get_depth() const {
00395 return depth_;
00396 }
00397 size_t get_n_chunks() const {
00398 return n_chunks_;
00399 }
00400
00401 void liveliness_lost();
00402
00403 void remove_all_associations();
00404
00405 void notify_subscription_disconnected(const WriterIdSeq& pubids);
00406 void notify_subscription_reconnected(const WriterIdSeq& pubids);
00407 void notify_subscription_lost(const WriterIdSeq& pubids);
00408 void notify_liveliness_change();
00409
00410 bool is_bit() const;
00411
00412
00413
00414
00415
00416
00417
00418 bool has_zero_copies();
00419
00420
00421 void release_instance(DDS::InstanceHandle_t handle);
00422
00423
00424 void reschedule_deadline();
00425
00426 ACE_Reactor_Timer_Interface* get_reactor();
00427
00428 RepoId get_topic_id();
00429 RepoId get_dp_id();
00430
00431 typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00432 void get_instance_handles(InstanceHandleVec& instance_handles);
00433
00434 typedef std::pair<PublicationId, WriterInfo::WriterState> WriterStatePair;
00435 typedef OPENDDS_VECTOR(WriterStatePair) WriterStatePairVec;
00436 void get_writer_states(WriterStatePairVec& writer_states);
00437
00438 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00439 void update_ownership_strength (const PublicationId& pub_id,
00440 const CORBA::Long& ownership_strength);
00441
00442
00443
00444 class OwnershipManagerPtr
00445 {
00446 public:
00447 OwnershipManagerPtr(DataReaderImpl* reader)
00448 : participant_( reader->is_exclusive_ownership_ ? reader->participant_servant_.lock() : RcHandle<DomainParticipantImpl>())
00449 {
00450 }
00451 operator bool() const { return participant_.in(); }
00452 OwnershipManager* operator->() const
00453 {
00454 return participant_->ownership_manager();
00455 }
00456
00457 private:
00458 RcHandle<DomainParticipantImpl> participant_;
00459 };
00460 friend class OwnershipManagerPtr;
00461
00462 OwnershipManagerPtr ownership_manager() { return OwnershipManagerPtr(this); }
00463 #endif
00464
00465 virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
00466 OpenDDS::DCPS::SubscriptionInstance_rch& instance) = 0;
00467
00468 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00469
00470 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00471
00472 void enable_filtering(ContentFilteredTopicImpl* cft);
00473
00474 DDS::ContentFilteredTopic_ptr get_cf_topic() const;
00475
00476 #endif
00477
00478 void update_subscription_params(const DDS::StringSeq& params) const;
00479
00480 typedef OPENDDS_VECTOR(void*) GenericSeq;
00481
00482 struct GenericBundle {
00483 GenericSeq samples_;
00484 DDS::SampleInfoSeq info_;
00485 };
00486
00487 virtual DDS::ReturnCode_t read_generic(GenericBundle& gen,
00488 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00489 DDS::InstanceStateMask instance_states, bool adjust_ref_count ) = 0;
00490
00491 virtual DDS::ReturnCode_t take(
00492 AbstractSamples& samples,
00493 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00494 DDS::InstanceStateMask instance_states)=0;
00495
00496 virtual DDS::InstanceHandle_t lookup_instance_generic(const void* data) = 0;
00497
00498 virtual DDS::ReturnCode_t read_instance_generic(void*& data,
00499 DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
00500 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00501 DDS::InstanceStateMask instance_states) = 0;
00502
00503 virtual DDS::ReturnCode_t read_next_instance_generic(void*& data,
00504 DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
00505 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00506 DDS::InstanceStateMask instance_states) = 0;
00507
00508 virtual void set_instance_state(DDS::InstanceHandle_t instance,
00509 DDS::InstanceStateKind state) = 0;
00510
00511 #endif
00512
00513 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00514 void begin_access();
00515 void end_access();
00516 void get_ordered_data(GroupRakeData& data,
00517 DDS::SampleStateMask sample_states,
00518 DDS::ViewStateMask view_states,
00519 DDS::InstanceStateMask instance_states);
00520
00521 void accept_coherent (PublicationId& writer_id,
00522 RepoId& publisher_id);
00523 void reject_coherent (PublicationId& writer_id,
00524 RepoId& publisher_id);
00525 void coherent_change_received (RepoId publisher_id, Coherent_State& result);
00526
00527 void coherent_changes_completed (DataReaderImpl* reader);
00528
00529 void reset_coherent_info (const PublicationId& writer_id,
00530 const RepoId& publisher_id);
00531 #endif
00532
00533
00534 void set_subscriber_qos(const DDS::SubscriberQos & qos);
00535
00536
00537 void reset_ownership (DDS::InstanceHandle_t instance);
00538
00539 virtual RcHandle<EntityImpl> parent() const;
00540
00541 void disable_transport();
00542
00543 virtual void register_for_writer(const RepoId& ,
00544 const RepoId& ,
00545 const RepoId& ,
00546 const TransportLocatorSeq& ,
00547 DiscoveryListener* );
00548
00549 virtual void unregister_for_writer(const RepoId& ,
00550 const RepoId& ,
00551 const RepoId& );
00552
00553 protected:
00554 virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00555 void remove_publication(const PublicationId& pub_id);
00556
00557 void prepare_to_delete();
00558
00559 RcHandle<SubscriberImpl> get_subscriber_servant();
00560
00561 void post_read_or_take();
00562
00563
00564 virtual DDS::ReturnCode_t enable_specific() = 0;
00565
00566 void sample_info(DDS::SampleInfo & sample_info,
00567 const ReceivedDataElement *ptr);
00568
00569 CORBA::Long total_samples() const;
00570
00571 void set_sample_lost_status(const DDS::SampleLostStatus& status);
00572 void set_sample_rejected_status(
00573 const DDS::SampleRejectedStatus& status);
00574
00575
00576 SubscriptionInstance_rch get_handle_instance(
00577 DDS::InstanceHandle_t handle);
00578
00579
00580
00581
00582 DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t& key);
00583
00584 virtual void purge_data(SubscriptionInstance_rch instance) = 0;
00585
00586 virtual void release_instance_i(DDS::InstanceHandle_t handle) = 0;
00587
00588 bool has_readcondition(DDS::ReadCondition_ptr a_condition);
00589
00590
00591 mutable SubscriptionInstanceMapType instances_;
00592
00593
00594
00595
00596 mutable ACE_Recursive_Thread_Mutex instances_lock_;
00597
00598
00599
00600
00601
00602
00603
00604
00605 bool filter_sample(const DataSampleHeader& header);
00606
00607 bool ownership_filter_instance(const SubscriptionInstance_rch& instance,
00608 const PublicationId& pubid);
00609 bool time_based_filter_instance(const SubscriptionInstance_rch& instance,
00610 ACE_Time_Value& filter_time_expired);
00611
00612 void accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance);
00613
00614 virtual void qos_change(const DDS::DataReaderQos& qos);
00615
00616
00617 void notify_read_conditions();
00618
00619 unique_ptr<ReceivedDataAllocator> rd_allocator_;
00620 DDS::DataReaderQos qos_;
00621
00622
00623 DDS::SampleRejectedStatus sample_rejected_status_;
00624 DDS::SampleLostStatus sample_lost_status_;
00625
00626
00627 ACE_Recursive_Thread_Mutex sample_lock_;
00628
00629 typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> Reverse_Lock_t;
00630 Reverse_Lock_t reverse_sample_lock_;
00631
00632 WeakRcHandle<DomainParticipantImpl> participant_servant_;
00633 TopicDescriptionPtr<TopicImpl> topic_servant_;
00634
00635 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00636 bool is_exclusive_ownership_;
00637
00638 #endif
00639
00640 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00641 TopicDescriptionPtr<ContentFilteredTopicImpl> content_filtered_topic_;
00642 #endif
00643
00644
00645
00646 bool coherent_;
00647
00648
00649 GroupRakeData group_coherent_ordered_data_;
00650
00651 DDS::SubscriberQos subqos_;
00652
00653 protected:
00654 virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00655
00656 private:
00657
00658 void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
00659
00660
00661 void lookup_instance_handles(const WriterIdSeq& ids,
00662 DDS::InstanceHandleSeq& hdls);
00663
00664 void instances_liveliness_update(WriterInfo& info,
00665 const ACE_Time_Value& when);
00666
00667 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00668 bool verify_coherent_changes_completion(WriterInfo* writer);
00669 bool coherent_change_received(WriterInfo* writer);
00670 #endif
00671
00672 const RepoId& get_repo_id() const { return this->subscription_id_; }
00673 DDS::DomainId_t domain_id() const { return this->domain_id_; }
00674
00675 Priority get_priority_value(const AssociationData& data) const {
00676 return data.publication_transport_priority_;
00677 }
00678
00679 #if defined(OPENDDS_SECURITY)
00680 DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
00681 #endif
00682
00683
00684 void resume_sample_processing(const PublicationId& pub_id);
00685
00686
00687
00688 bool check_historic(const ReceivedDataSample& sample);
00689
00690
00691 void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples);
00692
00693 friend class InstanceState;
00694 friend class EndHistoricSamplesMissedSweeper;
00695 friend class RemoveAssociationSweeper<DataReaderImpl>;
00696
00697 friend class ::DDS_TEST;
00698
00699 DDS::TopicDescription_var topic_desc_;
00700 DDS::StatusMask listener_mask_;
00701 DDS::DataReaderListener_var listener_;
00702 DDS::DomainId_t domain_id_;
00703 RepoId dp_id_;
00704
00705
00706
00707 WeakRcHandle<SubscriberImpl> subscriber_servant_;
00708 RcHandle<EndHistoricSamplesMissedSweeper> end_historic_sweeper_;
00709 RcHandle<RemoveAssociationSweeper<DataReaderImpl> > remove_association_sweeper_;
00710
00711 CORBA::Long depth_;
00712 size_t n_chunks_;
00713
00714
00715 ACE_Recursive_Thread_Mutex publication_handle_lock_;
00716 Reverse_Lock_t reverse_pub_handle_lock_;
00717
00718 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00719 RepoIdToHandleMap id_to_handle_map_;
00720
00721
00722 DDS::LivelinessChangedStatus liveliness_changed_status_;
00723 DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_;
00724 DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_;
00725 DDS::SubscriptionMatchedStatus subscription_match_status_;
00726
00727
00728 BudgetExceededStatus budget_exceeded_status_;
00729
00730
00731
00732
00733
00734
00735
00736
00737 SubscriptionLostStatus subscription_lost_status_;
00738
00739
00740
00741
00742
00743
00744 ACE_Reactor_Timer_Interface* reactor_;
00745
00746 class LivelinessTimer : public ReactorInterceptor {
00747 public:
00748 LivelinessTimer(ACE_Reactor* reactor,
00749 ACE_thread_t owner,
00750 DataReaderImpl* data_reader)
00751 : ReactorInterceptor(reactor, owner)
00752 , data_reader_(*data_reader)
00753 , liveliness_timer_id_(-1)
00754 { }
00755
00756 void check_liveliness()
00757 {
00758 CheckLivelinessCommand c(this);
00759 execute_or_enqueue(c);
00760 }
00761
00762 void cancel_timer()
00763 {
00764 CancelCommand c(this);
00765 execute_or_enqueue(c);
00766 }
00767
00768 virtual bool reactor_is_shut_down() const
00769 {
00770 return TheServiceParticipant->is_shut_down();
00771 }
00772
00773 private:
00774 ~LivelinessTimer() { }
00775
00776 WeakRcHandle<DataReaderImpl> data_reader_;
00777
00778
00779 long liveliness_timer_id_;
00780 void check_liveliness_i(bool cancel, const ACE_Time_Value& current_time);
00781
00782 int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00783
00784 class CommandBase : public Command {
00785 public:
00786 CommandBase(LivelinessTimer* timer)
00787 : timer_(timer)
00788 { }
00789
00790 protected:
00791 LivelinessTimer* timer_;
00792 };
00793
00794 class CheckLivelinessCommand : public CommandBase {
00795 public:
00796 CheckLivelinessCommand(LivelinessTimer* timer)
00797 : CommandBase(timer)
00798 { }
00799 virtual void execute()
00800 {
00801 timer_->check_liveliness_i(true, ACE_OS::gettimeofday());
00802 }
00803 };
00804
00805 class CancelCommand : public CommandBase {
00806 public:
00807 CancelCommand(LivelinessTimer* timer)
00808 : CommandBase(timer)
00809 { }
00810 virtual void execute()
00811 {
00812 if (timer_->liveliness_timer_id_ != -1) {
00813 timer_->reactor()->cancel_timer(timer_);
00814 }
00815 }
00816 };
00817 };
00818 RcHandle<LivelinessTimer> liveliness_timer_;
00819
00820 CORBA::Long last_deadline_missed_total_count_;
00821
00822
00823 RcHandle<RequestedDeadlineWatchdog> watchdog_;
00824
00825
00826
00827 bool is_bit_;
00828
00829 bool always_get_history_;
00830
00831
00832 bool statistics_enabled_;
00833
00834
00835 typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00836 GUID_tKeyLessThan) WriterMapType;
00837
00838 WriterMapType writers_;
00839
00840
00841 ACE_RW_Thread_Mutex writers_lock_;
00842
00843
00844 StatsMapType statistics_;
00845
00846
00847 unsigned int raw_latency_buffer_size_;
00848
00849
00850 DataCollector<double>::OnFull raw_latency_buffer_type_;
00851
00852 typedef VarLess<DDS::ReadCondition> RCCompLess;
00853 typedef OPENDDS_SET_CMP(DDS::ReadCondition_var, RCCompLess) ReadConditionSet;
00854 ReadConditionSet read_conditions_;
00855
00856
00857 Monitor* monitor_;
00858
00859
00860 Monitor* periodic_monitor_;
00861
00862 bool transport_disabled_;
00863 };
00864
00865 typedef RcHandle<DataReaderImpl> DataReaderImpl_rch;
00866
00867 }
00868 }
00869
00870 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00871
00872 #if defined (__ACE_INLINE__)
00873 # include "DataReaderImpl.inl"
00874 #endif
00875
00876 #endif