00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DATAREADER_H
00009 #define OPENDDS_DCPS_DATAREADER_H
00010
00011 #include "dcps_export.h"
00012 #include "EntityImpl.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "DisjointSequence.h"
00023 #include "SubscriptionInstance.h"
00024 #include "InstanceState.h"
00025 #include "Cached_Allocator_With_Overflow_T.h"
00026 #include "ZeroCopyInfoSeq_T.h"
00027 #include "Stats_T.h"
00028 #include "OwnershipManager.h"
00029 #include "ContentFilteredTopicImpl.h"
00030 #include "GroupRakeData.h"
00031 #include "CoherentChangeControl.h"
00032 #include "AssociationData.h"
00033 #include "dds/DdsDcpsInfrastructureC.h"
00034 #include "RcHandle_T.h"
00035 #include "RcObject_T.h"
00036 #include "WriterInfo.h"
00037 #include "ReactorInterceptor.h"
00038 #include "Service_Participant.h"
00039 #include "PoolAllocator.h"
00040 #include "RemoveAssociationSweeper.h"
00041
00042 #include "ace/String_Base.h"
00043 #include "ace/Reverse_Lock_T.h"
00044 #include "ace/Atomic_Op.h"
00045 #include "ace/Reactor.h"
00046
00047 #include "dds/DCPS/PoolAllocator.h"
00048 #include <memory>
00049
00050 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00051 #pragma once
00052 #endif
00053
00054 class DDS_TEST;
00055
00056 namespace OpenDDS {
00057 namespace DCPS {
00058
00059 class SubscriberImpl;
00060 class DomainParticipantImpl;
00061 class SubscriptionInstance;
00062 class TopicImpl;
00063 class TopicDescriptionImpl;
00064 class RequestedDeadlineWatchdog;
00065 class Monitor;
00066 class DataReaderImpl;
00067 class FilterEvaluator;
00068
00069 typedef Cached_Allocator_With_Overflow<OpenDDS::DCPS::ReceivedDataElement, ACE_Null_Mutex>
00070 ReceivedDataAllocator;
00071
/// Marshaling form selector; passed, for example, as the last argument of
/// DataReaderImpl::dds_demarshal().
enum MarshalingType {
  FULL_MARSHALING = 0,
  KEY_ONLY_MARSHALING = 1
};
00076
00077
00078 class OpenDDS_Dcps_Export WriterStats {
00079 public:
00080
00081 WriterStats(
00082 int amount = 0,
00083 DataCollector<double>::OnFull type = DataCollector<double>::KeepOldest);
00084
00085
00086 void add_stat(const ACE_Time_Value& delay);
00087
00088
00089 LatencyStatistics get_stats() const;
00090
00091
00092 void reset_stats();
00093
00094 #ifndef OPENDDS_SAFETY_PROFILE
00095
00096 std::ostream& raw_data(std::ostream& str) const;
00097 #endif
00098
00099 private:
00100
00101 Stats<double> stats_;
00102 };
00103
00104 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00105
00106 class OpenDDS_Dcps_Export AbstractSamples
00107 {
00108 public:
00109 virtual ~AbstractSamples(){}
00110 virtual void reserve(CORBA::ULong size)=0;
00111 virtual void push_back(const DDS::SampleInfo& info, const void* sample)=0;
00112 };
00113
00114 #endif
00115
00116
00117 class EndHistoricSamplesMissedSweeper : public ReactorInterceptor {
00118 public:
00119 EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
00120 ACE_thread_t owner,
00121 DataReaderImpl* reader);
00122
00123 void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00124 void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00125
00126
00127 int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00128
00129 virtual bool reactor_is_shut_down() const
00130 {
00131 return TheServiceParticipant->is_shut_down();
00132 }
00133
00134 private:
00135 ~EndHistoricSamplesMissedSweeper();
00136
00137 DataReaderImpl* reader_;
00138
00139 class CommandBase : public Command {
00140 public:
00141 CommandBase(EndHistoricSamplesMissedSweeper* sweeper,
00142 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00143 : sweeper_ (sweeper)
00144 , info_(info)
00145 { }
00146
00147 protected:
00148 EndHistoricSamplesMissedSweeper* sweeper_;
00149 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00150 };
00151
00152 class ScheduleCommand : public CommandBase {
00153 public:
00154 ScheduleCommand(EndHistoricSamplesMissedSweeper* sweeper,
00155 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00156 : CommandBase(sweeper, info)
00157 { }
00158 virtual void execute();
00159 };
00160
00161 class CancelCommand : public CommandBase {
00162 public:
00163 CancelCommand(EndHistoricSamplesMissedSweeper* sweeper,
00164 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00165 : CommandBase(sweeper, info)
00166 { }
00167 virtual void execute();
00168 };
00169 };
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183 class OpenDDS_Dcps_Export DataReaderImpl
00184 : public virtual LocalObject<DataReaderEx>,
00185 public virtual DataReaderCallbacks,
00186 public virtual EntityImpl,
00187 public virtual TransportClient,
00188 public virtual TransportReceiveListener,
00189 private WriterInfoListener {
00190 public:
00191 friend class RequestedDeadlineWatchdog;
00192 friend class QueryConditionImpl;
00193 friend class SubscriberImpl;
00194
00195 typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance*) SubscriptionInstanceMapType;
00196
00197
00198 typedef OPENDDS_MAP_CMP(PublicationId, WriterStats, GUID_tKeyLessThan) StatsMapType;
00199
00200
00201 DataReaderImpl();
00202
00203
00204 virtual ~DataReaderImpl();
00205
00206 virtual DDS::InstanceHandle_t get_instance_handle();
00207
00208 virtual void add_association(const RepoId& yourId,
00209 const WriterAssociation& writer,
00210 bool active);
00211
00212 virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00213
00214 virtual void association_complete(const RepoId& remote_id);
00215
00216 virtual void remove_associations(const WriterIdSeq& writers, bool callback);
00217
00218 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00219
00220 virtual void inconsistent_topic();
00221
00222 virtual void signal_liveliness(const RepoId& remote_participant);
00223
00224
00225
00226
00227
00228
00229
00230
00231 DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind);
00232
00233
00234
00235
00236 void writer_became_alive(WriterInfo& info,
00237 const ACE_Time_Value& when);
00238
00239
00240
00241
00242 void writer_became_dead(WriterInfo& info,
00243 const ACE_Time_Value& when);
00244
00245
00246
00247 void writer_removed(WriterInfo& info);
00248
00249
00250
00251
00252 void cleanup();
00253
00254 void init(
00255 TopicDescriptionImpl* a_topic_desc,
00256 const DDS::DataReaderQos & qos,
00257 DDS::DataReaderListener_ptr a_listener,
00258 const DDS::StatusMask & mask,
00259 DomainParticipantImpl* participant,
00260 SubscriberImpl* subscriber,
00261 DDS::DataReader_ptr dr_objref);
00262
00263 virtual DDS::ReadCondition_ptr create_readcondition(
00264 DDS::SampleStateMask sample_states,
00265 DDS::ViewStateMask view_states,
00266 DDS::InstanceStateMask instance_states);
00267
00268 #ifndef OPENDDS_NO_QUERY_CONDITION
00269 virtual DDS::QueryCondition_ptr create_querycondition(
00270 DDS::SampleStateMask sample_states,
00271 DDS::ViewStateMask view_states,
00272 DDS::InstanceStateMask instance_states,
00273 const char * query_expression,
00274 const DDS::StringSeq & query_parameters);
00275 #endif
00276
00277 virtual DDS::ReturnCode_t delete_readcondition(
00278 DDS::ReadCondition_ptr a_condition);
00279
00280 virtual DDS::ReturnCode_t delete_contained_entities();
00281
00282 virtual DDS::ReturnCode_t set_qos(
00283 const DDS::DataReaderQos & qos);
00284
00285 virtual DDS::ReturnCode_t get_qos(
00286 DDS::DataReaderQos & qos);
00287
00288 virtual DDS::ReturnCode_t set_listener(
00289 DDS::DataReaderListener_ptr a_listener,
00290 DDS::StatusMask mask);
00291
00292 virtual DDS::DataReaderListener_ptr get_listener();
00293
00294 virtual DDS::TopicDescription_ptr get_topicdescription();
00295
00296 virtual DDS::Subscriber_ptr get_subscriber();
00297
00298 virtual DDS::ReturnCode_t get_sample_rejected_status(
00299 DDS::SampleRejectedStatus & status);
00300
00301 virtual DDS::ReturnCode_t get_liveliness_changed_status(
00302 DDS::LivelinessChangedStatus & status);
00303
00304 virtual DDS::ReturnCode_t get_requested_deadline_missed_status(
00305 DDS::RequestedDeadlineMissedStatus & status);
00306
00307 virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(
00308 DDS::RequestedIncompatibleQosStatus & status);
00309
00310 virtual DDS::ReturnCode_t get_subscription_matched_status(
00311 DDS::SubscriptionMatchedStatus & status);
00312
00313 virtual DDS::ReturnCode_t get_sample_lost_status(
00314 DDS::SampleLostStatus & status);
00315
00316 virtual DDS::ReturnCode_t wait_for_historical_data(
00317 const DDS::Duration_t & max_wait);
00318
00319 virtual DDS::ReturnCode_t get_matched_publications(
00320 DDS::InstanceHandleSeq & publication_handles);
00321
00322 #if !defined (DDS_HAS_MINIMUM_BIT)
00323 virtual DDS::ReturnCode_t get_matched_publication_data(
00324 DDS::PublicationBuiltinTopicData & publication_data,
00325 DDS::InstanceHandle_t publication_handle);
00326 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00327
00328 virtual DDS::ReturnCode_t enable();
00329
00330 #ifndef OPENDDS_SAFETY_PROFILE
00331 virtual void get_latency_stats(
00332 OpenDDS::DCPS::LatencyStatisticsSeq & stats);
00333 #endif
00334
00335 virtual void reset_latency_stats();
00336
00337 virtual CORBA::Boolean statistics_enabled();
00338
00339 virtual void statistics_enabled(
00340 CORBA::Boolean statistics_enabled);
00341
00342
00343
00344
00345
00346 const StatsMapType& raw_latency_statistics() const;
00347
00348
00349 unsigned int& raw_latency_buffer_size();
00350
00351
00352 DataCollector<double>::OnFull& raw_latency_buffer_type();
00353
00354
00355
00356
00357 void writer_activity(const DataSampleHeader& header);
00358
00359
00360 virtual void data_received(const ReceivedDataSample& sample);
00361
00362 virtual bool check_transport_qos(const TransportInst& inst);
00363
00364 RepoId get_subscription_id() const;
00365
00366 DDS::DataReader_ptr get_dr_obj_ref();
00367
00368 char *get_topic_name() const;
00369
00370 bool have_sample_states(DDS::SampleStateMask sample_states) const;
00371 bool have_view_states(DDS::ViewStateMask view_states) const;
00372 bool have_instance_states(DDS::InstanceStateMask instance_states) const;
00373 bool contains_sample(DDS::SampleStateMask sample_states,
00374 DDS::ViewStateMask view_states,
00375 DDS::InstanceStateMask instance_states);
00376
00377 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00378 virtual bool contains_sample_filtered(DDS::SampleStateMask sample_states,
00379 DDS::ViewStateMask view_states,
00380 DDS::InstanceStateMask instance_states,
00381 const FilterEvaluator& evaluator,
00382 const DDS::StringSeq& params) = 0;
00383 #endif
00384
00385 virtual void dds_demarshal(const ReceivedDataSample& sample,
00386 SubscriptionInstance*& instance,
00387 bool & is_new_instance,
00388 bool & filtered,
00389 MarshalingType marshaling_type)= 0;
00390
00391 virtual void dispose_unregister(const ReceivedDataSample& sample,
00392 SubscriptionInstance*& instance);
00393
00394 void process_latency(const ReceivedDataSample& sample);
00395 void notify_latency(PublicationId writer);
00396
00397 CORBA::Long get_depth() const {
00398 return depth_;
00399 }
00400 size_t get_n_chunks() const {
00401 return n_chunks_;
00402 }
00403
00404 void liveliness_lost();
00405
00406 void remove_all_associations();
00407
00408 void notify_subscription_disconnected(const WriterIdSeq& pubids);
00409 void notify_subscription_reconnected(const WriterIdSeq& pubids);
00410 void notify_subscription_lost(const WriterIdSeq& pubids);
00411 virtual void notify_connection_deleted(const RepoId& peerId);
00412 void notify_liveliness_change();
00413
00414 bool is_bit() const;
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 virtual DDS::ReturnCode_t auto_return_loan(void* seq) = 0;
00428
00429
00430
00431
00432
00433 virtual int num_zero_copies();
00434
00435 virtual void dec_ref_data_element(ReceivedDataElement* r) = 0;
00436
00437
00438 void release_instance(DDS::InstanceHandle_t handle);
00439
00440
00441 void reschedule_deadline();
00442
00443 ACE_Reactor_Timer_Interface* get_reactor();
00444
00445 RepoId get_topic_id();
00446 RepoId get_dp_id();
00447
00448 typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00449 void get_instance_handles(InstanceHandleVec& instance_handles);
00450
00451 typedef std::pair<PublicationId, WriterInfo::WriterState> WriterStatePair;
00452 typedef OPENDDS_VECTOR(WriterStatePair) WriterStatePairVec;
00453 void get_writer_states(WriterStatePairVec& writer_states);
00454
00455 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00456 void update_ownership_strength (const PublicationId& pub_id,
00457 const CORBA::Long& ownership_strength);
00458 OwnershipManager* ownership_manager() const { return owner_manager_; }
00459 #endif
00460
00461 virtual void delete_instance_map (void* map) = 0;
00462 virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
00463 OpenDDS::DCPS::SubscriptionInstance*& instance) = 0;
00464
00465 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00466
00467 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00468
00469 void enable_filtering(ContentFilteredTopicImpl* cft);
00470
00471 DDS::ContentFilteredTopic_ptr get_cf_topic() const;
00472
00473 #endif
00474
00475 void update_subscription_params(const DDS::StringSeq& params) const;
00476
00477 typedef OPENDDS_VECTOR(void*) GenericSeq;
00478
00479 struct GenericBundle {
00480 GenericSeq samples_;
00481 DDS::SampleInfoSeq info_;
00482 };
00483
00484 virtual DDS::ReturnCode_t read_generic(GenericBundle& gen,
00485 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00486 DDS::InstanceStateMask instance_states, bool adjust_ref_count ) = 0;
00487
00488 virtual DDS::ReturnCode_t take(
00489 AbstractSamples& samples,
00490 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00491 DDS::InstanceStateMask instance_states)=0;
00492
00493 virtual DDS::InstanceHandle_t lookup_instance_generic(const void* data) = 0;
00494
00495 virtual DDS::ReturnCode_t read_instance_generic(void*& data,
00496 DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
00497 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00498 DDS::InstanceStateMask instance_states) = 0;
00499
00500 virtual DDS::ReturnCode_t read_next_instance_generic(void*& data,
00501 DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
00502 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00503 DDS::InstanceStateMask instance_states) = 0;
00504
00505 virtual void set_instance_state(DDS::InstanceHandle_t instance,
00506 DDS::InstanceStateKind state) = 0;
00507
00508 #endif
00509
00510 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00511 void begin_access();
00512 void end_access();
00513 void get_ordered_data(GroupRakeData& data,
00514 DDS::SampleStateMask sample_states,
00515 DDS::ViewStateMask view_states,
00516 DDS::InstanceStateMask instance_states);
00517
00518 void accept_coherent (PublicationId& writer_id,
00519 RepoId& publisher_id);
00520 void reject_coherent (PublicationId& writer_id,
00521 RepoId& publisher_id);
00522 void coherent_change_received (RepoId publisher_id, Coherent_State& result);
00523
00524 void coherent_changes_completed (DataReaderImpl* reader);
00525
00526 void reset_coherent_info (const PublicationId& writer_id,
00527 const RepoId& publisher_id);
00528 #endif
00529
00530
00531 void set_subscriber_qos(const DDS::SubscriberQos & qos);
00532
00533
00534 void reset_ownership (::DDS::InstanceHandle_t instance);
00535
00536 virtual EntityImpl* parent() const;
00537
00538 void disable_transport();
00539
00540 virtual void register_for_writer(const RepoId& ,
00541 const RepoId& ,
00542 const RepoId& ,
00543 const TransportLocatorSeq& ,
00544 DiscoveryListener* );
00545
00546 virtual void unregister_for_writer(const RepoId& ,
00547 const RepoId& ,
00548 const RepoId& );
00549
00550 protected:
00551 virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00552 void remove_or_reschedule(const PublicationId& pub_id);
00553
00554 void prepare_to_delete();
00555
00556 SubscriberImpl* get_subscriber_servant();
00557
00558 void post_read_or_take();
00559
00560
00561 virtual DDS::ReturnCode_t enable_specific() = 0;
00562
00563 void sample_info(DDS::SampleInfo & sample_info,
00564 const ReceivedDataElement *ptr);
00565
00566 CORBA::Long total_samples() const;
00567
00568 void set_sample_lost_status(const DDS::SampleLostStatus& status);
00569 void set_sample_rejected_status(
00570 const DDS::SampleRejectedStatus& status);
00571
00572
00573 SubscriptionInstance* get_handle_instance(
00574 DDS::InstanceHandle_t handle);
00575
00576
00577
00578
00579 DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t& key);
00580
00581 virtual void purge_data(SubscriptionInstance* instance) = 0;
00582
00583 virtual void release_instance_i(DDS::InstanceHandle_t handle) = 0;
00584
00585 bool has_readcondition(DDS::ReadCondition_ptr a_condition);
00586
00587
00588 mutable SubscriptionInstanceMapType instances_;
00589
00590
00591
00592
00593 mutable ACE_Recursive_Thread_Mutex instances_lock_;
00594
00595
00596
00597
00598
00599
00600
00601
00602 bool filter_sample(const DataSampleHeader& header);
00603 bool filter_instance(SubscriptionInstance* instance,
00604 const PublicationId& pubid);
00605
00606
00607 void notify_read_conditions();
00608
00609 ReceivedDataAllocator *rd_allocator_;
00610 DDS::DataReaderQos qos_;
00611
00612
00613 DDS::SampleRejectedStatus sample_rejected_status_;
00614 DDS::SampleLostStatus sample_lost_status_;
00615
00616
00617 ACE_Recursive_Thread_Mutex sample_lock_;
00618
00619 typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> Reverse_Lock_t;
00620 Reverse_Lock_t reverse_sample_lock_;
00621
00622 DomainParticipantImpl* participant_servant_;
00623 TopicImpl* topic_servant_;
00624
00625 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00626 bool is_exclusive_ownership_;
00627
00628 OwnershipManager* owner_manager_;
00629 #endif
00630
00631 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00632 DDS::ContentFilteredTopic_var content_filtered_topic_;
00633 #endif
00634
00635
00636
00637 bool coherent_;
00638
00639
00640 GroupRakeData group_coherent_ordered_data_;
00641
00642 DDS::SubscriberQos subqos_;
00643
00644 protected:
00645 virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00646
00647 private:
00648
00649 void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
00650
00651
00652 bool lookup_instance_handles(const WriterIdSeq& ids,
00653 DDS::InstanceHandleSeq& hdls);
00654
00655 void instances_liveliness_update(WriterInfo& info,
00656 const ACE_Time_Value& when);
00657
00658 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00659 bool verify_coherent_changes_completion(WriterInfo* writer);
00660 bool coherent_change_received(WriterInfo* writer);
00661 #endif
00662
00663 const RepoId& get_repo_id() const { return this->subscription_id_; }
00664 DDS::DomainId_t domain_id() const { return this->domain_id_; }
00665
00666 Priority get_priority_value(const AssociationData& data) const {
00667 return data.publication_transport_priority_;
00668 }
00669
00670
00671 void resume_sample_processing(const PublicationId& pub_id);
00672
00673
00674
00675 bool check_historic(const ReceivedDataSample& sample);
00676
00677
00678 void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples);
00679
00680 void listener_add_ref() { EntityImpl::_add_ref(); }
00681 void listener_remove_ref() { EntityImpl::_remove_ref(); }
00682
00683 friend class InstanceState;
00684 friend class EndHistoricSamplesMissedSweeper;
00685 friend class RemoveAssociationSweeper<DataReaderImpl>;
00686
00687 friend class ::DDS_TEST;
00688
00689 DDS::TopicDescription_var topic_desc_;
00690 DDS::StatusMask listener_mask_;
00691 DDS::DataReaderListener_var listener_;
00692 DDS::DomainId_t domain_id_;
00693 SubscriberImpl* subscriber_servant_;
00694 DDS::DataReader_var dr_local_objref_;
00695 EndHistoricSamplesMissedSweeper* end_historic_sweeper_;
00696 RemoveAssociationSweeper<DataReaderImpl>* remove_association_sweeper_;
00697
00698 CORBA::Long depth_;
00699 size_t n_chunks_;
00700
00701
00702 ACE_Recursive_Thread_Mutex publication_handle_lock_;
00703 Reverse_Lock_t reverse_pub_handle_lock_;
00704
00705 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00706 RepoIdToHandleMap id_to_handle_map_;
00707
00708
00709 DDS::LivelinessChangedStatus liveliness_changed_status_;
00710 DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_;
00711 DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_;
00712 DDS::SubscriptionMatchedStatus subscription_match_status_;
00713
00714
00715 BudgetExceededStatus budget_exceeded_status_;
00716
00717
00718
00719
00720
00721
00722
00723
00724 SubscriptionLostStatus subscription_lost_status_;
00725
00726
00727
00728
00729
00730
00731 ACE_Reactor_Timer_Interface* reactor_;
00732
00733 class LivelinessTimer : public ReactorInterceptor {
00734 public:
00735 LivelinessTimer(ACE_Reactor* reactor,
00736 ACE_thread_t owner,
00737 DataReaderImpl* data_reader)
00738 : ReactorInterceptor(reactor, owner)
00739 , data_reader_(data_reader)
00740 , liveliness_timer_id_(-1)
00741 { }
00742
00743 void check_liveliness()
00744 {
00745 CheckLivelinessCommand c(this);
00746 execute_or_enqueue(c);
00747 }
00748
00749 void cancel_timer()
00750 {
00751 CancelCommand c(this);
00752 execute_or_enqueue(c);
00753 }
00754
00755 virtual bool reactor_is_shut_down() const
00756 {
00757 return TheServiceParticipant->is_shut_down();
00758 }
00759
00760 private:
00761 ~LivelinessTimer() { }
00762
00763 DataReaderImpl* data_reader_;
00764
00765
00766 long liveliness_timer_id_;
00767 void check_liveliness_i(bool cancel, const ACE_Time_Value& current_time);
00768
00769 int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00770
00771 class CommandBase : public Command {
00772 public:
00773 CommandBase(LivelinessTimer* timer)
00774 : timer_(timer)
00775 { }
00776
00777 protected:
00778 LivelinessTimer* timer_;
00779 };
00780
00781 class CheckLivelinessCommand : public CommandBase {
00782 public:
00783 CheckLivelinessCommand(LivelinessTimer* timer)
00784 : CommandBase(timer)
00785 { }
00786 virtual void execute()
00787 {
00788 timer_->check_liveliness_i(true, ACE_OS::gettimeofday());
00789 }
00790 };
00791
00792 class CancelCommand : public CommandBase {
00793 public:
00794 CancelCommand(LivelinessTimer* timer)
00795 : CommandBase(timer)
00796 { }
00797 virtual void execute()
00798 {
00799 if (timer_->liveliness_timer_id_ != -1) {
00800 timer_->reactor()->cancel_timer(timer_);
00801 }
00802 }
00803 };
00804 };
00805 LivelinessTimer* liveliness_timer_;
00806
00807 CORBA::Long last_deadline_missed_total_count_;
00808
00809
00810 RequestedDeadlineWatchdog* watchdog_;
00811
00812
00813
00814 bool is_bit_;
00815
00816
00817 bool initialized_;
00818 bool always_get_history_;
00819
00820
00821 bool statistics_enabled_;
00822
00823
00824 typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00825 GUID_tKeyLessThan) WriterMapType;
00826
00827 WriterMapType writers_;
00828
00829
00830 ACE_RW_Thread_Mutex writers_lock_;
00831
00832
00833 StatsMapType statistics_;
00834
00835
00836 unsigned int raw_latency_buffer_size_;
00837
00838
00839 DataCollector<double>::OnFull raw_latency_buffer_type_;
00840
00841 typedef VarLess<DDS::ReadCondition> RCCompLess;
00842 typedef OPENDDS_SET_CMP(DDS::ReadCondition_var, RCCompLess) ReadConditionSet;
00843 ReadConditionSet read_conditions_;
00844
00845
00846 Monitor* monitor_;
00847
00848
00849 Monitor* periodic_monitor_;
00850
00851 bool transport_disabled_;
00852 };
00853
00854 }
00855 }
00856
00857 #if defined (__ACE_INLINE__)
00858 # include "DataReaderImpl.inl"
00859 #endif
00860
00861 #endif