8 #ifndef OPENDDS_DCPS_DATAREADERIMPL_H 9 #define OPENDDS_DCPS_DATAREADERIMPL_H 43 #include <dds/DdsDcpsTopicC.h> 44 #include <dds/DdsDcpsSubscriptionExtC.h> 45 #include <dds/DdsDcpsDomainC.h> 46 #include <dds/DdsDcpsTopicC.h> 47 #include <dds/DdsDcpsInfrastructureC.h> 55 #if !defined (ACE_LACKS_PRAGMA_ONCE) 67 class DomainParticipantImpl;
68 class SubscriptionInstance;
70 class TopicDescriptionImpl;
103 #ifndef OPENDDS_SAFETY_PROFILE 105 std::ostream& raw_data(std::ostream& str)
const;
113 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 136 int handle_timeout(
const ACE_Time_Value& current_time,
const void* arg);
168 virtual void execute();
177 virtual void execute();
183 virtual const void*
get()
const = 0;
186 template <
typename T>
190 const void*
get()
const {
return &v_; }
215 friend class RequestedDeadlineWatchdog;
231 virtual void add_association(
const GUID_t& yourId,
235 virtual void transport_assoc_done(
int flags,
const GUID_t& remote_id);
237 virtual void remove_associations(
const WriterIdSeq& writers,
bool callback);
241 virtual void signal_liveliness(
const GUID_t& remote_participant);
267 virtual void cleanup();
272 DDS::DataReaderListener_ptr a_listener,
277 virtual DDS::ReadCondition_ptr create_readcondition(
282 #ifndef OPENDDS_NO_QUERY_CONDITION 283 virtual DDS::QueryCondition_ptr create_querycondition(
287 const char * query_expression,
292 DDS::ReadCondition_ptr a_condition);
303 DDS::DataReaderListener_ptr a_listener,
306 virtual DDS::DataReaderListener_ptr get_listener();
308 virtual DDS::TopicDescription_ptr get_topicdescription();
310 virtual DDS::Subscriber_ptr get_subscriber();
336 #if !defined (DDS_HAS_MINIMUM_BIT) 340 #endif // !defined (DDS_HAS_MINIMUM_BIT) 344 #ifndef OPENDDS_SAFETY_PROFILE 345 virtual void get_latency_stats(
349 virtual void reset_latency_stats();
353 virtual void statistics_enabled(
360 const StatsMapType& raw_latency_statistics()
const;
363 unsigned int& raw_latency_buffer_size();
376 void transport_discovery_change();
387 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 398 bool& is_new_instance,
408 void notify_latency(
GUID_t writer);
412 return static_cast<size_t>(depth_);
419 void liveliness_lost();
421 void remove_all_associations();
423 void notify_subscription_disconnected(
const WriterIdSeq& pubids);
424 void notify_subscription_reconnected(
const WriterIdSeq& pubids);
425 void notify_subscription_lost(
const WriterIdSeq& pubids);
426 void notify_liveliness_change();
436 bool has_zero_copies();
445 virtual void release_all_instances() = 0;
453 void get_instance_handles(InstanceHandleVec& instance_handles);
457 void get_writer_states(WriterStatePairVec& writer_states);
459 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 460 void update_ownership_strength (
const GUID_t& pub_id,
472 operator bool()
const {
return participant_.in(); }
475 return participant_ ? participant_->ownership_manager() : 0;
499 if (om_ && !lock_result_) {
500 result = om_->instance_lock_release();
517 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 519 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 523 DDS::ContentFilteredTopic_ptr get_cf_topic()
const;
527 #ifndef OPENDDS_NO_MULTI_TOPIC 533 void update_subscription_params(
const DDS::StringSeq& params)
const;
573 RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(guid);
574 if (pos != publication_id_to_handle_map_.end()) {
575 publication_handle = pos->second;
580 set_instance_state_i(instance, publication_handle, state,
timestamp, guid);
583 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 591 void accept_coherent (
const GUID_t& writer_id,
592 const GUID_t& publisher_id);
593 void reject_coherent (
const GUID_t& writer_id,
594 const GUID_t& publisher_id);
599 void reset_coherent_info (
const GUID_t& writer_id,
600 const GUID_t& publisher_id);
611 void disable_transport();
613 virtual void register_for_writer(
const GUID_t& ,
619 virtual void unregister_for_writer(
const GUID_t& ,
623 virtual void update_locators(
const GUID_t& remote,
632 while (!has_subscription_id_ && !get_deleted()) {
633 subscription_id_condition_.wait(thread_status_manager);
635 return subscription_id_;
643 return temp ?
dynamic_cast<const ValueDispatcher*
>(temp->get_type_support()) : 0;
650 static const CORBA::ULong MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1;
655 static const CORBA::ULong MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1;
660 static const CORBA::ULong MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1;
664 static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS;
665 static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS;
672 sample_states &= MAX_SAMPLE_STATE_MASK;
673 view_states &= MAX_VIEW_STATE_MASK;
674 instance_states &= MAX_INSTANCE_STATE_MASK;
675 if (!(sample_states && view_states && instance_states)) {
679 return (sample_states << COMBINED_SAMPLE_STATE_SHIFT) | (view_states << COMBINED_VIEW_STATE_SHIFT) | instance_states;
684 sample_states = (combined >> COMBINED_SAMPLE_STATE_SHIFT) & MAX_SAMPLE_STATE_MASK;
685 view_states = (combined >> COMBINED_VIEW_STATE_SHIFT) & MAX_VIEW_STATE_MASK;
686 instance_states = combined & MAX_INSTANCE_STATE_MASK;
689 void initialize_lookup_maps();
690 void update_lookup_maps(
const SubscriptionInstanceMapType::iterator& input);
697 DataReaderListener_ptr get_ext_listener();
699 virtual void remove_associations_i(
const WriterIdSeq& writers,
bool callback);
701 void prepare_to_delete();
708 void post_read_or_take();
719 void set_sample_rejected_status(
735 bool has_readcondition(DDS::ReadCondition_ptr a_condition);
765 void notify_read_conditions();
790 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 795 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 800 #ifndef OPENDDS_NO_MULTI_TOPIC 826 void lookup_instance_handles(
const WriterIdSeq& ids,
829 void instances_liveliness_update(
const GUID_t& writer,
832 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 833 bool verify_coherent_changes_completion(
WriterInfo* writer);
834 bool coherent_change_received(
WriterInfo* writer);
840 if (participant_servant) {
841 return participant_servant->get_builtin_subscriber_proxy();
853 #if defined(OPENDDS_SECURITY) 858 void resume_sample_processing(
const GUID_t& pub_id);
870 friend class ::DDS_TEST;
924 , data_reader_(*data_reader)
925 , liveliness_timer_id_(-1)
928 void check_liveliness();
932 execute_or_enqueue(make_rch<CancelCommand>(
this));
949 int handle_timeout(
const ACE_Time_Value& current_time,
const void* arg);
979 if (timer_->liveliness_timer_id_ != -1) {
980 timer_->reactor()->cancel_timer(timer_);
999 void reset_deadline_period(
const TimeDuration& deadline_period);
1003 void cancel_all_deadlines();
1038 typedef OPENDDS_SET_CMP(DDS::ReadCondition_var, RCCompLess) ReadConditionSet;
1057 DDS::SubscriberListener_var sub_listener,
1060 bool set_reader_status)
1061 : subscriber_(subscriber)
1062 , sub_listener_(sub_listener)
1063 , data_reader_(data_reader)
1065 , set_reader_status_(set_reader_status)
1069 virtual void execute();
1083 bool set_reader_status,
1084 bool set_subscriber_status)
1085 : listener_(listener)
1086 , data_reader_(data_reader)
1088 , set_reader_status_(set_reader_status)
1089 , set_subscriber_status_(set_subscriber_status)
1093 virtual void execute();
1103 #ifdef OPENDDS_SECURITY 1118 #if defined (__ACE_INLINE__)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
void swap(MessageBlock &lhs, MessageBlock &rhs)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
Manages the states of a received data instance.
DDS::DataReaderListener_var listener_
Implements the OpenDDS::DCPS::Entity interfaces.
sequence< InstanceHandle_t > InstanceHandleSeq
bool has_subscription_id_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Reverse_Lock< ACE_Recursive_Thread_Mutex > Reverse_Lock_t
const ValueDispatcher * get_value_dispatcher() const
const InstanceHandle_t HANDLE_NIL
virtual void install_type_support(TypeSupportImpl *)
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
BudgetExceededStatus budget_exceeded_status_
Base class to hold configuration settings for TransportImpls.
WeakRcHandle< DataReaderImpl > reader_
const bool set_subscriber_status_
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
DDS::TopicDescription_var topic_desc_
AtomicBool statistics_enabled_
Flag indicating status of statistics gathering.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
DDS::SubscriptionMatchedStatus subscription_match_status_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
SubscriptionLostStatus subscription_lost_status_
DDS::SubscriberListener_var sub_listener_
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffer.
#define OpenDDS_Dcps_Export
TransportMessageBlockAllocator mb_alloc_
sequence< SampleInfo > SampleInfoSeq
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
static CORBA::ULong to_combined_states(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
RcHandle< DomainParticipantImpl > participant_
Defines the interface for Discovery callbacks into the DataReader.
::DDS::InstanceHandle_t lookup_instance(in<%SCOPED%> instance_data)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
void set_instance_state(DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
virtual bool reactor_is_shut_down() const
Reverse_Lock_t reverse_sample_lock_
RcHandle< LivelinessTimer > liveliness_timer_
#define OPENDDS_MULTIMAP(K, T)
Cached_Allocator_With_Overflow< ReceivedDataElementMemoryBlock, ACE_Thread_Mutex > ReceivedDataAllocator
unsigned long InstanceStateMask
sequence< TransportLocator > TransportLocatorSeq
std::pair< GUID_t, WriterInfo::WriterState > WriterStatePair
SubscriptionInstanceMapType instances_
TODO: document why the instances_ container is mutable.
RcHandle< BitSubscriber > get_builtin_subscriber_proxy() const
unique_ptr< ReceivedDataAllocator > rd_allocator_
Collection of latency statistics for a single association.
DDS::DomainId_t domain_id_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
static TimePoint_T< SystemClock > now()
MessageHolder_T(const T &v)
DeadlineQueue deadline_queue_
LookupMap combined_state_lookup_
DOMAINID_TYPE_NATIVE DomainId_t
DDS::DataReaderListener_var listener_
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
WeakRcHandle< DataReaderImpl > data_reader_
bool coherent_
Is access to group coherent changes enabled?
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
const ViewStateKind NOT_NEW_VIEW_STATE
DDS::DomainId_t domain_id() const
DDS::DynamicType_var dynamic_type_
ACE_Thread_Mutex subscription_id_mutex_
ACE_Thread_Mutex listener_mutex_
bool deadline_queue_enabled_
EndHistoricSamplesMissedSweeper * sweeper_
void swap(OwnershipManagerScopedAccess &rhs)
sequence< LatencyStatistics > LatencyStatisticsSeq
OnDataAvailable(DDS::DataReaderListener_var listener, WeakRcHandle< DataReaderImpl > data_reader, bool call, bool set_reader_status, bool set_subscriber_status)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
virtual ~AbstractSamples()
Holds a data sample received by the transport.
long ParticipantCryptoHandle
unsigned long InstanceStateKind
A fixed-size allocator that caches items for quicker access but if the pool is exhausted it will use ...
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
WeakRcHandle< SubscriberImpl > subscriber_servant_
virtual bool reactor_is_shut_down() const
Implements the DDS::DataReader interface.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
OwnershipManagerPtr(DataReaderImpl *reader)
ACE_Recursive_Thread_Mutex sample_lock_
Lock protecting the sample container as well as the statuses.
WeakRcHandle< DataReaderImpl > data_reader_
CommandBase(EndHistoricSamplesMissedSweeper *sweeper, WriterInfo_rch &info)
typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) GuidSet
CommandBase(LivelinessTimer *timer)
OwnershipManagerScopedAccess(DataReaderImpl::OwnershipManagerPtr om)
OnDataOnReaders(WeakRcHandle< SubscriberImpl > subscriber, DDS::SubscriberListener_var sub_listener, WeakRcHandle< DataReaderImpl > data_reader, bool call, bool set_reader_status)
OwnershipManagerScopedAccess()
RcHandle< DRISporadicTask > deadline_task_
sequence< GUID_t > WriterIdSeq
TimeDuration deadline_period_
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
RepoIdToHandleMap publication_id_to_handle_map_
static void split_combined_states(CORBA::ULong combined, CORBA::ULong &sample_states, CORBA::ULong &view_states, CORBA::ULong &instance_states)
::DDS::ReturnCode_t take(inout<%TYPE%><%SEQ%> received_data, inout ::DDS::SampleInfoSeq info_seq, in long max_samples, in ::DDS::SampleStateMask sample_states, in ::DDS::ViewStateMask view_states, in ::DDS::InstanceStateMask instance_states)
WeakRcHandle< SubscriberImpl > subscriber_
unsigned long SampleStateMask
DDS::SampleLostStatus sample_lost_status_
HANDLE_TYPE_NATIVE InstanceHandle_t
Mix-in class for DDS entities which directly use the transport layer.
WeakRcHandle< DataReaderImpl > data_reader_
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
WeakRcHandle< DomainParticipantImpl > participant_servant_
DDS::StatusMask listener_mask_
ACE_Recursive_Thread_Mutex statistics_lock_
size_t get_n_chunks() const
Priority publication_transport_priority_
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Priority get_priority_value(const AssociationData &data) const
TypeSupportImpl * type_support_
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffer.
CancelCommand(EndHistoricSamplesMissedSweeper *sweeper, WriterInfo_rch &info)
Implements the DDS::TopicDescription interface.
Sequence number abstraction. Only allows positive 64 bit values.
ACE_Reactor_Timer_Interface * reactor_
~OwnershipManagerScopedAccess()
DDS::SampleRejectedStatus sample_rejected_status_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
RcHandle< DataReaderImpl > DataReaderImpl_rch
const SampleStateKind NOT_READ_SAMPLE_STATE
Stats< double > stats_
Latency statistics for the DataWriter to this DataReader.
EncodingKinds decoding_modes_
ACE_Recursive_Thread_Mutex publication_handle_lock_
Security::SecurityConfig_rch security_config_
RcHandle< T > lock() const
DDS::SubscriberQos subqos_
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
OwnershipManagerPtr ownership_manager()
DDS::DataReaderQos passed_qos_
DDS::LivelinessChangedStatus liveliness_changed_status_
#define TheServiceParticipant
ACE_Recursive_Thread_Mutex instances_lock_
Keeps track of a DataWriter's liveliness for a DataReader.
CancelCommand(LivelinessTimer *timer)
Elements stored for managing statistical data.
TopicDescriptionPtr< MultiTopicImpl > multi_topic_
VarLess< DDS::ReadCondition > RCCompLess
The Internal API and Implementation of OpenDDS.
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
unsigned long ViewStateMask
ACE_Thread_Mutex content_filtered_topic_mutex_
ReadConditionSet read_conditions_
TopicDescriptionPtr< TopicImpl > topic_servant_
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
long liveliness_timer_id_
liveliness timer id; -1 if no timer is set
CORBA::Long last_deadline_missed_total_count_
PmfSporadicTask< DataReaderImpl > DRISporadicTask
const bool set_reader_status_
CheckLivelinessCommand(LivelinessTimer *timer)
sequence< string > StringSeq
bool is_exclusive_ownership_
ScheduleCommand(EndHistoricSamplesMissedSweeper *sweeper, WriterInfo_rch &info)
StatsMapType statistics_
Statistics for this reader, collected for each writer.
OwnershipManager * operator->() const
LivelinessTimer(ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *data_reader)
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)
typedef OPENDDS_SET(NetworkAddress) AddrSet
const bool set_reader_status_