OpenDDS  Snapshot(2023/04/28-20:55)
DataReaderImpl.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_DATAREADERIMPL_H
9 #define OPENDDS_DCPS_DATAREADERIMPL_H
10 
11 #include "dcps_export.h"
12 
13 #include "AssociationData.h"
15 #include "CoherentChangeControl.h"
17 #include "DataReaderCallbacks.h"
18 #include "Definitions.h"
19 #include "DisjointSequence.h"
20 #include "DomainParticipantImpl.h"
21 #include "EntityImpl.h"
22 #include "GroupRakeData.h"
23 #include "InstanceState.h"
24 #include "MultiTopicImpl.h"
25 #include "OwnershipManager.h"
26 #include "PoolAllocator.h"
27 #include "RcEventHandler.h"
28 #include "RcHandle_T.h"
29 #include "RcObject.h"
30 #include "ReactorInterceptor.h"
31 #include "Service_Participant.h"
32 #include "Stats_T.h"
33 #include "SubscriptionInstance.h"
34 #include "TimeTypes.h"
35 #include "TopicImpl.h"
36 #include "WriterInfo.h"
37 #include "ZeroCopyInfoSeq_T.h"
38 #include "AtomicBool.h"
42 
43 #include <dds/DdsDcpsTopicC.h>
44 #include <dds/DdsDcpsSubscriptionExtC.h>
45 #include <dds/DdsDcpsDomainC.h>
46 #include <dds/DdsDcpsTopicC.h>
47 #include <dds/DdsDcpsInfrastructureC.h>
48 
49 #include <ace/String_Base.h>
50 #include <ace/Reverse_Lock_T.h>
51 #include <ace/Reactor.h>
52 
53 #include <memory>
54 
55 #if !defined (ACE_LACKS_PRAGMA_ONCE)
56 #pragma once
57 #endif /* ACE_LACKS_PRAGMA_ONCE */
58 
59 class DDS_TEST;
60 
62 
63 namespace OpenDDS {
64 namespace DCPS {
65 
66 class SubscriberImpl;
67 class DomainParticipantImpl;
68 class SubscriptionInstance;
69 class TopicImpl;
70 class TopicDescriptionImpl;
71 class Monitor;
72 class DataReaderImpl;
74 
77 
81 };
82 
83 /// Elements stored for managing statistical data.
85 public:
86  /// Default constructor.
88  int amount = 0,
90 #ifdef ACE_HAS_CPP11
91  WriterStats(const WriterStats&) = default;
92 #endif
93 
94  /// Add a datum to the latency statistics.
95  void add_stat(const TimeDuration& delay);
96 
97  /// Extract the current latency statistics for this writer.
98  LatencyStatistics get_stats() const;
99 
100  /// Reset the latency statistics for this writer.
101  void reset_stats();
102 
103 #ifndef OPENDDS_SAFETY_PROFILE
104  /// Dump any raw data.
105  std::ostream& raw_data(std::ostream& str) const;
106 #endif
107 
108 private:
109  /// Latency statistics for the DataWriter to this DataReader.
111 };
112 
113 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
114 
116 {
117 public:
118  virtual ~AbstractSamples(){}
119  virtual void reserve(CORBA::ULong size)=0;
120  virtual void push_back(const DDS::SampleInfo& info, const void* sample)=0;
121 };
122 
123 #endif
124 
125 // Class to cleanup in case EndHistoricSamples is missed
127 public:
129  ACE_thread_t owner,
130  DataReaderImpl* reader);
131 
132  void schedule_timer(WriterInfo_rch& info);
133  void cancel_timer(WriterInfo_rch& info);
134 
135  // Arg will be PublicationId
136  int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
137 
138  virtual bool reactor_is_shut_down() const
139  {
140  return TheServiceParticipant->is_shut_down();
141  }
142 
143 private:
145 
147  OPENDDS_SET(WriterInfo_rch) info_set_;
148 
149  class CommandBase : public Command {
150  public:
152  WriterInfo_rch& info)
153  : sweeper_(sweeper)
154  , info_(info)
155  { }
156 
157  protected:
160  };
161 
162  class ScheduleCommand : public CommandBase {
163  public:
165  WriterInfo_rch& info)
166  : CommandBase(sweeper, info)
167  { }
168  virtual void execute();
169  };
170 
171  class CancelCommand : public CommandBase {
172  public:
174  WriterInfo_rch& info)
175  : CommandBase(sweeper, info)
176  { }
177  virtual void execute();
178  };
179 };
180 
181 class MessageHolder : public virtual RcObject {
182 public:
183  virtual const void* get() const = 0;
184 };
185 
186 template <typename T>
188 public:
189  MessageHolder_T(const T& v) : v_(v) {}
190  const void* get() const { return &v_; }
191 private:
192  T v_;
193 };
194 
195 /**
196 * @class DataReaderImpl
197 *
198 * @brief Implements the DDS::DataReader interface.
199 *
200 * See the DDS specification, OMG formal/2015-04-10, for a description of
201 * the interface this class is implementing.
202 *
203 * This class must be inherited by the type-specific datareader which
204 * is specific to the data-type associated with the topic.
205 *
206 */
208  : public virtual LocalObject<DataReaderEx>,
209  public virtual DataReaderCallbacks,
210  public virtual EntityImpl,
211  public virtual TransportClient,
212  public virtual TransportReceiveListener,
213  private WriterInfoListener {
214 public:
215  friend class RequestedDeadlineWatchdog;
216  friend class QueryConditionImpl;
217  friend class SubscriberImpl;
218 
219  typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType;
220  typedef OPENDDS_SET(DDS::InstanceHandle_t) InstanceSet;
221  typedef OPENDDS_SET(SubscriptionInstance_rch) SubscriptionInstanceSet;
222  /// Type of collection of statistics for writers to this reader.
223  typedef OPENDDS_MAP_CMP(GUID_t, WriterStats, GUID_tKeyLessThan) StatsMapType;
224 
225  DataReaderImpl();
226 
227  virtual ~DataReaderImpl();
228 
229  virtual DDS::InstanceHandle_t get_instance_handle();
230 
231  virtual void add_association(const GUID_t& yourId,
232  const WriterAssociation& writer,
233  bool active);
234 
235  virtual void transport_assoc_done(int flags, const GUID_t& remote_id);
236 
237  virtual void remove_associations(const WriterIdSeq& writers, bool callback);
238 
239  virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
240 
241  virtual void signal_liveliness(const GUID_t& remote_participant);
242 
243  /**
244  * This is used to retrieve the listener for a certain status change.
245  * If this datareader has a registered listener and the status kind
246  * is in the listener mask then the listener is returned.
247  * Otherwise, the query for the listener is propagated up to the
248  * factory/subscriber.
249  */
250  DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind);
251 
252  /// Tell instances when a DataWriter transitions to being alive.
253  /// The writer state is an inout parameter; it has to be set to ALIVE before
254  /// handle_timeout is called since some subroutines use the state.
255  void writer_became_alive(WriterInfo& info,
256  const MonotonicTimePoint& when);
257 
258  /// Tell instances when a DataWriter transitions to DEAD.
259  /// The writer state is an inout parameter; the state is set to DEAD
260  /// when it returns.
261  void writer_became_dead(WriterInfo& info);
262 
263  /// Tell instances when a DataWriter is removed.
264  /// The liveliness status needs to be updated.
265  void writer_removed(WriterInfo& info);
266 
267  virtual void cleanup();
268 
269  void init(
270  TopicDescriptionImpl* a_topic_desc,
271  const DDS::DataReaderQos& qos,
272  DDS::DataReaderListener_ptr a_listener,
273  const DDS::StatusMask& mask,
274  DomainParticipantImpl* participant,
275  SubscriberImpl* subscriber);
276 
277  virtual DDS::ReadCondition_ptr create_readcondition(
281 
282 #ifndef OPENDDS_NO_QUERY_CONDITION
283  virtual DDS::QueryCondition_ptr create_querycondition(
287  const char * query_expression,
288  const DDS::StringSeq & query_parameters);
289 #endif
290 
291  virtual DDS::ReturnCode_t delete_readcondition(
292  DDS::ReadCondition_ptr a_condition);
293 
294  virtual DDS::ReturnCode_t delete_contained_entities();
295 
296  virtual DDS::ReturnCode_t set_qos(
297  const DDS::DataReaderQos & qos);
298 
299  virtual DDS::ReturnCode_t get_qos(
300  DDS::DataReaderQos & qos);
301 
302  virtual DDS::ReturnCode_t set_listener(
303  DDS::DataReaderListener_ptr a_listener,
304  DDS::StatusMask mask);
305 
306  virtual DDS::DataReaderListener_ptr get_listener();
307 
308  virtual DDS::TopicDescription_ptr get_topicdescription();
309 
310  virtual DDS::Subscriber_ptr get_subscriber();
311 
312  virtual DDS::ReturnCode_t get_sample_rejected_status(
313  DDS::SampleRejectedStatus & status);
314 
315  virtual DDS::ReturnCode_t get_liveliness_changed_status(
317 
318  virtual DDS::ReturnCode_t get_requested_deadline_missed_status(
320 
321  virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(
323 
324  virtual DDS::ReturnCode_t get_subscription_matched_status(
326 
327  virtual DDS::ReturnCode_t get_sample_lost_status(
328  DDS::SampleLostStatus & status);
329 
330  virtual DDS::ReturnCode_t wait_for_historical_data(
331  const DDS::Duration_t & max_wait);
332 
333  virtual DDS::ReturnCode_t get_matched_publications(
334  DDS::InstanceHandleSeq & publication_handles);
335 
336 #if !defined (DDS_HAS_MINIMUM_BIT)
337  virtual DDS::ReturnCode_t get_matched_publication_data(
338  DDS::PublicationBuiltinTopicData & publication_data,
339  DDS::InstanceHandle_t publication_handle);
340 #endif // !defined (DDS_HAS_MINIMUM_BIT)
341 
342  virtual DDS::ReturnCode_t enable();
343 
344 #ifndef OPENDDS_SAFETY_PROFILE
345  virtual void get_latency_stats(
346  LatencyStatisticsSeq & stats);
347 #endif
348 
349  virtual void reset_latency_stats();
350 
351  virtual CORBA::Boolean statistics_enabled();
352 
353  virtual void statistics_enabled(
354  CORBA::Boolean statistics_enabled);
355 
356  /// @name Raw Latency Statistics Interfaces
357  /// @{
358 
359  /// Expose the statistics container.
360  const StatsMapType& raw_latency_statistics() const;
361 
362  /// Configure the size of the raw data collection buffer.
363  unsigned int& raw_latency_buffer_size();
364 
365  /// Configure the type of the raw data collection buffer.
366  DataCollector<double>::OnFull& raw_latency_buffer_type();
367 
368  /// @}
369 
370  /// update liveliness info for this writer.
371  void writer_activity(const DataSampleHeader& header);
372 
373  /// process a message that has been received - could be control or a data sample.
374  virtual void data_received(const ReceivedDataSample& sample);
375 
376  void transport_discovery_change();
377 
378  virtual bool check_transport_qos(const TransportInst& inst);
379 
380  bool have_sample_states(DDS::SampleStateMask sample_states) const;
381  bool have_view_states(DDS::ViewStateMask view_states) const;
382  bool have_instance_states(DDS::InstanceStateMask instance_states) const;
383  bool contains_sample(DDS::SampleStateMask sample_states,
386 
387 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
388  virtual bool contains_sample_filtered(DDS::SampleStateMask sample_states,
391  const FilterEvaluator& evaluator,
392  const DDS::StringSeq& params) = 0;
393 #endif
394 
395  virtual RcHandle<MessageHolder> dds_demarshal(const ReceivedDataSample& sample,
396  DDS::InstanceHandle_t publication_handle,
397  SubscriptionInstance_rch& instance,
398  bool& is_new_instance,
399  bool& filtered,
400  MarshalingType marshaling_type,
401  bool full_copy) = 0;
402 
403  virtual void dispose_unregister(const ReceivedDataSample& sample,
404  DDS::InstanceHandle_t publication_handle,
405  SubscriptionInstance_rch& instance);
406 
407  void process_latency(const ReceivedDataSample& sample);
408  void notify_latency(GUID_t writer);
409 
410  size_t get_depth() const
411  {
412  return static_cast<size_t>(depth_);
413  }
414  size_t get_n_chunks() const
415  {
416  return n_chunks_;
417  }
418 
419  void liveliness_lost();
420 
421  void remove_all_associations();
422 
423  void notify_subscription_disconnected(const WriterIdSeq& pubids);
424  void notify_subscription_reconnected(const WriterIdSeq& pubids);
425  void notify_subscription_lost(const WriterIdSeq& pubids);
426  void notify_liveliness_change();
427 
428  bool is_bit() const;
429 
430  /**
431  * This method is used for a precondition check of delete_datareader.
432  *
433  * @retval true We have zero-copy samples loaned out
434  * @retval false We have no zero-copy samples loaned out
435  */
436  bool has_zero_copies();
437 
438  /// Release the instance with the handle.
439  void release_instance(DDS::InstanceHandle_t handle);
440 
441  // Take appropriate actions upon learning instance or view state has been updated
442  void state_updated(DDS::InstanceHandle_t handle);
443 
444  /// Release all instances held by the reader.
445  virtual void release_all_instances() = 0;
446 
447  ACE_Reactor_Timer_Interface* get_reactor();
448 
449  GUID_t get_topic_id();
450  GUID_t get_dp_id();
451 
452  typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
453  void get_instance_handles(InstanceHandleVec& instance_handles);
454 
455  typedef std::pair<GUID_t, WriterInfo::WriterState> WriterStatePair;
456  typedef OPENDDS_VECTOR(WriterStatePair) WriterStatePairVec;
457  void get_writer_states(WriterStatePairVec& writer_states);
458 
459 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
460  void update_ownership_strength (const GUID_t& pub_id,
461  const CORBA::Long& ownership_strength);
462 
463  // Access to OwnershipManager is only valid when the domain participant is valid;
464  // therefore, we must lock the domain participant when using OwnershipManager.
466  {
467  public:
469  : participant_( (reader && reader->is_exclusive_ownership_) ? reader->participant_servant_.lock() : RcHandle<DomainParticipantImpl>())
470  {
471  }
472  operator bool() const { return participant_.in(); }
474  {
475  return participant_ ? participant_->ownership_manager() : 0;
476  }
477 
478  private:
480  };
481  friend class OwnershipManagerPtr;
482 
484  OwnershipManagerScopedAccess() : om_(0), lock_result_(0) {}
485  explicit OwnershipManagerScopedAccess(DataReaderImpl::OwnershipManagerPtr om) : om_(om), lock_result_(om_ ? om_->instance_lock_acquire() : 0) {}
487 
489  {
490  if (&rhs != this) {
491  std::swap(om_, rhs.om_);
492  std::swap(lock_result_, rhs.lock_result_);
493  }
494  }
495 
496  int release()
497  {
498  int result = 0;
499  if (om_ && !lock_result_) {
500  result = om_->instance_lock_release();
501  }
502  om_ = 0;
503  lock_result_ = 0;
504  return result;
505  }
506 
509  };
510 
512 #endif
513 
514  virtual void lookup_instance(const ReceivedDataSample& sample,
515  SubscriptionInstance_rch& instance) = 0;
516 
517 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
518 
519 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
520 
521  void enable_filtering(ContentFilteredTopicImpl* cft);
522 
523  DDS::ContentFilteredTopic_ptr get_cf_topic() const;
524 
525 #endif
526 
527 #ifndef OPENDDS_NO_MULTI_TOPIC
528 
529  void enable_multi_topic(MultiTopicImpl* mt);
530 
531 #endif
532 
533  void update_subscription_params(const DDS::StringSeq& params) const;
534 
535  typedef OPENDDS_VECTOR(void*) GenericSeq;
536 
537  struct GenericBundle {
538  GenericSeq samples_;
540  };
541 
542  virtual DDS::ReturnCode_t read_generic(GenericBundle& gen,
544  DDS::InstanceStateMask instance_states, bool adjust_ref_count ) = 0;
545 
546  virtual DDS::ReturnCode_t take(
547  AbstractSamples& samples,
550 
551  virtual DDS::InstanceHandle_t lookup_instance_generic(const void* data) = 0;
552 
553  virtual DDS::ReturnCode_t read_instance_generic(void*& data,
554  DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
557 
558  virtual DDS::ReturnCode_t read_next_instance_generic(void*& data,
559  DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
562 
563 #endif
564 
567  const SystemTimePoint& timestamp = SystemTimePoint::now(),
568  const GUID_t& guid = GUID_UNKNOWN)
569  {
570  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
571  {
572  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
573  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(guid);
574  if (pos != publication_id_to_handle_map_.end()) {
575  publication_handle = pos->second;
576  }
577  }
578 
579  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
580  set_instance_state_i(instance, publication_handle, state, timestamp, guid);
581  }
582 
583 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
584  void begin_access();
585  void end_access();
586  void get_ordered_data(GroupRakeData& data,
590 
591  void accept_coherent (const GUID_t& writer_id,
592  const GUID_t& publisher_id);
593  void reject_coherent (const GUID_t& writer_id,
594  const GUID_t& publisher_id);
595  void coherent_change_received (const GUID_t& publisher_id, Coherent_State& result);
596 
597  void coherent_changes_completed (DataReaderImpl* reader);
598 
599  void reset_coherent_info (const GUID_t& writer_id,
600  const GUID_t& publisher_id);
601 #endif
602 
603  // Called upon subscriber qos change to update the local cache.
604  void set_subscriber_qos(const DDS::SubscriberQos & qos);
605 
606  // Set the instance related writers to reevaluate the owner.
607  void reset_ownership (DDS::InstanceHandle_t instance);
608 
609  virtual RcHandle<EntityImpl> parent() const;
610 
611  void disable_transport();
612 
613  virtual void register_for_writer(const GUID_t& /*participant*/,
614  const GUID_t& /*readerid*/,
615  const GUID_t& /*writerid*/,
616  const TransportLocatorSeq& /*locators*/,
617  DiscoveryListener* /*listener*/);
618 
619  virtual void unregister_for_writer(const GUID_t& /*participant*/,
620  const GUID_t& /*readerid*/,
621  const GUID_t& /*writerid*/);
622 
623  virtual void update_locators(const GUID_t& remote,
624  const TransportLocatorSeq& locators);
625 
626  virtual DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
627 
628  GUID_t get_guid() const
629  {
630  ACE_Guard<ACE_Thread_Mutex> guard(subscription_id_mutex_);
631  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
632  while (!has_subscription_id_ && !get_deleted()) {
633  subscription_id_condition_.wait(thread_status_manager);
634  }
635  return subscription_id_;
636  }
637 
638  void return_handle(DDS::InstanceHandle_t handle);
639 
641  {
642  TopicDescriptionPtr<TopicImpl> temp(topic_servant_);
643  return temp ? dynamic_cast<const ValueDispatcher*>(temp->get_type_support()) : 0;
644  }
645 
646 protected:
647 
648  // Update max flag if the spec ever changes
649  static const CORBA::ULong MAX_SAMPLE_STATE_FLAG = DDS::NOT_READ_SAMPLE_STATE;
650  static const CORBA::ULong MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1;
651  static const CORBA::ULong MAX_SAMPLE_STATE_BITS = 2u;
652 
653  // Update max flag if the spec ever changes
654  static const CORBA::ULong MAX_VIEW_STATE_FLAG = DDS::NOT_NEW_VIEW_STATE;
655  static const CORBA::ULong MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1;
656  static const CORBA::ULong MAX_VIEW_STATE_BITS = 2u;
657 
658  // Update max flag if the spec ever changes
659  static const CORBA::ULong MAX_INSTANCE_STATE_FLAG = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
660  static const CORBA::ULong MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1;
661  static const CORBA::ULong MAX_INSTANCE_STATE_BITS = 3u;
662 
663  // These may need to be updated if the spec ever changes
664  static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS;
665  static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS;
666 
667  typedef OPENDDS_SET(DDS::InstanceHandle_t) HandleSet;
668  typedef OPENDDS_MAP(CORBA::ULong, HandleSet) LookupMap;
669 
670  static CORBA::ULong to_combined_states(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
671  {
672  sample_states &= MAX_SAMPLE_STATE_MASK;
673  view_states &= MAX_VIEW_STATE_MASK;
674  instance_states &= MAX_INSTANCE_STATE_MASK;
675  if (!(sample_states && view_states && instance_states)) {
676  // catch-all for "bogus" lookups
677  return 0;
678  }
679  return (sample_states << COMBINED_SAMPLE_STATE_SHIFT) | (view_states << COMBINED_VIEW_STATE_SHIFT) | instance_states;
680  }
681 
682  static void split_combined_states(CORBA::ULong combined, CORBA::ULong& sample_states, CORBA::ULong& view_states, CORBA::ULong& instance_states)
683  {
684  sample_states = (combined >> COMBINED_SAMPLE_STATE_SHIFT) & MAX_SAMPLE_STATE_MASK;
685  view_states = (combined >> COMBINED_VIEW_STATE_SHIFT) & MAX_VIEW_STATE_MASK;
686  instance_states = combined & MAX_INSTANCE_STATE_MASK;
687  }
688 
689  void initialize_lookup_maps();
690  void update_lookup_maps(const SubscriptionInstanceMapType::iterator& input);
691  void remove_from_lookup_maps(DDS::InstanceHandle_t handle);
692  const HandleSet& lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const;
693 
695 
696  // Perform cast to get extended version of listener (otherwise nil)
697  DataReaderListener_ptr get_ext_listener();
698 
699  virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
700 
701  void prepare_to_delete();
702 
703  /// Setup deserialization options
704  DDS::ReturnCode_t setup_deserialization();
705 
706  RcHandle<SubscriberImpl> get_subscriber_servant();
707 
708  void post_read_or_take();
709 
710  // type specific DataReader's part of enable.
711  virtual DDS::ReturnCode_t enable_specific() = 0;
712 
713  void sample_info(DDS::SampleInfo & sample_info,
714  const ReceivedDataElement *ptr);
715 
716  CORBA::Long total_samples() const;
717 
718  void set_sample_lost_status(const DDS::SampleLostStatus& status);
719  void set_sample_rejected_status(
720  const DDS::SampleRejectedStatus& status);
721 
722  SubscriptionInstance_rch get_handle_instance(
723  DDS::InstanceHandle_t handle);
724 
725  /**
726  * Get an instance handle for a new instance.
727  */
728  DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t& key);
729 
730  virtual void purge_data(SubscriptionInstance_rch instance) = 0;
731 
732  virtual void release_instance_i(DDS::InstanceHandle_t handle) = 0;
733  virtual void state_updated_i(DDS::InstanceHandle_t handle) = 0;
734 
735  bool has_readcondition(DDS::ReadCondition_ptr a_condition);
736 
737  /// @TODO: document why the instances_ container is mutable.
738  mutable SubscriptionInstanceMapType instances_;
739 
740  /// Assume since the container is mutable(?!!?) it may need to use the
741  /// lock while const.
742  /// @TODO: remove the recursive nature of the instances_lock if not needed.
744 
745  /// Check if the received data sample or instance should
746  /// be filtered.
747  /**
748  * @note Filtering will only occur if the application
749  * configured a finite duration in the Topic's LIFESPAN
750  * QoS policy or DataReader's TIME_BASED_FILTER QoS policy.
751  */
752  bool filter_sample(const DataSampleHeader& header);
753 
754  bool ownership_filter_instance(const SubscriptionInstance_rch& instance,
755  const GUID_t& pubid);
756  bool time_based_filter_instance(const SubscriptionInstance_rch& instance,
757  MonotonicTimePoint& now,
758  MonotonicTimePoint& deadline);
759 
760  void accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance);
761 
762  virtual void qos_change(const DDS::DataReaderQos& qos);
763 
764  /// Data has arrived into the cache, unblock waiting ReadConditions
765  void notify_read_conditions();
766 
770 
774 
775  // Status conditions accessible by subclasses.
778 
779  /// lock protecting sample container as well as statuses.
781 
783  Reverse_Lock_t reverse_sample_lock_;
784 
789 
790 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
792 
793 #endif
794 
795 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
798 #endif
799 
800 #ifndef OPENDDS_NO_MULTI_TOPIC
802 #endif
803 
804  /// Whether group coherent changes are being accessed.
805  bool coherent_;
806 
807  /// Ordered group samples.
809 
811 
812  virtual void add_link(const DataLink_rch& link, const GUID_t& peer);
813 
814 private:
816 
817  virtual void set_instance_state_i(DDS::InstanceHandle_t instance,
818  DDS::InstanceHandle_t publication_handle,
820  const SystemTimePoint& timestamp,
821  const GUID_t& guid) = 0;
822 
823  void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
824 
825  /// Lookup the instance handles by the publication repo ids
826  void lookup_instance_handles(const WriterIdSeq& ids,
827  DDS::InstanceHandleSeq& hdls);
828 
829  void instances_liveliness_update(const GUID_t& writer,
830  DDS::InstanceHandle_t publication_handle);
831 
832 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
833  bool verify_coherent_changes_completion(WriterInfo* writer);
834  bool coherent_change_received(WriterInfo* writer);
835 #endif
836 
838  {
839  RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
840  if (participant_servant) {
841  return participant_servant->get_builtin_subscriber_proxy();
842  }
843 
844  return RcHandle<BitSubscriber>();
845  }
846 
847  DDS::DomainId_t domain_id() const { return this->domain_id_; }
848 
851  }
852 
853 #if defined(OPENDDS_SECURITY)
854  DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
855 #endif
856 
857  /// when done handling historic samples, resume
858  void resume_sample_processing(const GUID_t& pub_id);
859 
860  /// collect samples received before END_HISTORIC_SAMPLES
861  /// returns false if normal processing of this sample should be skipped
862  bool check_historic(const ReceivedDataSample& sample);
863 
864  /// deliver samples that were held by check_historic()
865  void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples);
866 
867  friend class InstanceState;
869 
870  friend class ::DDS_TEST; //allows tests to get at private data
871 
872  DDS::TopicDescription_var topic_desc_;
875  DDS::DataReaderListener_var listener_;
878  // subscriber_servant_ has to be a weak pointer because it may be used from the
879  // transport reactor thread and that thread doesn't have the ownership of
880  // the subscriber_servant_ object.
883 
885  size_t n_chunks_;
886 
887  // Used to protect access to publication_id_to_handle_map_
889 
890  typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
891  RepoIdToHandleMap publication_id_to_handle_map_;
892 
893  // Status conditions.
898 
899  // OpenDDS extended status. This is only available via listener.
901 
902  /**
903  * @todo The subscription_lost_status_ and
904  * subscription_reconnecting_status_ are left here for
905  * future use when we add get_subscription_lost_status()
906  * and get_subscription_reconnecting_status() methods.
907  */
908  // Statistics of the lost subscriptions due to lost connection.
910  // Statistics of the subscriptions that are associated with a
911  // reconnecting datalink.
912  // SubscriptionReconnectingStatus subscription_reconnecting_status_;
913 
914  /// The orb's reactor to be used to register the liveliness
915  /// timer.
917 
919  public:
921  ACE_thread_t owner,
922  DataReaderImpl* data_reader)
923  : ReactorInterceptor(reactor, owner)
924  , data_reader_(*data_reader)
925  , liveliness_timer_id_(-1)
926  { }
927 
928  void check_liveliness();
929 
931  {
932  execute_or_enqueue(make_rch<CancelCommand>(this));
933  }
934 
935  virtual bool reactor_is_shut_down() const
936  {
937  return TheServiceParticipant->is_shut_down();
938  }
939 
940  private:
942 
944 
945  /// liveliness timer id; -1 if no timer is set
947  void check_liveliness_i(bool cancel, const MonotonicTimePoint& now);
948 
949  int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
950 
951  class CommandBase : public Command {
952  public:
954  : timer_(timer)
955  { }
956 
957  protected:
959  };
960 
962  public:
964  : CommandBase(timer)
965  { }
966  virtual void execute()
967  {
968  timer_->check_liveliness_i(true, MonotonicTimePoint::now());
969  }
970  };
971 
972  class CancelCommand : public CommandBase {
973  public:
975  : CommandBase(timer)
976  { }
977  virtual void execute()
978  {
979  if (timer_->liveliness_timer_id_ != -1) {
980  timer_->reactor()->cancel_timer(timer_);
981  }
982  }
983  };
984  };
986 
988  /// Watchdog responsible for reporting missed offered
989  /// deadlines.
992  DeadlineQueue deadline_queue_;
996 
997  void schedule_deadline(SubscriptionInstance_rch instance,
998  bool timer_called);
999  void reset_deadline_period(const TimeDuration& deadline_period);
1000  void reschedule_deadline(SubscriptionInstance_rch instance,
1001  const MonotonicTimePoint& now);
1002  void cancel_deadline(SubscriptionInstance_rch instance);
1003  void cancel_all_deadlines();
1004  void deadline_task(const MonotonicTimePoint& now);
1005  void process_deadline(SubscriptionInstance_rch instance,
1006  const MonotonicTimePoint& now,
1007  bool timer_called);
1008 
1009  /// Flag indicates that this datareader is a builtin topic
1010  /// datareader.
1011  bool is_bit_;
1012 
1014 
1015  /// Flag indicating status of statistics gathering.
1017 
1018  /// publications writing to this reader.
1020  GUID_tKeyLessThan) WriterMapType;
1021 
1022  WriterMapType writers_;
1023 
1024  /// RW lock for reading/writing publications.
1026 
1027  /// Statistics for this reader, collected for each writer.
1028  StatsMapType statistics_;
1030 
1031  /// Bound (or initial reservation) of raw latency buffer.
1033 
1034  /// Type of raw latency data buffer.
1036 
1038  typedef OPENDDS_SET_CMP(DDS::ReadCondition_var, RCCompLess) ReadConditionSet;
1039  ReadConditionSet read_conditions_;
1040 
1041  /// Monitor object for this entity
1043 
1044  /// Periodic Monitor object for this entity
1046 
1048 
1049 protected:
1050  typedef OPENDDS_SET(Encoding::Kind) EncodingKinds;
1051  EncodingKinds decoding_modes_;
1052 
1053 public:
1055  public:
1057  DDS::SubscriberListener_var sub_listener,
1058  WeakRcHandle<DataReaderImpl> data_reader,
1059  bool call,
1060  bool set_reader_status)
1061  : subscriber_(subscriber)
1062  , sub_listener_(sub_listener)
1063  , data_reader_(data_reader)
1064  , call_(call)
1065  , set_reader_status_(set_reader_status)
1066  {}
1067 
1068  private:
1069  virtual void execute();
1070 
1072  DDS::SubscriberListener_var sub_listener_;
1074  const bool call_;
1076  };
1077 
1079  public:
1080  OnDataAvailable(DDS::DataReaderListener_var listener,
1081  WeakRcHandle<DataReaderImpl> data_reader,
1082  bool call,
1083  bool set_reader_status,
1084  bool set_subscriber_status)
1085  : listener_(listener)
1086  , data_reader_(data_reader)
1087  , call_(call)
1088  , set_reader_status_(set_reader_status)
1089  , set_subscriber_status_(set_subscriber_status)
1090  {}
1091 
1092  private:
1093  virtual void execute();
1094 
1095  DDS::DataReaderListener_var listener_;
1097  const bool call_;
1100  };
1101 
1102 protected:
1103 #ifdef OPENDDS_SECURITY
1105  DDS::DynamicType_var dynamic_type_;
1106 #endif
1107 
1109 };
1110 
1112 
1113 } // namespace DCPS
1114 } // namespace OpenDDS
1115 
1117 
1118 #if defined (__ACE_INLINE__)
1119 # include "DataReaderImpl.inl"
1120 #endif /* __ACE_INLINE__ */
1121 
1122 #endif /* OPENDDS_DCPS_DATAREADERIMPL_H */
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
void swap(MessageBlock &lhs, MessageBlock &rhs)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
manage the states of a received data instance.
Definition: InstanceState.h:49
DDS::DataReaderListener_var listener_
Implements the OpenDDS::DCPS::Entity interfaces.
Definition: EntityImpl.h:37
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Reverse_Lock< ACE_Recursive_Thread_Mutex > Reverse_Lock_t
const ValueDispatcher * get_value_dispatcher() const
const InstanceHandle_t HANDLE_NIL
virtual void install_type_support(TypeSupportImpl *)
void release(T x)
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
BudgetExceededStatus budget_exceeded_status_
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
WeakRcHandle< DataReaderImpl > reader_
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
DDS::TopicDescription_var topic_desc_
AtomicBool statistics_enabled_
Flag indicating status of statistics gathering.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::SubscriptionMatchedStatus subscription_match_status_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
SubscriptionLostStatus subscription_lost_status_
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffer.
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
sequence< octet > key
TransportMessageBlockAllocator mb_alloc_
sequence< SampleInfo > SampleInfoSeq
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
static CORBA::ULong to_combined_states(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
RcHandle< DomainParticipantImpl > participant_
Defines the interface for Discovery callbacks into the DataReader.
::DDS::InstanceHandle_t lookup_instance(in<%SCOPED%> instance_data)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
void set_instance_state(DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
RcHandle< LivelinessTimer > liveliness_timer_
#define OPENDDS_MULTIMAP(K, T)
Cached_Allocator_With_Overflow< ReceivedDataElementMemoryBlock, ACE_Thread_Mutex > ReceivedDataAllocator
unsigned long InstanceStateMask
sequence< TransportLocator > TransportLocatorSeq
std::pair< GUID_t, WriterInfo::WriterState > WriterStatePair
SubscriptionInstanceMapType instances_
Todo: document why the instances_ container is mutable.
RcHandle< BitSubscriber > get_builtin_subscriber_proxy() const
unique_ptr< ReceivedDataAllocator > rd_allocator_
Collection of latency statistics for a single association.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
DOMAINID_TYPE_NATIVE DomainId_t
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
WeakRcHandle< DataReaderImpl > data_reader_
bool coherent_
Is this accessing Group coherent changes?
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
const ViewStateKind NOT_NEW_VIEW_STATE
DDS::DomainId_t domain_id() const
DDS::DynamicType_var dynamic_type_
ACE_Thread_Mutex subscription_id_mutex_
ACE_CDR::ULong ULong
ACE_Thread_Mutex listener_mutex_
sequence< LatencyStatistics > LatencyStatisticsSeq
OnDataAvailable(DDS::DataReaderListener_var listener, WeakRcHandle< DataReaderImpl > data_reader, bool call, bool set_reader_status, bool set_subscriber_status)
ACE_CDR::Boolean Boolean
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
DWORD ACE_thread_t
unsigned long InstanceStateKind
A fixed-size allocator that caches items for quicker access but if the pool is exhausted it will use ...
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
WeakRcHandle< SubscriberImpl > subscriber_servant_
Implements the DDS::DataReader interface.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
WeakRcHandle< DataReaderImpl > data_reader_
CommandBase(EndHistoricSamplesMissedSweeper *sweeper, WriterInfo_rch &info)
typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) GuidSet
OwnershipManagerScopedAccess(DataReaderImpl::OwnershipManagerPtr om)
OnDataOnReaders(WeakRcHandle< SubscriberImpl > subscriber, DDS::SubscriberListener_var sub_listener, WeakRcHandle< DataReaderImpl > data_reader, bool call, bool set_reader_status)
int init(void)
RcHandle< DRISporadicTask > deadline_task_
sequence< GUID_t > WriterIdSeq
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
RepoIdToHandleMap publication_id_to_handle_map_
static void split_combined_states(CORBA::ULong combined, CORBA::ULong &sample_states, CORBA::ULong &view_states, CORBA::ULong &instance_states)
::DDS::ReturnCode_t take(inout<%TYPE%><%SEQ%> received_data, inout ::DDS::SampleInfoSeq info_seq, in long max_samples, in ::DDS::SampleStateMask sample_states, in ::DDS::ViewStateMask view_states, in ::DDS::InstanceStateMask instance_states)
WeakRcHandle< SubscriberImpl > subscriber_
unsigned long SampleStateMask
DDS::SampleLostStatus sample_lost_status_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
Mix-in class for DDS entities which directly use the transport layer.
unsigned long StatusMask
WeakRcHandle< DataReaderImpl > data_reader_
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
WeakRcHandle< DomainParticipantImpl > participant_servant_
ACE_Recursive_Thread_Mutex statistics_lock_
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
Priority get_priority_value(const AssociationData &data) const
ACE_CDR::Long Priority
TypeSupportImpl * type_support_
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffer.
CancelCommand(EndHistoricSamplesMissedSweeper *sweeper, WriterInfo_rch &info)
Implements the DDS::TopicDescription interface.
Sequence number abstraction. Only allows positive 64 bit values.
ACE_Reactor_Timer_Interface * reactor_
DDS::SampleRejectedStatus sample_rejected_status_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
RcHandle< DataReaderImpl > DataReaderImpl_rch
const SampleStateKind NOT_READ_SAMPLE_STATE
Stats< double > stats_
Latency statistics for the DataWriter to this DataReader.
ACE_Recursive_Thread_Mutex publication_handle_lock_
Security::SecurityConfig_rch security_config_
RcHandle< T > lock() const
Definition: RcObject.h:188
unsigned long StatusKind
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
OwnershipManagerPtr ownership_manager()
DDS::DataReaderQos passed_qos_
DDS::LivelinessChangedStatus liveliness_changed_status_
#define TheServiceParticipant
ACE_Recursive_Thread_Mutex instances_lock_
Keeps track of a DataWriter's liveliness for a DataReader.
Definition: WriterInfo.h:81
Elements stored for managing statistical data.
TopicDescriptionPtr< MultiTopicImpl > multi_topic_
VarLess< DDS::ReadCondition > RCCompLess
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
unsigned long ViewStateMask
ACE_Thread_Mutex content_filtered_topic_mutex_
TopicDescriptionPtr< TopicImpl > topic_servant_
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
long liveliness_timer_id_
liveliness timer id; -1 if no timer is set
CORBA::Long last_deadline_missed_total_count_
PmfSporadicTask< DataReaderImpl > DRISporadicTask
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
ScheduleCommand(EndHistoricSamplesMissedSweeper *sweeper, WriterInfo_rch &info)
StatsMapType statistics_
Statistics for this reader, collected for each writer.
LivelinessTimer(ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *data_reader)
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)
typedef OPENDDS_SET(NetworkAddress) AddrSet