Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_DATAREADERIMPL_H
9 : #define OPENDDS_DCPS_DATAREADERIMPL_H
10 :
11 : #include "dcps_export.h"
12 :
13 : #include "AssociationData.h"
14 : #include "Cached_Allocator_With_Overflow_T.h"
15 : #include "CoherentChangeControl.h"
16 : #include "ContentFilteredTopicImpl.h"
17 : #include "DataReaderCallbacks.h"
18 : #include "Definitions.h"
19 : #include "DisjointSequence.h"
20 : #include "DomainParticipantImpl.h"
21 : #include "EntityImpl.h"
22 : #include "GroupRakeData.h"
23 : #include "InstanceState.h"
24 : #include "MultiTopicImpl.h"
25 : #include "OwnershipManager.h"
26 : #include "PoolAllocator.h"
27 : #include "RcEventHandler.h"
28 : #include "RcHandle_T.h"
29 : #include "RcObject.h"
30 : #include "ReactorInterceptor.h"
31 : #include "Service_Participant.h"
32 : #include "Stats_T.h"
33 : #include "SubscriptionInstance.h"
34 : #include "TimeTypes.h"
35 : #include "TopicImpl.h"
36 : #include "WriterInfo.h"
37 : #include "ZeroCopyInfoSeq_T.h"
38 : #include "AtomicBool.h"
39 : #include "transport/framework/ReceivedDataSample.h"
40 : #include "transport/framework/TransportClient.h"
41 : #include "transport/framework/TransportReceiveListener.h"
42 :
43 : #include <dds/DdsDcpsTopicC.h>
44 : #include <dds/DdsDcpsSubscriptionExtC.h>
45 : #include <dds/DdsDcpsDomainC.h>
46 : #include <dds/DdsDcpsTopicC.h>
47 : #include <dds/DdsDcpsInfrastructureC.h>
48 :
49 : #include <ace/String_Base.h>
50 : #include <ace/Reverse_Lock_T.h>
51 : #include <ace/Reactor.h>
52 :
53 : #include <memory>
54 :
55 : #if !defined (ACE_LACKS_PRAGMA_ONCE)
56 : #pragma once
57 : #endif /* ACE_LACKS_PRAGMA_ONCE */
58 :
59 : class DDS_TEST;
60 :
61 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
62 :
63 : namespace OpenDDS {
64 : namespace DCPS {
65 :
66 : class SubscriberImpl;
67 : class DomainParticipantImpl;
68 : class SubscriptionInstance;
69 : class TopicImpl;
70 : class TopicDescriptionImpl;
71 : class Monitor;
72 : class DataReaderImpl;
73 : class FilterEvaluator;
74 :
75 : typedef Cached_Allocator_With_Overflow<ReceivedDataElementMemoryBlock, ACE_Thread_Mutex>
76 : ReceivedDataAllocator;
77 :
78 : enum MarshalingType {
79 : FULL_MARSHALING,
80 : KEY_ONLY_MARSHALING
81 : };
82 :
83 : /// Elements stored for managing statistical data.
84 : class OpenDDS_Dcps_Export WriterStats {
85 : public:
86 : /// Default constructor.
87 : WriterStats(
88 : int amount = 0,
89 : DataCollector<double>::OnFull type = DataCollector<double>::KeepOldest);
90 : #ifdef ACE_HAS_CPP11
91 0 : WriterStats(const WriterStats&) = default;
92 : #endif
93 :
94 : /// Add a datum to the latency statistics.
95 : void add_stat(const TimeDuration& delay);
96 :
97 : /// Extract the current latency statistics for this writer.
98 : LatencyStatistics get_stats() const;
99 :
100 : /// Reset the latency statistics for this writer.
101 : void reset_stats();
102 :
103 : #ifndef OPENDDS_SAFETY_PROFILE
104 : /// Dump any raw data.
105 : std::ostream& raw_data(std::ostream& str) const;
106 : #endif
107 :
108 : private:
109 : /// Latency statistics for the DataWriter to this DataReader.
110 : Stats<double> stats_;
111 : };
112 :
113 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
114 :
115 : class OpenDDS_Dcps_Export AbstractSamples
116 : {
117 : public:
118 : virtual ~AbstractSamples(){}
119 : virtual void reserve(CORBA::ULong size)=0;
120 : virtual void push_back(const DDS::SampleInfo& info, const void* sample)=0;
121 : };
122 :
123 : #endif
124 :
125 : // Class to cleanup in case EndHistoricSamples is missed
126 : class EndHistoricSamplesMissedSweeper : public ReactorInterceptor {
127 : public:
128 : EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
129 : ACE_thread_t owner,
130 : DataReaderImpl* reader);
131 :
132 : void schedule_timer(WriterInfo_rch& info);
133 : void cancel_timer(WriterInfo_rch& info);
134 :
135 : // Arg will be PublicationId
136 : int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
137 :
138 0 : virtual bool reactor_is_shut_down() const
139 : {
140 0 : return TheServiceParticipant->is_shut_down();
141 : }
142 :
143 : private:
144 : ~EndHistoricSamplesMissedSweeper();
145 :
146 : WeakRcHandle<DataReaderImpl> reader_;
147 : OPENDDS_SET(WriterInfo_rch) info_set_;
148 :
149 : class CommandBase : public Command {
150 : public:
151 0 : CommandBase(EndHistoricSamplesMissedSweeper* sweeper,
152 : WriterInfo_rch& info)
153 0 : : sweeper_(sweeper)
154 0 : , info_(info)
155 0 : { }
156 :
157 : protected:
158 : EndHistoricSamplesMissedSweeper* sweeper_;
159 : WriterInfo_rch info_;
160 : };
161 :
162 : class ScheduleCommand : public CommandBase {
163 : public:
164 0 : ScheduleCommand(EndHistoricSamplesMissedSweeper* sweeper,
165 : WriterInfo_rch& info)
166 0 : : CommandBase(sweeper, info)
167 0 : { }
168 : virtual void execute();
169 : };
170 :
171 : class CancelCommand : public CommandBase {
172 : public:
173 0 : CancelCommand(EndHistoricSamplesMissedSweeper* sweeper,
174 : WriterInfo_rch& info)
175 0 : : CommandBase(sweeper, info)
176 0 : { }
177 : virtual void execute();
178 : };
179 : };
180 :
181 : class MessageHolder : public virtual RcObject {
182 : public:
183 : virtual const void* get() const = 0;
184 : };
185 :
186 : template <typename T>
187 : class MessageHolder_T : public MessageHolder {
188 : public:
189 0 : MessageHolder_T(const T& v) : v_(v) {}
190 0 : const void* get() const { return &v_; }
191 : private:
192 : T v_;
193 : };
194 :
195 : /**
196 : * @class DataReaderImpl
197 : *
198 : * @brief Implements the DDS::DataReader interface.
199 : *
200 : * See the DDS specification, OMG formal/2015-04-10, for a description of
201 : * the interface this class is implementing.
202 : *
203 : * This class must be inherited by the type-specific datareader which
204 : * is specific to the data-type associated with the topic.
205 : *
206 : */
207 : class OpenDDS_Dcps_Export DataReaderImpl
208 : : public virtual LocalObject<DataReaderEx>,
209 : public virtual DataReaderCallbacks,
210 : public virtual EntityImpl,
211 : public virtual TransportClient,
212 : public virtual TransportReceiveListener,
213 : private WriterInfoListener {
214 : public:
215 : friend class RequestedDeadlineWatchdog;
216 : friend class QueryConditionImpl;
217 : friend class SubscriberImpl;
218 :
219 : typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType;
220 : typedef OPENDDS_SET(DDS::InstanceHandle_t) InstanceSet;
221 : typedef OPENDDS_SET(SubscriptionInstance_rch) SubscriptionInstanceSet;
222 : /// Type of collection of statistics for writers to this reader.
223 : typedef OPENDDS_MAP_CMP(GUID_t, WriterStats, GUID_tKeyLessThan) StatsMapType;
224 :
225 : DataReaderImpl();
226 :
227 : virtual ~DataReaderImpl();
228 :
229 : virtual DDS::InstanceHandle_t get_instance_handle();
230 :
231 : virtual void add_association(const GUID_t& yourId,
232 : const WriterAssociation& writer,
233 : bool active);
234 :
235 : virtual void transport_assoc_done(int flags, const GUID_t& remote_id);
236 :
237 : virtual void remove_associations(const WriterIdSeq& writers, bool callback);
238 :
239 : virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
240 :
241 : virtual void signal_liveliness(const GUID_t& remote_participant);
242 :
243 : /**
244 : * This is used to retrieve the listener for a certain status change.
245 : * If this datareader has a registered listener and the status kind
246 : * is in the listener mask then the listener is returned.
247 : * Otherwise, the query for the listener is propagated up to the
248 : * factory/subscriber.
249 : */
250 : DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind);
251 :
252 : /// tell instances when a DataWriter transitions to being alive
253 : /// The writer state is inout parameter, it has to be set ALIVE before
254 : /// handle_timeout is called since some subroutine use the state.
255 : void writer_became_alive(WriterInfo& info,
256 : const MonotonicTimePoint& when);
257 :
258 : /// tell instances when a DataWriter transitions to DEAD
259 : /// The writer state is inout parameter, the state is set to DEAD
260 : /// when it returns.
261 : void writer_became_dead(WriterInfo& info);
262 :
263 : /// tell instance when a DataWriter is removed.
264 : /// The liveliness status need update.
265 : void writer_removed(WriterInfo& info);
266 :
267 : virtual void cleanup();
268 :
269 : void init(
270 : TopicDescriptionImpl* a_topic_desc,
271 : const DDS::DataReaderQos& qos,
272 : DDS::DataReaderListener_ptr a_listener,
273 : const DDS::StatusMask& mask,
274 : DomainParticipantImpl* participant,
275 : SubscriberImpl* subscriber);
276 :
277 : virtual DDS::ReadCondition_ptr create_readcondition(
278 : DDS::SampleStateMask sample_states,
279 : DDS::ViewStateMask view_states,
280 : DDS::InstanceStateMask instance_states);
281 :
282 : #ifndef OPENDDS_NO_QUERY_CONDITION
283 : virtual DDS::QueryCondition_ptr create_querycondition(
284 : DDS::SampleStateMask sample_states,
285 : DDS::ViewStateMask view_states,
286 : DDS::InstanceStateMask instance_states,
287 : const char * query_expression,
288 : const DDS::StringSeq & query_parameters);
289 : #endif
290 :
291 : virtual DDS::ReturnCode_t delete_readcondition(
292 : DDS::ReadCondition_ptr a_condition);
293 :
294 : virtual DDS::ReturnCode_t delete_contained_entities();
295 :
296 : virtual DDS::ReturnCode_t set_qos(
297 : const DDS::DataReaderQos & qos);
298 :
299 : virtual DDS::ReturnCode_t get_qos(
300 : DDS::DataReaderQos & qos);
301 :
302 : virtual DDS::ReturnCode_t set_listener(
303 : DDS::DataReaderListener_ptr a_listener,
304 : DDS::StatusMask mask);
305 :
306 : virtual DDS::DataReaderListener_ptr get_listener();
307 :
308 : virtual DDS::TopicDescription_ptr get_topicdescription();
309 :
310 : virtual DDS::Subscriber_ptr get_subscriber();
311 :
312 : virtual DDS::ReturnCode_t get_sample_rejected_status(
313 : DDS::SampleRejectedStatus & status);
314 :
315 : virtual DDS::ReturnCode_t get_liveliness_changed_status(
316 : DDS::LivelinessChangedStatus & status);
317 :
318 : virtual DDS::ReturnCode_t get_requested_deadline_missed_status(
319 : DDS::RequestedDeadlineMissedStatus & status);
320 :
321 : virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(
322 : DDS::RequestedIncompatibleQosStatus & status);
323 :
324 : virtual DDS::ReturnCode_t get_subscription_matched_status(
325 : DDS::SubscriptionMatchedStatus & status);
326 :
327 : virtual DDS::ReturnCode_t get_sample_lost_status(
328 : DDS::SampleLostStatus & status);
329 :
330 : virtual DDS::ReturnCode_t wait_for_historical_data(
331 : const DDS::Duration_t & max_wait);
332 :
333 : virtual DDS::ReturnCode_t get_matched_publications(
334 : DDS::InstanceHandleSeq & publication_handles);
335 :
336 : #if !defined (DDS_HAS_MINIMUM_BIT)
337 : virtual DDS::ReturnCode_t get_matched_publication_data(
338 : DDS::PublicationBuiltinTopicData & publication_data,
339 : DDS::InstanceHandle_t publication_handle);
340 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
341 :
342 : virtual DDS::ReturnCode_t enable();
343 :
344 : #ifndef OPENDDS_SAFETY_PROFILE
345 : virtual void get_latency_stats(
346 : LatencyStatisticsSeq & stats);
347 : #endif
348 :
349 : virtual void reset_latency_stats();
350 :
351 : virtual CORBA::Boolean statistics_enabled();
352 :
353 : virtual void statistics_enabled(
354 : CORBA::Boolean statistics_enabled);
355 :
356 : /// @name Raw Latency Statistics Interfaces
357 : /// @{
358 :
359 : /// Expose the statistics container.
360 : const StatsMapType& raw_latency_statistics() const;
361 :
362 : /// Configure the size of the raw data collection buffer.
363 : unsigned int& raw_latency_buffer_size();
364 :
365 : /// Configure the type of the raw data collection buffer.
366 : DataCollector<double>::OnFull& raw_latency_buffer_type();
367 :
368 : /// @}
369 :
370 : /// update liveliness info for this writer.
371 : void writer_activity(const DataSampleHeader& header);
372 :
373 : /// process a message that has been received - could be control or a data sample.
374 : virtual void data_received(const ReceivedDataSample& sample);
375 :
376 : void transport_discovery_change();
377 :
378 : virtual bool check_transport_qos(const TransportInst& inst);
379 :
380 : bool have_sample_states(DDS::SampleStateMask sample_states) const;
381 : bool have_view_states(DDS::ViewStateMask view_states) const;
382 : bool have_instance_states(DDS::InstanceStateMask instance_states) const;
383 : bool contains_sample(DDS::SampleStateMask sample_states,
384 : DDS::ViewStateMask view_states,
385 : DDS::InstanceStateMask instance_states);
386 :
387 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
388 : virtual bool contains_sample_filtered(DDS::SampleStateMask sample_states,
389 : DDS::ViewStateMask view_states,
390 : DDS::InstanceStateMask instance_states,
391 : const FilterEvaluator& evaluator,
392 : const DDS::StringSeq& params) = 0;
393 : #endif
394 :
395 : virtual RcHandle<MessageHolder> dds_demarshal(const ReceivedDataSample& sample,
396 : DDS::InstanceHandle_t publication_handle,
397 : SubscriptionInstance_rch& instance,
398 : bool& is_new_instance,
399 : bool& filtered,
400 : MarshalingType marshaling_type,
401 : bool full_copy) = 0;
402 :
403 : virtual void dispose_unregister(const ReceivedDataSample& sample,
404 : DDS::InstanceHandle_t publication_handle,
405 : SubscriptionInstance_rch& instance);
406 :
407 : void process_latency(const ReceivedDataSample& sample);
408 : void notify_latency(GUID_t writer);
409 :
410 0 : size_t get_depth() const
411 : {
412 0 : return static_cast<size_t>(depth_);
413 : }
414 0 : size_t get_n_chunks() const
415 : {
416 0 : return n_chunks_;
417 : }
418 :
419 : void liveliness_lost();
420 :
421 : void remove_all_associations();
422 :
423 : void notify_subscription_disconnected(const WriterIdSeq& pubids);
424 : void notify_subscription_reconnected(const WriterIdSeq& pubids);
425 : void notify_subscription_lost(const WriterIdSeq& pubids);
426 : void notify_liveliness_change();
427 :
428 : bool is_bit() const;
429 :
430 : /**
431 : * This method is used for a precondition check of delete_datareader.
432 : *
433 : * @retval true We have zero-copy samples loaned out
434 : * @retval false We have no zero-copy samples loaned out
435 : */
436 : bool has_zero_copies();
437 :
438 : /// Release the instance with the handle.
439 : void release_instance(DDS::InstanceHandle_t handle);
440 :
441 : // Take appropriate actions upon learning instance or view state has been updated
442 : void state_updated(DDS::InstanceHandle_t handle);
443 :
444 : /// Release all instances held by the reader.
445 : virtual void release_all_instances() = 0;
446 :
447 : ACE_Reactor_Timer_Interface* get_reactor();
448 :
449 : GUID_t get_topic_id();
450 : GUID_t get_dp_id();
451 :
452 : typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
453 : void get_instance_handles(InstanceHandleVec& instance_handles);
454 :
455 : typedef std::pair<GUID_t, WriterInfo::WriterState> WriterStatePair;
456 : typedef OPENDDS_VECTOR(WriterStatePair) WriterStatePairVec;
457 : void get_writer_states(WriterStatePairVec& writer_states);
458 :
459 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
460 : void update_ownership_strength (const GUID_t& pub_id,
461 : const CORBA::Long& ownership_strength);
462 :
463 : // Access to OwnershipManager is only valid when the domain participant is valid;
464 : // therefore, we must lock the domain pariticipant when using OwnershipManager.
465 : class OwnershipManagerPtr
466 : {
467 : public:
468 0 : OwnershipManagerPtr(DataReaderImpl* reader)
469 0 : : participant_( (reader && reader->is_exclusive_ownership_) ? reader->participant_servant_.lock() : RcHandle<DomainParticipantImpl>())
470 : {
471 0 : }
472 0 : operator bool() const { return participant_.in(); }
473 0 : OwnershipManager* operator->() const
474 : {
475 0 : return participant_ ? participant_->ownership_manager() : 0;
476 : }
477 :
478 : private:
479 : RcHandle<DomainParticipantImpl> participant_;
480 : };
481 : friend class OwnershipManagerPtr;
482 :
483 : struct OwnershipManagerScopedAccess {
484 0 : OwnershipManagerScopedAccess() : om_(0), lock_result_(0) {}
485 0 : explicit OwnershipManagerScopedAccess(DataReaderImpl::OwnershipManagerPtr om) : om_(om), lock_result_(om_ ? om_->instance_lock_acquire() : 0) {}
486 0 : ~OwnershipManagerScopedAccess() { release(); }
487 :
488 0 : void swap(OwnershipManagerScopedAccess& rhs)
489 : {
490 0 : if (&rhs != this) {
491 0 : std::swap(om_, rhs.om_);
492 0 : std::swap(lock_result_, rhs.lock_result_);
493 : }
494 0 : }
495 :
496 0 : int release()
497 : {
498 0 : int result = 0;
499 0 : if (om_ && !lock_result_) {
500 0 : result = om_->instance_lock_release();
501 : }
502 0 : om_ = 0;
503 0 : lock_result_ = 0;
504 0 : return result;
505 : }
506 :
507 : OwnershipManagerPtr om_;
508 : int lock_result_;
509 : };
510 :
511 0 : OwnershipManagerPtr ownership_manager() { return OwnershipManagerPtr(this); }
512 : #endif
513 :
514 : virtual void lookup_instance(const ReceivedDataSample& sample,
515 : SubscriptionInstance_rch& instance) = 0;
516 :
517 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
518 :
519 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
520 :
521 : void enable_filtering(ContentFilteredTopicImpl* cft);
522 :
523 : DDS::ContentFilteredTopic_ptr get_cf_topic() const;
524 :
525 : #endif
526 :
527 : #ifndef OPENDDS_NO_MULTI_TOPIC
528 :
529 : void enable_multi_topic(MultiTopicImpl* mt);
530 :
531 : #endif
532 :
533 : void update_subscription_params(const DDS::StringSeq& params) const;
534 :
535 : typedef OPENDDS_VECTOR(void*) GenericSeq;
536 :
537 : struct GenericBundle {
538 : GenericSeq samples_;
539 : DDS::SampleInfoSeq info_;
540 : };
541 :
542 : virtual DDS::ReturnCode_t read_generic(GenericBundle& gen,
543 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
544 : DDS::InstanceStateMask instance_states, bool adjust_ref_count ) = 0;
545 :
546 : virtual DDS::ReturnCode_t take(
547 : AbstractSamples& samples,
548 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
549 : DDS::InstanceStateMask instance_states)=0;
550 :
551 : virtual DDS::InstanceHandle_t lookup_instance_generic(const void* data) = 0;
552 :
553 : virtual DDS::ReturnCode_t read_instance_generic(void*& data,
554 : DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
555 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
556 : DDS::InstanceStateMask instance_states) = 0;
557 :
558 : virtual DDS::ReturnCode_t read_next_instance_generic(void*& data,
559 : DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
560 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
561 : DDS::InstanceStateMask instance_states) = 0;
562 :
563 : #endif
564 :
565 0 : void set_instance_state(DDS::InstanceHandle_t instance,
566 : DDS::InstanceStateKind state,
567 : const SystemTimePoint& timestamp = SystemTimePoint::now(),
568 : const GUID_t& guid = GUID_UNKNOWN)
569 : {
570 0 : DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
571 : {
572 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
573 0 : RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(guid);
574 0 : if (pos != publication_id_to_handle_map_.end()) {
575 0 : publication_handle = pos->second;
576 : }
577 0 : }
578 :
579 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
580 0 : set_instance_state_i(instance, publication_handle, state, timestamp, guid);
581 0 : }
582 :
583 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
584 : void begin_access();
585 : void end_access();
586 : void get_ordered_data(GroupRakeData& data,
587 : DDS::SampleStateMask sample_states,
588 : DDS::ViewStateMask view_states,
589 : DDS::InstanceStateMask instance_states);
590 :
591 : void accept_coherent (const GUID_t& writer_id,
592 : const GUID_t& publisher_id);
593 : void reject_coherent (const GUID_t& writer_id,
594 : const GUID_t& publisher_id);
595 : void coherent_change_received (const GUID_t& publisher_id, Coherent_State& result);
596 :
597 : void coherent_changes_completed (DataReaderImpl* reader);
598 :
599 : void reset_coherent_info (const GUID_t& writer_id,
600 : const GUID_t& publisher_id);
601 : #endif
602 :
603 : // Called upon subscriber qos change to update the local cache.
604 : void set_subscriber_qos(const DDS::SubscriberQos & qos);
605 :
606 : // Set the instance related writers to reevaluate the owner.
607 : void reset_ownership (DDS::InstanceHandle_t instance);
608 :
609 : virtual RcHandle<EntityImpl> parent() const;
610 :
611 : void disable_transport();
612 :
613 : virtual void register_for_writer(const GUID_t& /*participant*/,
614 : const GUID_t& /*readerid*/,
615 : const GUID_t& /*writerid*/,
616 : const TransportLocatorSeq& /*locators*/,
617 : DiscoveryListener* /*listener*/);
618 :
619 : virtual void unregister_for_writer(const GUID_t& /*participant*/,
620 : const GUID_t& /*readerid*/,
621 : const GUID_t& /*writerid*/);
622 :
623 : virtual void update_locators(const GUID_t& remote,
624 : const TransportLocatorSeq& locators);
625 :
626 : virtual DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
627 :
628 0 : GUID_t get_guid() const
629 : {
630 0 : ACE_Guard<ACE_Thread_Mutex> guard(subscription_id_mutex_);
631 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
632 0 : while (!has_subscription_id_ && !get_deleted()) {
633 0 : subscription_id_condition_.wait(thread_status_manager);
634 : }
635 0 : return subscription_id_;
636 0 : }
637 :
638 : void return_handle(DDS::InstanceHandle_t handle);
639 :
640 0 : const ValueDispatcher* get_value_dispatcher() const
641 : {
642 0 : TopicDescriptionPtr<TopicImpl> temp(topic_servant_);
643 0 : return temp ? dynamic_cast<const ValueDispatcher*>(temp->get_type_support()) : 0;
644 0 : }
645 :
646 : protected:
647 :
648 : // Update max flag if the spec ever changes
649 : static const CORBA::ULong MAX_SAMPLE_STATE_FLAG = DDS::NOT_READ_SAMPLE_STATE;
650 : static const CORBA::ULong MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1;
651 : static const CORBA::ULong MAX_SAMPLE_STATE_BITS = 2u;
652 :
653 : // Update max flag if the spec ever changes
654 : static const CORBA::ULong MAX_VIEW_STATE_FLAG = DDS::NOT_NEW_VIEW_STATE;
655 : static const CORBA::ULong MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1;
656 : static const CORBA::ULong MAX_VIEW_STATE_BITS = 2u;
657 :
658 : // Update max flag if the spec ever changes
659 : static const CORBA::ULong MAX_INSTANCE_STATE_FLAG = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
660 : static const CORBA::ULong MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1;
661 : static const CORBA::ULong MAX_INSTANCE_STATE_BITS = 3u;
662 :
663 : // These may need to be updated if the spec ever changes
664 : static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS;
665 : static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS;
666 :
667 : typedef OPENDDS_SET(DDS::InstanceHandle_t) HandleSet;
668 : typedef OPENDDS_MAP(CORBA::ULong, HandleSet) LookupMap;
669 :
670 0 : static CORBA::ULong to_combined_states(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
671 : {
672 0 : sample_states &= MAX_SAMPLE_STATE_MASK;
673 0 : view_states &= MAX_VIEW_STATE_MASK;
674 0 : instance_states &= MAX_INSTANCE_STATE_MASK;
675 0 : if (!(sample_states && view_states && instance_states)) {
676 : // catch-all for "bogus" lookups
677 0 : return 0;
678 : }
679 0 : return (sample_states << COMBINED_SAMPLE_STATE_SHIFT) | (view_states << COMBINED_VIEW_STATE_SHIFT) | instance_states;
680 : }
681 :
682 0 : static void split_combined_states(CORBA::ULong combined, CORBA::ULong& sample_states, CORBA::ULong& view_states, CORBA::ULong& instance_states)
683 : {
684 0 : sample_states = (combined >> COMBINED_SAMPLE_STATE_SHIFT) & MAX_SAMPLE_STATE_MASK;
685 0 : view_states = (combined >> COMBINED_VIEW_STATE_SHIFT) & MAX_VIEW_STATE_MASK;
686 0 : instance_states = combined & MAX_INSTANCE_STATE_MASK;
687 0 : }
688 :
689 : void initialize_lookup_maps();
690 : void update_lookup_maps(const SubscriptionInstanceMapType::iterator& input);
691 : void remove_from_lookup_maps(DDS::InstanceHandle_t handle);
692 : const HandleSet& lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const;
693 :
694 : LookupMap combined_state_lookup_;
695 :
696 : // Perform cast to get extended version of listener (otherwise nil)
697 : DataReaderListener_ptr get_ext_listener();
698 :
699 : virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
700 :
701 : void prepare_to_delete();
702 :
703 : /// Setup deserialization options
704 : DDS::ReturnCode_t setup_deserialization();
705 :
706 : RcHandle<SubscriberImpl> get_subscriber_servant();
707 :
708 : void post_read_or_take();
709 :
710 : // type specific DataReader's part of enable.
711 : virtual DDS::ReturnCode_t enable_specific() = 0;
712 :
713 : void sample_info(DDS::SampleInfo & sample_info,
714 : const ReceivedDataElement *ptr);
715 :
716 : CORBA::Long total_samples() const;
717 :
718 : void set_sample_lost_status(const DDS::SampleLostStatus& status);
719 : void set_sample_rejected_status(
720 : const DDS::SampleRejectedStatus& status);
721 :
722 : SubscriptionInstance_rch get_handle_instance(
723 : DDS::InstanceHandle_t handle);
724 :
725 : /**
726 : * Get an instance handle for a new instance.
727 : */
728 : DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t& key);
729 :
730 : virtual void purge_data(SubscriptionInstance_rch instance) = 0;
731 :
732 : virtual void release_instance_i(DDS::InstanceHandle_t handle) = 0;
733 : virtual void state_updated_i(DDS::InstanceHandle_t handle) = 0;
734 :
735 : bool has_readcondition(DDS::ReadCondition_ptr a_condition);
736 :
737 : /// @TODO: document why the instances_ container is mutable.
738 : mutable SubscriptionInstanceMapType instances_;
739 :
740 : /// Assume since the container is mutable(?!!?) it may need to use the
741 : /// lock while const.
742 : /// @TODO: remove the recursive nature of the instances_lock if not needed.
743 : mutable ACE_Recursive_Thread_Mutex instances_lock_;
744 :
745 : /// Check if the received data sample or instance should
746 : /// be filtered.
747 : /**
748 : * @note Filtering will only occur if the application
749 : * configured a finite duration in the Topic's LIFESPAN
750 : * QoS policy or DataReader's TIME_BASED_FILTER QoS policy.
751 : */
752 : bool filter_sample(const DataSampleHeader& header);
753 :
754 : bool ownership_filter_instance(const SubscriptionInstance_rch& instance,
755 : const GUID_t& pubid);
756 : bool time_based_filter_instance(const SubscriptionInstance_rch& instance,
757 : MonotonicTimePoint& now,
758 : MonotonicTimePoint& deadline);
759 :
760 : void accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance);
761 :
762 : virtual void qos_change(const DDS::DataReaderQos& qos);
763 :
764 : /// Data has arrived into the cache, unblock waiting ReadConditions
765 : void notify_read_conditions();
766 :
767 : bool has_subscription_id_;
768 : mutable ACE_Thread_Mutex subscription_id_mutex_;
769 : mutable ConditionVariable<ACE_Thread_Mutex> subscription_id_condition_;
770 :
771 : unique_ptr<ReceivedDataAllocator> rd_allocator_;
772 : DDS::DataReaderQos qos_;
773 : DDS::DataReaderQos passed_qos_;
774 :
775 : // Status conditions accessible by subclasses.
776 : DDS::SampleRejectedStatus sample_rejected_status_;
777 : DDS::SampleLostStatus sample_lost_status_;
778 :
779 : /// lock protecting sample container as well as statuses.
780 : ACE_Recursive_Thread_Mutex sample_lock_;
781 :
782 : typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> Reverse_Lock_t;
783 : Reverse_Lock_t reverse_sample_lock_;
784 :
785 : WeakRcHandle<DomainParticipantImpl> participant_servant_;
786 : TopicDescriptionPtr<TopicImpl> topic_servant_;
787 : TypeSupportImpl* type_support_;
788 : GUID_t topic_id_;
789 :
790 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
791 : bool is_exclusive_ownership_;
792 :
793 : #endif
794 :
795 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
796 : mutable ACE_Thread_Mutex content_filtered_topic_mutex_;
797 : TopicDescriptionPtr<ContentFilteredTopicImpl> content_filtered_topic_;
798 : #endif
799 :
800 : #ifndef OPENDDS_NO_MULTI_TOPIC
801 : TopicDescriptionPtr<MultiTopicImpl> multi_topic_;
802 : #endif
803 :
804 : /// Is accessing to Group coherent changes ?
805 : bool coherent_;
806 :
807 : /// Ordered group samples.
808 : GroupRakeData group_coherent_ordered_data_;
809 :
810 : DDS::SubscriberQos subqos_;
811 :
812 : virtual void add_link(const DataLink_rch& link, const GUID_t& peer);
813 :
814 : private:
815 0 : virtual void install_type_support(TypeSupportImpl*) {}
816 :
817 : virtual void set_instance_state_i(DDS::InstanceHandle_t instance,
818 : DDS::InstanceHandle_t publication_handle,
819 : DDS::InstanceStateKind state,
820 : const SystemTimePoint& timestamp,
821 : const GUID_t& guid) = 0;
822 :
823 : void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
824 :
825 : /// Lookup the instance handles by the publication repo ids
826 : void lookup_instance_handles(const WriterIdSeq& ids,
827 : DDS::InstanceHandleSeq& hdls);
828 :
829 : void instances_liveliness_update(const GUID_t& writer,
830 : DDS::InstanceHandle_t publication_handle);
831 :
832 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
833 : bool verify_coherent_changes_completion(WriterInfo* writer);
834 : bool coherent_change_received(WriterInfo* writer);
835 : #endif
836 :
837 0 : RcHandle<BitSubscriber> get_builtin_subscriber_proxy() const
838 : {
839 0 : RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
840 0 : if (participant_servant) {
841 0 : return participant_servant->get_builtin_subscriber_proxy();
842 : }
843 :
844 0 : return RcHandle<BitSubscriber>();
845 0 : }
846 :
847 0 : DDS::DomainId_t domain_id() const { return this->domain_id_; }
848 :
849 0 : Priority get_priority_value(const AssociationData& data) const {
850 0 : return data.publication_transport_priority_;
851 : }
852 :
853 : #if defined(OPENDDS_SECURITY)
854 : DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
855 : #endif
856 :
857 : /// when done handling historic samples, resume
858 : void resume_sample_processing(const GUID_t& pub_id);
859 :
860 : /// collect samples received before END_HISTORIC_SAMPLES
861 : /// returns false if normal processing of this sample should be skipped
862 : bool check_historic(const ReceivedDataSample& sample);
863 :
864 : /// deliver samples that were held by check_historic()
865 : void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples);
866 :
867 : friend class InstanceState;
868 : friend class EndHistoricSamplesMissedSweeper;
869 :
870 : friend class ::DDS_TEST; //allows tests to get at private data
871 :
872 : DDS::TopicDescription_var topic_desc_;
873 : ACE_Thread_Mutex listener_mutex_;
874 : DDS::StatusMask listener_mask_;
875 : DDS::DataReaderListener_var listener_;
876 : DDS::DomainId_t domain_id_;
877 : GUID_t dp_id_;
878 : // subscriber_servant_ has to be a weak pinter because it may be used from the
879 : // transport reactor thread and that thread doesn't have the owenership of the
880 : // the subscriber_servant_ object.
881 : WeakRcHandle<SubscriberImpl> subscriber_servant_;
882 : RcHandle<EndHistoricSamplesMissedSweeper> end_historic_sweeper_;
883 :
884 : CORBA::Long depth_;
885 : size_t n_chunks_;
886 :
887 : //Used to protect access to id_to_handle_map_
888 : ACE_Recursive_Thread_Mutex publication_handle_lock_;
889 :
890 : typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
891 : RepoIdToHandleMap publication_id_to_handle_map_;
892 :
893 : // Status conditions.
894 : DDS::LivelinessChangedStatus liveliness_changed_status_;
895 : DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_;
896 : DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_;
897 : DDS::SubscriptionMatchedStatus subscription_match_status_;
898 :
899 : // OpenDDS extended status. This is only available via listener.
900 : BudgetExceededStatus budget_exceeded_status_;
901 :
902 : /**
903 : * @todo The subscription_lost_status_ and
904 : * subscription_reconnecting_status_ are left here for
905 : * future use when we add get_subscription_lost_status()
906 : * and get_subscription_reconnecting_status() methods.
907 : */
908 : // Statistics of the lost subscriptions due to lost connection.
909 : SubscriptionLostStatus subscription_lost_status_;
910 : // Statistics of the subscriptions that are associated with a
911 : // reconnecting datalink.
912 : // SubscriptionReconnectingStatus subscription_reconnecting_status_;
913 :
914 : /// The orb's reactor to be used to register the liveliness
915 : /// timer.
916 : ACE_Reactor_Timer_Interface* reactor_;
917 :
918 : class LivelinessTimer : public ReactorInterceptor {
919 : public:
920 0 : LivelinessTimer(ACE_Reactor* reactor,
921 : ACE_thread_t owner,
922 : DataReaderImpl* data_reader)
923 0 : : ReactorInterceptor(reactor, owner)
924 0 : , data_reader_(*data_reader)
925 0 : , liveliness_timer_id_(-1)
926 0 : { }
927 :
928 : void check_liveliness();
929 :
930 : void cancel_timer()
931 : {
932 : execute_or_enqueue(make_rch<CancelCommand>(this));
933 : }
934 :
935 0 : virtual bool reactor_is_shut_down() const
936 : {
937 0 : return TheServiceParticipant->is_shut_down();
938 : }
939 :
940 : private:
941 0 : ~LivelinessTimer() { }
942 :
943 : WeakRcHandle<DataReaderImpl> data_reader_;
944 :
945 : /// liveliness timer id; -1 if no timer is set
946 : long liveliness_timer_id_;
947 : void check_liveliness_i(bool cancel, const MonotonicTimePoint& now);
948 :
949 : int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
950 :
951 : class CommandBase : public Command {
952 : public:
953 0 : CommandBase(LivelinessTimer* timer)
954 0 : : timer_(timer)
955 0 : { }
956 :
957 : protected:
958 : LivelinessTimer* timer_;
959 : };
960 :
961 : class CheckLivelinessCommand : public CommandBase {
962 : public:
963 0 : CheckLivelinessCommand(LivelinessTimer* timer)
964 0 : : CommandBase(timer)
965 0 : { }
966 0 : virtual void execute()
967 : {
968 0 : timer_->check_liveliness_i(true, MonotonicTimePoint::now());
969 0 : }
970 : };
971 :
972 : class CancelCommand : public CommandBase {
973 : public:
974 : CancelCommand(LivelinessTimer* timer)
975 : : CommandBase(timer)
976 : { }
977 : virtual void execute()
978 : {
979 : if (timer_->liveliness_timer_id_ != -1) {
980 : timer_->reactor()->cancel_timer(timer_);
981 : }
982 : }
983 : };
984 : };
985 : RcHandle<LivelinessTimer> liveliness_timer_;
986 :
987 : CORBA::Long last_deadline_missed_total_count_;
988 : /// Watchdog responsible for reporting missed offered
989 : /// deadlines.
990 : TimeDuration deadline_period_;
991 : typedef OPENDDS_MULTIMAP(MonotonicTimePoint, SubscriptionInstance_rch) DeadlineQueue;
992 : DeadlineQueue deadline_queue_;
993 : bool deadline_queue_enabled_;
994 : typedef PmfSporadicTask<DataReaderImpl> DRISporadicTask;
995 : RcHandle<DRISporadicTask> deadline_task_;
996 :
997 : void schedule_deadline(SubscriptionInstance_rch instance,
998 : bool timer_called);
999 : void reset_deadline_period(const TimeDuration& deadline_period);
1000 : void reschedule_deadline(SubscriptionInstance_rch instance,
1001 : const MonotonicTimePoint& now);
1002 : void cancel_deadline(SubscriptionInstance_rch instance);
1003 : void cancel_all_deadlines();
1004 : void deadline_task(const MonotonicTimePoint& now);
1005 : void process_deadline(SubscriptionInstance_rch instance,
1006 : const MonotonicTimePoint& now,
1007 : bool timer_called);
1008 :
1009 : /// Flag indicates that this datareader is a builtin topic
1010 : /// datareader.
1011 : bool is_bit_;
1012 :
1013 : bool always_get_history_;
1014 :
1015 : /// Flag indicating status of statistics gathering.
1016 : AtomicBool statistics_enabled_;
1017 :
1018 : /// publications writing to this reader.
1019 : typedef OPENDDS_MAP_CMP(GUID_t, WriterInfo_rch,
1020 : GUID_tKeyLessThan) WriterMapType;
1021 :
1022 : WriterMapType writers_;
1023 :
1024 : /// RW lock for reading/writing publications.
1025 : ACE_RW_Thread_Mutex writers_lock_;
1026 :
1027 : /// Statistics for this reader, collected for each writer.
1028 : StatsMapType statistics_;
1029 : ACE_Recursive_Thread_Mutex statistics_lock_;
1030 :
1031 : /// Bound (or initial reservation) of raw latency buffer.
1032 : unsigned int raw_latency_buffer_size_;
1033 :
1034 : /// Type of raw latency data buffer.
1035 : DataCollector<double>::OnFull raw_latency_buffer_type_;
1036 :
1037 : typedef VarLess<DDS::ReadCondition> RCCompLess;
1038 : typedef OPENDDS_SET_CMP(DDS::ReadCondition_var, RCCompLess) ReadConditionSet;
1039 : ReadConditionSet read_conditions_;
1040 :
1041 : /// Monitor object for this entity
1042 : unique_ptr<Monitor> monitor_;
1043 :
1044 : /// Periodic Monitor object for this entity
1045 : unique_ptr<Monitor> periodic_monitor_;
1046 :
1047 : bool transport_disabled_;
1048 :
1049 : protected:
1050 : typedef OPENDDS_SET(Encoding::Kind) EncodingKinds;
1051 : EncodingKinds decoding_modes_;
1052 :
1053 : public:
1054 : class OpenDDS_Dcps_Export OnDataOnReaders : public Job {
1055 : public:
1056 0 : OnDataOnReaders(WeakRcHandle<SubscriberImpl> subscriber,
1057 : DDS::SubscriberListener_var sub_listener,
1058 : WeakRcHandle<DataReaderImpl> data_reader,
1059 : bool call,
1060 : bool set_reader_status)
1061 0 : : subscriber_(subscriber)
1062 0 : , sub_listener_(sub_listener)
1063 0 : , data_reader_(data_reader)
1064 0 : , call_(call)
1065 0 : , set_reader_status_(set_reader_status)
1066 0 : {}
1067 :
1068 : private:
1069 : virtual void execute();
1070 :
1071 : WeakRcHandle<SubscriberImpl> subscriber_;
1072 : DDS::SubscriberListener_var sub_listener_;
1073 : WeakRcHandle<DataReaderImpl> data_reader_;
1074 : const bool call_;
1075 : const bool set_reader_status_;
1076 : };
1077 :
1078 : class OpenDDS_Dcps_Export OnDataAvailable : public Job {
1079 : public:
1080 0 : OnDataAvailable(DDS::DataReaderListener_var listener,
1081 : WeakRcHandle<DataReaderImpl> data_reader,
1082 : bool call,
1083 : bool set_reader_status,
1084 : bool set_subscriber_status)
1085 0 : : listener_(listener)
1086 0 : , data_reader_(data_reader)
1087 0 : , call_(call)
1088 0 : , set_reader_status_(set_reader_status)
1089 0 : , set_subscriber_status_(set_subscriber_status)
1090 0 : {}
1091 :
1092 : private:
1093 : virtual void execute();
1094 :
1095 : DDS::DataReaderListener_var listener_;
1096 : WeakRcHandle<DataReaderImpl> data_reader_;
1097 : const bool call_;
1098 : const bool set_reader_status_;
1099 : const bool set_subscriber_status_;
1100 : };
1101 :
1102 : protected:
1103 : #ifdef OPENDDS_SECURITY
1104 : Security::SecurityConfig_rch security_config_;
1105 : DDS::DynamicType_var dynamic_type_;
1106 : #endif
1107 :
1108 : TransportMessageBlockAllocator mb_alloc_;
1109 : };
1110 :
1111 : typedef RcHandle<DataReaderImpl> DataReaderImpl_rch;
1112 :
1113 : } // namespace DCPS
1114 : } // namespace OpenDDS
1115 :
1116 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
1117 :
1118 : #if defined (__ACE_INLINE__)
1119 : # include "DataReaderImpl.inl"
1120 : #endif /* __ACE_INLINE__ */
1121 :
1122 : #endif /* OPENDDS_DCPS_DATAREADER_H */
|