OpenDDS::DCPS::DataReaderImpl Class Reference

Implements the DDS::DataReader interface. More...

#include <DataReaderImpl.h>

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl:
Collaboration graph
[legend]

List of all members.

Classes

struct  GenericBundle
class  LivelinessTimer
class  OwnershipManagerPtr

Public Types

typedef std::pair
< PublicationId,
WriterInfo::WriterState
WriterStatePair

Public Member Functions

typedef OPENDDS_MAP (DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType
typedef OPENDDS_MAP_CMP (PublicationId, WriterStats, GUID_tKeyLessThan) StatsMapType
 Type of collection of statistics for writers to this reader.
 DataReaderImpl ()
virtual ~DataReaderImpl ()
virtual DDS::InstanceHandle_t get_instance_handle ()
virtual void add_association (const RepoId &yourId, const WriterAssociation &writer, bool active)
virtual void transport_assoc_done (int flags, const RepoId &remote_id)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const WriterIdSeq &writers, bool callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void inconsistent_topic ()
virtual void signal_liveliness (const RepoId &remote_participant)
DDS::DataReaderListener_ptr listener_for (DDS::StatusKind kind)
void writer_became_alive (WriterInfo &info, const ACE_Time_Value &when)
void writer_became_dead (WriterInfo &info, const ACE_Time_Value &when)
void writer_removed (WriterInfo &info)
virtual void cleanup ()
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
virtual DDS::ReadCondition_ptr create_readcondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::QueryCondition_ptr create_querycondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
virtual DDS::ReturnCode_t delete_readcondition (DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t delete_contained_entities ()
virtual DDS::ReturnCode_t set_qos (const DDS::DataReaderQos &qos)
virtual DDS::ReturnCode_t get_qos (DDS::DataReaderQos &qos)
virtual DDS::ReturnCode_t set_listener (DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::DataReaderListener_ptr get_listener ()
virtual DDS::TopicDescription_ptr get_topicdescription ()
virtual DDS::Subscriber_ptr get_subscriber ()
virtual DDS::ReturnCode_t get_sample_rejected_status (DDS::SampleRejectedStatus &status)
virtual DDS::ReturnCode_t get_liveliness_changed_status (DDS::LivelinessChangedStatus &status)
virtual DDS::ReturnCode_t get_requested_deadline_missed_status (DDS::RequestedDeadlineMissedStatus &status)
virtual DDS::ReturnCode_t get_requested_incompatible_qos_status (DDS::RequestedIncompatibleQosStatus &status)
virtual DDS::ReturnCode_t get_subscription_matched_status (DDS::SubscriptionMatchedStatus &status)
virtual DDS::ReturnCode_t get_sample_lost_status (DDS::SampleLostStatus &status)
virtual DDS::ReturnCode_t wait_for_historical_data (const DDS::Duration_t &max_wait)
virtual DDS::ReturnCode_t get_matched_publications (DDS::InstanceHandleSeq &publication_handles)
virtual DDS::ReturnCode_t get_matched_publication_data (DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
virtual DDS::ReturnCode_t enable ()
virtual void get_latency_stats (OpenDDS::DCPS::LatencyStatisticsSeq &stats)
virtual void reset_latency_stats ()
 Clear any intermediate statistical values.
virtual CORBA::Boolean statistics_enabled ()
virtual void statistics_enabled (CORBA::Boolean statistics_enabled)
void writer_activity (const DataSampleHeader &header)
 update liveliness info for this writer.
virtual void data_received (const ReceivedDataSample &sample)
 process a message that has been received - could be control or a data sample.
virtual bool check_transport_qos (const TransportInst &inst)
RepoId get_subscription_id () const
bool have_sample_states (DDS::SampleStateMask sample_states) const
bool have_view_states (DDS::ViewStateMask view_states) const
bool have_instance_states (DDS::InstanceStateMask instance_states) const
bool contains_sample (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual bool contains_sample_filtered (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator &evaluator, const DDS::StringSeq &params)=0
virtual void dds_demarshal (const ReceivedDataSample &sample, SubscriptionInstance_rch &instance, bool &is_new_instance, bool &filtered, MarshalingType marshaling_type)=0
virtual void dispose_unregister (const ReceivedDataSample &sample, SubscriptionInstance_rch &instance)
void process_latency (const ReceivedDataSample &sample)
void notify_latency (PublicationId writer)
CORBA::Long get_depth () const
size_t get_n_chunks () const
void liveliness_lost ()
void remove_all_associations ()
void notify_subscription_disconnected (const WriterIdSeq &pubids)
void notify_subscription_reconnected (const WriterIdSeq &pubids)
void notify_subscription_lost (const WriterIdSeq &pubids)
void notify_liveliness_change ()
bool is_bit () const
bool has_zero_copies ()
void release_instance (DDS::InstanceHandle_t handle)
 Release the instance with the handle.
void reschedule_deadline ()
ACE_Reactor_Timer_Interfaceget_reactor ()
RepoId get_topic_id ()
RepoId get_dp_id ()
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
void get_instance_handles (InstanceHandleVec &instance_handles)
typedef OPENDDS_VECTOR (WriterStatePair) WriterStatePairVec
void get_writer_states (WriterStatePairVec &writer_states)
void update_ownership_strength (const PublicationId &pub_id, const CORBA::Long &ownership_strength)
OwnershipManagerPtr ownership_manager ()
virtual void lookup_instance (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance)=0
void enable_filtering (ContentFilteredTopicImpl *cft)
DDS::ContentFilteredTopic_ptr get_cf_topic () const
void update_subscription_params (const DDS::StringSeq &params) const
typedef OPENDDS_VECTOR (void *) GenericSeq
virtual DDS::ReturnCode_t read_generic (GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count)=0
virtual DDS::ReturnCode_t take (AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
virtual DDS::InstanceHandle_t lookup_instance_generic (const void *data)=0
virtual DDS::ReturnCode_t read_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
virtual DDS::ReturnCode_t read_next_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
virtual void set_instance_state (DDS::InstanceHandle_t instance, DDS::InstanceStateKind state)=0
void begin_access ()
void end_access ()
void get_ordered_data (GroupRakeData &data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
void accept_coherent (PublicationId &writer_id, RepoId &publisher_id)
void reject_coherent (PublicationId &writer_id, RepoId &publisher_id)
void coherent_change_received (RepoId publisher_id, Coherent_State &result)
void coherent_changes_completed (DataReaderImpl *reader)
void reset_coherent_info (const PublicationId &writer_id, const RepoId &publisher_id)
void set_subscriber_qos (const DDS::SubscriberQos &qos)
void reset_ownership (DDS::InstanceHandle_t instance)
virtual RcHandle< EntityImplparent () const
void disable_transport ()
virtual void register_for_writer (const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *)
virtual void unregister_for_writer (const RepoId &, const RepoId &, const RepoId &)
Raw Latency Statistics Interfaces



const StatsMapType & raw_latency_statistics () const
 Expose the statistics container.
unsigned int & raw_latency_buffer_size ()
 Configure the size of the raw data collection buffer.
DataCollector< double >::OnFull & raw_latency_buffer_type ()
 Configure the type of the raw data collection buffer.

Protected Types

typedef ACE_Reverse_Lock
< ACE_Recursive_Thread_Mutex
Reverse_Lock_t

Protected Member Functions

virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
void remove_publication (const PublicationId &pub_id)
void prepare_to_delete ()
RcHandle< SubscriberImplget_subscriber_servant ()
void post_read_or_take ()
virtual DDS::ReturnCode_t enable_specific ()=0
void sample_info (DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
CORBA::Long total_samples () const
void set_sample_lost_status (const DDS::SampleLostStatus &status)
void set_sample_rejected_status (const DDS::SampleRejectedStatus &status)
SubscriptionInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)
DDS::InstanceHandle_t get_next_handle (const DDS::BuiltinTopicKey_t &key)
virtual void purge_data (SubscriptionInstance_rch instance)=0
virtual void release_instance_i (DDS::InstanceHandle_t handle)=0
bool has_readcondition (DDS::ReadCondition_ptr a_condition)
bool filter_sample (const DataSampleHeader &header)
bool ownership_filter_instance (const SubscriptionInstance_rch &instance, const PublicationId &pubid)
bool time_based_filter_instance (const SubscriptionInstance_rch &instance, ACE_Time_Value &filter_time_expired)
void accept_sample_processing (const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
virtual void qos_change (const DDS::DataReaderQos &qos)
void notify_read_conditions ()
 Data has arrived into the cache, unblock waiting ReadConditions.
virtual void add_link (const DataLink_rch &link, const RepoId &peer)

Protected Attributes

SubscriptionInstanceMapType instances_
 : document why the instances_ container is mutable.
ACE_Recursive_Thread_Mutex instances_lock_
unique_ptr< ReceivedDataAllocatorrd_allocator_
DDS::DataReaderQos qos_
DDS::SampleRejectedStatus sample_rejected_status_
DDS::SampleLostStatus sample_lost_status_
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses.
Reverse_Lock_t reverse_sample_lock_
WeakRcHandle
< DomainParticipantImpl
participant_servant_
TopicDescriptionPtr< TopicImpltopic_servant_
bool is_exclusive_ownership_
TopicDescriptionPtr
< ContentFilteredTopicImpl
content_filtered_topic_
bool coherent_
 Is accessing to Group coherent changes ?
GroupRakeData group_coherent_ordered_data_
 Ordered group samples.
DDS::SubscriberQos subqos_

Private Types

typedef VarLess
< DDS::ReadCondition
RCCompLess

Private Member Functions

void notify_subscription_lost (const DDS::InstanceHandleSeq &handles)
void lookup_instance_handles (const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the publication repo ids.
void instances_liveliness_update (WriterInfo &info, const ACE_Time_Value &when)
bool verify_coherent_changes_completion (WriterInfo *writer)
bool coherent_change_received (WriterInfo *writer)
const RepoIdget_repo_id () const
DDS::DomainId_t domain_id () const
Priority get_priority_value (const AssociationData &data) const
void resume_sample_processing (const PublicationId &pub_id)
 when done handling historic samples, resume
bool check_historic (const ReceivedDataSample &sample)
void deliver_historic (OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&samples)
 deliver samples that were held by check_historic()
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
typedef OPENDDS_MAP_CMP (PublicationId, RcHandle< WriterInfo >, GUID_tKeyLessThan) WriterMapType
 publications writing to this reader.
typedef OPENDDS_SET_CMP (DDS::ReadCondition_var, RCCompLess) ReadConditionSet

Private Attributes

DDS::TopicDescription_var topic_desc_
DDS::StatusMask listener_mask_
DDS::DataReaderListener_var listener_
DDS::DomainId_t domain_id_
RepoId dp_id_
WeakRcHandle< SubscriberImplsubscriber_servant_
RcHandle
< EndHistoricSamplesMissedSweeper
end_historic_sweeper_
RcHandle
< RemoveAssociationSweeper
< DataReaderImpl > > 
remove_association_sweeper_
CORBA::Long depth_
size_t n_chunks_
ACE_Recursive_Thread_Mutex publication_handle_lock_
Reverse_Lock_t reverse_pub_handle_lock_
RepoIdToHandleMap id_to_handle_map_
DDS::LivelinessChangedStatus liveliness_changed_status_
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
DDS::SubscriptionMatchedStatus subscription_match_status_
BudgetExceededStatus budget_exceeded_status_
SubscriptionLostStatus subscription_lost_status_
ACE_Reactor_Timer_Interfacereactor_
RcHandle< LivelinessTimerliveliness_timer_
CORBA::Long last_deadline_missed_total_count_
RcHandle
< RequestedDeadlineWatchdog
watchdog_
bool is_bit_
bool always_get_history_
bool statistics_enabled_
 Flag indicating status of statistics gathering.
WriterMapType writers_
ACE_RW_Thread_Mutex writers_lock_
 RW lock for reading/writing publications.
StatsMapType statistics_
 Statistics for this reader, collected for each writer.
unsigned int raw_latency_buffer_size_
 Bound (or initial reservation) of raw latency buffer.
DataCollector< double >::OnFull raw_latency_buffer_type_
 Type of raw latency data buffer.
ReadConditionSet read_conditions_
Monitormonitor_
 Monitor object for this entity.
Monitorperiodic_monitor_
 Periodic Monitor object for this entity.
bool transport_disabled_

Friends

class RequestedDeadlineWatchdog
class QueryConditionImpl
class SubscriberImpl
class OwnershipManagerPtr
class InstanceState
class EndHistoricSamplesMissedSweeper
class RemoveAssociationSweeper< DataReaderImpl >
class ::DDS_TEST

Detailed Description

Implements the DDS::DataReader interface.

See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.

This class must be inherited by the type-specific datareader which is specific to the data-type associated with the topic.

Definition at line 190 of file DataReaderImpl.h.


Member Typedef Documentation

Definition at line 852 of file DataReaderImpl.h.

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 629 of file DataReaderImpl.h.

Definition at line 434 of file DataReaderImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataReaderImpl::DataReaderImpl (  ) 

Definition at line 53 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, budget_exceeded_status_, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, DDS::SampleRejectedStatus::last_instance_handle, DDS::RequestedDeadlineMissedStatus::last_instance_handle, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::LivelinessChangedStatus::last_publication_handle, DDS::SampleRejectedStatus::last_reason, liveliness_changed_status_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::NOT_REJECTED, periodic_monitor_, DDS::RequestedIncompatibleQosStatus::policies, reactor_, requested_deadline_missed_status_, requested_incompatible_qos_status_, sample_lost_status_, sample_rejected_status_, subscription_match_status_, TheServiceParticipant, OpenDDS::DCPS::BudgetExceededStatus::total_count, DDS::SampleRejectedStatus::total_count, DDS::SampleLostStatus::total_count, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::RequestedDeadlineMissedStatus::total_count, OpenDDS::DCPS::BudgetExceededStatus::total_count_change, DDS::SampleRejectedStatus::total_count_change, DDS::SampleLostStatus::total_count_change, DDS::SubscriptionMatchedStatus::total_count_change, DDS::RequestedIncompatibleQosStatus::total_count_change, and DDS::RequestedDeadlineMissedStatus::total_count_change.

00054 : qos_(TheServiceParticipant->initial_DataReaderQos()),
00055   reverse_sample_lock_(sample_lock_),
00056   topic_servant_(0),
00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00058   is_exclusive_ownership_ (false),
00059 #endif
00060     coherent_(false),
00061     subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00062     topic_desc_(0),
00063     listener_mask_(DEFAULT_STATUS_MASK),
00064     domain_id_(0),
00065     end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00066     remove_association_sweeper_(make_rch<RemoveAssociationSweeper<DataReaderImpl> >(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00067     n_chunks_(TheServiceParticipant->n_chunks()),
00068     reverse_pub_handle_lock_(publication_handle_lock_),
00069     reactor_(0),
00070     liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00071     last_deadline_missed_total_count_(0),
00072     is_bit_(false),
00073     always_get_history_(false),
00074     statistics_enabled_(false),
00075     raw_latency_buffer_size_(0),
00076     raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00077     monitor_(0),
00078     periodic_monitor_(0),
00079     transport_disabled_(false)
00080 {
00081   reactor_ = TheServiceParticipant->timer();
00082 
00083   liveliness_changed_status_.alive_count = 0;
00084   liveliness_changed_status_.not_alive_count = 0;
00085   liveliness_changed_status_.alive_count_change = 0;
00086   liveliness_changed_status_.not_alive_count_change = 0;
00087   liveliness_changed_status_.last_publication_handle =
00088       DDS::HANDLE_NIL;
00089 
00090   requested_deadline_missed_status_.total_count = 0;
00091   requested_deadline_missed_status_.total_count_change = 0;
00092   requested_deadline_missed_status_.last_instance_handle =
00093       DDS::HANDLE_NIL;
00094 
00095   requested_incompatible_qos_status_.total_count = 0;
00096   requested_incompatible_qos_status_.total_count_change = 0;
00097   requested_incompatible_qos_status_.last_policy_id = 0;
00098   requested_incompatible_qos_status_.policies.length(0);
00099 
00100   subscription_match_status_.total_count = 0;
00101   subscription_match_status_.total_count_change = 0;
00102   subscription_match_status_.current_count = 0;
00103   subscription_match_status_.current_count_change = 0;
00104   subscription_match_status_.last_publication_handle =
00105       DDS::HANDLE_NIL;
00106 
00107   sample_lost_status_.total_count = 0;
00108   sample_lost_status_.total_count_change = 0;
00109 
00110   sample_rejected_status_.total_count = 0;
00111   sample_rejected_status_.total_count_change = 0;
00112   sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
00113   sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
00114 
00115   this->budget_exceeded_status_.total_count = 0;
00116   this->budget_exceeded_status_.total_count_change = 0;
00117   this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
00118 
00119   monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this);
00120   periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this);
00121 }

OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl (  )  [virtual]

Definition at line 125 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, ownership_manager(), and topic_servant_.

00126 {
00127   DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6);
00128 
00129 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00130   OwnershipManagerPtr owner_manager = this->ownership_manager();
00131   if (owner_manager) {
00132     owner_manager->unregister_reader(topic_servant_->type_name(), this);
00133   }
00134 #endif
00135 
00136 }

Here is the call graph for this function:


Member Function Documentation

void OpenDDS::DCPS::DataReaderImpl::accept_coherent ( PublicationId writer_id,
RepoId publisher_id 
)

Definition at line 2934 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, instances_, instances_lock_, LM_DEBUG, OPENDDS_STRING, sample_lock_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

Referenced by verify_coherent_changes_completion().

02936 {
02937   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
02938     GuidConverter reader (this->subscription_id_);
02939     GuidConverter writer (writer_id);
02940     GuidConverter publisher (publisher_id);
02941     ACE_DEBUG((LM_DEBUG,
02942         ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
02943         ACE_TEXT(" reader %C writer %C publisher %C \n"),
02944         OPENDDS_STRING(reader).c_str(),
02945         OPENDDS_STRING(writer).c_str(),
02946         OPENDDS_STRING(publisher).c_str()));
02947   }
02948 
02949   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02950   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02951 
02952   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02953       iter != this->instances_.end(); ++iter) {
02954     iter->second->rcvd_strategy_->accept_coherent(
02955         writer_id, publisher_id);
02956   }
02957 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::accept_sample_processing ( const SubscriptionInstance_rch instance,
const DataSampleHeader header,
bool  is_new_instance 
) [protected]

Definition at line 3259 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER, ACE_OS::gettimeofday(), OpenDDS::DCPS::DataSampleHeader::group_coherent_, OpenDDS::DCPS::RcHandle< T >::in(), LM_WARNING, notify_read_conditions(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, reverse_sample_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, verify_coherent_changes_completion(), watchdog_, writers_, and writers_lock_.

Referenced by data_received().

03260 {
03261   bool accepted = true;
03262 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03263   bool verify_coherent = false;
03264 #endif
03265   RcHandle<WriterInfo> writer;
03266 
03267   if (header.publication_id_.entityId.entityKind != ENTITYKIND_OPENDDS_NIL_WRITER) {
03268     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
03269 
03270     WriterMapType::iterator where = writers_.find(header.publication_id_);
03271 
03272     if (where != writers_.end()) {
03273       if (header.coherent_change_) {
03274 
03275 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03276         // Received coherent change
03277         where->second->group_coherent_ = header.group_coherent_;
03278         where->second->publisher_id_ = header.publisher_id_;
03279         ++where->second->coherent_samples_;
03280         verify_coherent = true;
03281 #endif
03282         writer = where->second;
03283       }
03284     }
03285     else {
03286       GuidConverter subscriptionBuffer(subscription_id_);
03287       GuidConverter publicationBuffer(header.publication_id_);
03288       ACE_DEBUG((LM_WARNING,
03289         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
03290         ACE_TEXT("subscription %C failed to find ")
03291         ACE_TEXT("publication data for %C.\n"),
03292         OPENDDS_STRING(subscriptionBuffer).c_str(),
03293         OPENDDS_STRING(publicationBuffer).c_str()));
03294     }
03295   }
03296 
03297 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
03298   if (verify_coherent) {
03299     accepted = verify_coherent_changes_completion(writer.in());
03300   }
03301 #endif
03302 
03303   if (instance && watchdog_.in()) {
03304     instance->last_sample_tv_ = instance->cur_sample_tv_;
03305     instance->cur_sample_tv_ = ACE_OS::gettimeofday();
03306 
03307     // Watchdog can't be called with sample_lock_ due to reactor deadlock
03308     ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03309     if (is_new_instance) {
03310       watchdog_->schedule_timer(instance);
03311     } else {
03312       watchdog_->execute(instance, false);
03313     }
03314   }
03315 
03316   if (accepted) {
03317     notify_read_conditions();
03318   }
03319 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::add_association ( const RepoId yourId,
const WriterAssociation writer,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 207 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, LM_DEBUG, LM_ERROR, OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, qos_, raw_latency_buffer_size_, raw_latency_buffer_type_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, statistics_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, DDS::DataWriterQos::transport_priority, ACE_Atomic_Op< ACE_LOCK, TYPE >::value(), DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

00210 {
00211   if (DCPS_debug_level) {
00212     GuidConverter reader_converter(yourId);
00213     GuidConverter writer_converter(writer.writerId);
00214     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
00215         ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00216         OPENDDS_STRING(reader_converter).c_str(),
00217         OPENDDS_STRING(writer_converter).c_str()));
00218   }
00219 
00220   if (entity_deleted_.value()) {
00221     if (DCPS_debug_level) {
00222       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
00223           ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00224     }
00225     return;
00226   }
00227 
00228   // We are being called back from the repository before we are done
00229   // processing after our call to the repository that caused this call
00230   // (from the repository) to be made.
00231   if (GUID_UNKNOWN == subscription_id_) {
00232     subscription_id_ = yourId;
00233   }
00234 
00235   //Why do we need the publication_handle_lock_ here?  No access to id_to_handle_map_...
00236   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00237 
00238 
00239   // For each writer in the list of writers to associate with, we
00240   // create a WriterInfo and a WriterStats object and store them in
00241   // our internal maps.
00242   //
00243   {
00244 
00245     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
00246 
00247     const PublicationId& writer_id = writer.writerId;
00248     RcHandle<WriterInfo> info = make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos);
00249     std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
00250         // This insertion is idempotent.
00251         WriterMapType::value_type(
00252           writer_id,
00253           info));
00254 
00255       // Schedule timer if necessary
00256       //   - only need to check reader qos - we know the writer must be >= reader
00257       if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00258         info->waiting_for_end_historic_samples_ = true;
00259       }
00260 
00261       this->statistics_.insert(
00262         StatsMapType::value_type(
00263             writer_id,
00264             WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
00265 
00266     // If this is a durable reader
00267     if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00268       // TODO schedule timer for removing flag from writers
00269     }
00270 
00271     if (DCPS_debug_level > 4) {
00272       GuidConverter converter(writer_id);
00273       ACE_DEBUG((LM_DEBUG,
00274           "(%P|%t) DataReaderImpl::add_association: "
00275           "inserted writer %C.return %d \n",
00276           OPENDDS_STRING(converter).c_str(), bpair.second));
00277 
00278       WriterMapType::iterator iter = writers_.find(writer_id);
00279       if (iter != writers_.end()) {
00280         // This may not be an error since it could happen that the sample
00281         // is delivered to the datareader after the write is dis-associated
00282         // with this datareader.
00283         GuidConverter reader_converter(subscription_id_);
00284         GuidConverter writer_converter(writer_id);
00285         ACE_DEBUG((LM_DEBUG,
00286             ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00287             ACE_TEXT("reader %C is associated with writer %C.\n"),
00288             OPENDDS_STRING(reader_converter).c_str(),
00289             OPENDDS_STRING(writer_converter).c_str()));
00290       }
00291     }
00292   }
00293 
00294   // Propagate the add_associations processing down into the Transport
00295   // layer here.  This will establish the transport support and reserve
00296   // usage of an existing connection or initiate creation of a new
00297   // connection if no suitable connection is available.
00298   AssociationData data;
00299   data.remote_id_ = writer.writerId;
00300   data.remote_data_ = writer.writerTransInfo;
00301   data.publication_transport_priority_ =
00302       writer.writerQos.transport_priority.value;
00303   data.remote_reliable_ =
00304       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00305   data.remote_durable_ =
00306       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00307 
00308   //Do not hold publication_handle_lock_ when calling associate due to possible reactor
00309   //deadlock on passive side completion
00310   //associate does not access id_to_handle_map_, thus not clear why publication_handle_lock_
00311   //is held here anyway
00312   guard.release();
00313 
00314   if (!associate(data, active)) {
00315     if (DCPS_debug_level) {
00316       ACE_ERROR((LM_ERROR,
00317           ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00318           ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00319     }
00320   }
00321 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::add_link ( const DataLink_rch link,
const RepoId peer 
) [protected, virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 3219 of file DataReaderImpl.cpp.

References DDS::DataReaderQos::durability, end_historic_sweeper_, OPENDDS_STRING, qos_, resume_sample_processing(), OpenDDS::DCPS::TransportImpl::transport_type(), DDS::VOLATILE_DURABILITY_QOS, writers_, and writers_lock_.

03220 {
03221   if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
03222 
03223     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
03224 
03225     WriterMapType::iterator it = writers_.find(peer);
03226     if (it != writers_.end()) {
03227       // Schedule timer if necessary
03228       //   - only need to check reader qos - we know the writer must be >= reader
03229       end_historic_sweeper_->schedule_timer(it->second);
03230     }
03231   }
03232   TransportClient::add_link(link, peer);
03233   TransportImpl& impl = link->impl();
03234   OPENDDS_STRING type = impl.transport_type();
03235 
03236   if (type == "rtps_udp" || type == "multicast") {
03237     resume_sample_processing(peer);
03238   }
03239 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::association_complete ( const RepoId remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 440 of file DataReaderImpl.cpp.

00441 {
00442   // For the current DCPSInfoRepo implementation, the DataReader side will
00443   // always be passive, so association_complete() will not be called.
00444 }

void OpenDDS::DCPS::DataReaderImpl::begin_access (  ) 

Definition at line 3078 of file DataReaderImpl.cpp.

References coherent_, and sample_lock_.

03079 {
03080   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03081   this->coherent_ = true;
03082 }

bool OpenDDS::DCPS::DataReaderImpl::check_historic ( const ReceivedDataSample sample  )  [private]

collect samples received before END_HISTORIC_SAMPLES returns false if normal processing of this sample should be skipped

Definition at line 3188 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), writers_, and writers_lock_.

Referenced by data_received().

03189 {
03190   ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
03191   WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
03192   if (iter != writers_.end()) {
03193     const SequenceNumber& seq = sample.header_.sequence_;
03194     if (iter->second->waiting_for_end_historic_samples_) {
03195       iter->second->historic_samples_.insert(std::make_pair(seq, sample));
03196       return false;
03197     }
03198     if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
03199         && !sample.header_.historic_sample_
03200         && seq <= iter->second->last_historic_seq_) {
03201       // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
03202       return false;
03203     }
03204   }
03205   return true;
03206 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::check_transport_qos ( const TransportInst inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 1647 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportInst::is_reliable(), qos_, DDS::DataReaderQos::reliability, and DDS::RELIABLE_RELIABILITY_QOS.

01648 {
01649   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01650     return ti.is_reliable();
01651   }
01652   return true;
01653 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::cleanup ( void   )  [virtual]

Definition at line 140 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::NO_STATUS_MASK, and set_listener().

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().

00141 {
00142   // As first step set our listener to nill which will prevent us from calling
00143   // back onto the listener at the moment the related DDS entity has been
00144   // deleted
00145   set_listener(0, NO_STATUS_MASK);
00146 
00147 
00148 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::coherent_change_received ( WriterInfo writer  )  [private]
void OpenDDS::DCPS::DataReaderImpl::coherent_change_received ( RepoId  publisher_id,
Coherent_State result 
)

Definition at line 3004 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, state, writers_, and writers_lock_.

03005 {
03006   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03007 
03008   result = COMPLETED;
03009   for (WriterMapType::iterator iter = writers_.begin();
03010       iter != writers_.end();
03011       ++iter) {
03012 
03013     if (iter->second->publisher_id_ == publisher_id) {
03014       const Coherent_State state = iter->second->coherent_change_received();
03015       if (state == NOT_COMPLETED_YET) {
03016         result = NOT_COMPLETED_YET;
03017         break;
03018       }
03019       else if (state == REJECTED) {
03020         result = REJECTED;
03021       }
03022     }
03023   }
03024 }

void OpenDDS::DCPS::DataReaderImpl::coherent_changes_completed ( DataReaderImpl reader  ) 

Definition at line 3028 of file DataReaderImpl.cpp.

References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, get_subscriber_servant(), OpenDDS::DCPS::RcHandle< T >::in(), CORBA::is_nil(), listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), reverse_sample_lock_, and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().

Referenced by verify_coherent_changes_completion().

03029 {
03030   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
03031   if (!subscriber)
03032     return;
03033 
03034   subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
03035   this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
03036 
03037   ::DDS::SubscriberListener_var sub_listener =
03038       subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS);
03039   if (!CORBA::is_nil(sub_listener.in()))
03040   {
03041     if (reader == this) {
03042       // Release the sample_lock before listener callback.
03043       ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03044       sub_listener->on_data_on_readers(subscriber.in());
03045     }
03046 
03047     this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03048     subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03049   }
03050   else
03051   {
03052     subscriber->notify_status_condition();
03053 
03054     ::DDS::DataReaderListener_var listener =
03055         this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
03056 
03057     if (!CORBA::is_nil(listener.in()))
03058     {
03059       if (reader == this) {
03060         // Release the sample_lock before listener callback.
03061         ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03062         listener->on_data_available(this);
03063       } else {
03064         listener->on_data_available(this);
03065       }
03066 
03067       set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03068       subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03069     }
03070     else
03071     {
03072       this->notify_status_condition();
03073     }
03074   }
03075 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::contains_sample ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Fold-in the three separate loops of have_sample_states(), have_view_states(), and have_instance_states(). Takes the sample_lock_.

Definition at line 1750 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, instances_, instances_lock_, item(), OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, sample_lock_, and OpenDDS::DCPS::InstanceState::view_state().

Referenced by OpenDDS::DCPS::ReadConditionImpl::get_trigger_value().

01752 {
01753   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
01754   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01755 
01756   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
01757       end = instances_.end(); iter != end; ++iter) {
01758     SubscriptionInstance& inst = *iter->second;
01759 
01760     if ((inst.instance_state_.view_state() & view_states) &&
01761         (inst.instance_state_.instance_state() & instance_states)) {
01762       for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
01763           item = item->next_data_sample_) {
01764         if (item->sample_state_ & sample_states
01765 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01766             && !item->coherent_change_
01767 #endif
01768         ) {
01769           return true;
01770         }
01771       }
01772     }
01773   }
01774 
01775   return false;
01776 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual bool OpenDDS::DCPS::DataReaderImpl::contains_sample_filtered ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const FilterEvaluator evaluator,
const DDS::StringSeq params 
) [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by OpenDDS::DCPS::QueryConditionImpl::get_trigger_value().

Here is the caller graph for this function:

DDS::QueryCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_querycondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const char *  query_expression,
const DDS::StringSeq query_parameters 
) [virtual]

Definition at line 793 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, QueryConditionImpl, read_conditions_, DDS::RETCODE_OK, and sample_lock_.

00799 {
00800   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00801   try {
00802     DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
00803         view_states, instance_states, query_expression);
00804     if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) {
00805       return 0;
00806     }
00807     DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
00808     read_conditions_.insert(rc);
00809     return qc._retn();
00810   } catch (const std::exception& e) {
00811     if (DCPS_debug_level) {
00812       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ")
00813           ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
00814           e.what()));
00815     }
00816   }
00817   return 0;
00818 }

Here is the call graph for this function:

DDS::ReadCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_readcondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [virtual]

Definition at line 780 of file DataReaderImpl.cpp.

References read_conditions_, and sample_lock_.

00784 {
00785   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00786   DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
00787       view_states, instance_states);
00788   read_conditions_.insert(rc);
00789   return rc._retn();
00790 }

void OpenDDS::DCPS::DataReaderImpl::data_received ( const ReceivedDataSample sample  )  [virtual]

process a message that has been received - could be control or a data sample.

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 1354 of file DataReaderImpl.cpp.

References accept_sample_processing(), ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, check_historic(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_demarshal(), OpenDDS::DCPS::DISPOSE_INSTANCE, dispose_unregister(), OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, dup(), OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::END_HISTORIC_SAMPLES, filter_sample(), OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::EntityImpl::get_deleted(), get_repo_id(), get_subscriber_servant(), OpenDDS::DCPS::GUID_UNKNOWN, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::INSTANCE_REGISTRATION, instances_, instances_lock_, is_exclusive_ownership_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, lookup_instance(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, notify_read_conditions(), OPENDDS_STRING, ownership_manager(), process_latency(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::RcHandle< T >::reset(), resume_sample_processing(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, sample_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_string(), OpenDDS::DCPS::UNREGISTER_INSTANCE, verify_coherent_changes_completion(), watchdog_, writer_activity(), writers_, and writers_lock_.

Referenced by deliver_historic().

01355 {
01356   DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
01357 
01358   // ensure some other thread is not changing the sample container
01359   // or statuses related to samples.
01360   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01361 
01362   if (get_deleted()) return;
01363 
01364   if (DCPS_debug_level > 9) {
01365     GuidConverter converter(subscription_id_);
01366     ACE_DEBUG((LM_DEBUG,
01367         ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01368         ACE_TEXT("%C received sample: %C.\n"),
01369         OPENDDS_STRING(converter).c_str(),
01370         to_string(sample.header_).c_str()));
01371   }
01372 
01373   switch (sample.header_.message_id_) {
01374   case SAMPLE_DATA:
01375   case INSTANCE_REGISTRATION: {
01376     if (!check_historic(sample)) break;
01377 
01378     DataSampleHeader const & header = sample.header_;
01379 
01380     this->writer_activity(header);
01381 
01382     // Verify data has not exceeded its lifespan.
01383     if (this->filter_sample(header)) break;
01384 
01385     // This adds the reader to the set/list of readers with data.
01386     RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01387     if (subscriber)
01388       subscriber->data_received(this);
01389 
01390     // Only gather statistics about real samples, not registration data, etc.
01391     if (header.message_id_ == SAMPLE_DATA) {
01392       this->process_latency(sample);
01393     }
01394 
01395     // This also adds to the sample container and makes any callbacks
01396     // and condition modifications.
01397 
01398     SubscriptionInstance_rch instance;
01399     bool is_new_instance = false;
01400     bool filtered = false;
01401     if (sample.header_.key_fields_only_) {
01402       dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING);
01403     } else {
01404       dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING);
01405     }
01406 
01407     // Per sample logging
01408     if (DCPS_debug_level >= 8) {
01409       GuidConverter reader_converter(subscription_id_);
01410       GuidConverter writer_converter(header.publication_id_);
01411 
01412       ACE_DEBUG ((LM_DEBUG,
01413           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
01414           ACE_TEXT("instance %d is_new_instance %d filtered %d \n"),
01415           OPENDDS_STRING(reader_converter).c_str(),
01416           OPENDDS_STRING(writer_converter).c_str(),
01417           instance ? instance->instance_handle_ : 0,
01418               is_new_instance, filtered));
01419     }
01420 
01421     if (filtered) break; // sample filtered from instance
01422 
01423     if (instance) accept_sample_processing(instance, header, is_new_instance);
01424   }
01425   break;
01426 
01427 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01428   case END_COHERENT_CHANGES: {
01429     CoherentChangeControl control;
01430 
01431     this->writer_activity(sample.header_);
01432 
01433     Serializer serializer(
01434         sample.sample_.get(), sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER);
01435     if (!(serializer >> control)) {
01436       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
01437           ACE_TEXT("deserialization coherent change control failed.\n")));
01438       return;
01439     }
01440 
01441     if (DCPS_debug_level > 0) {
01442       std::stringstream buffer;
01443       buffer << control << std::endl;
01444 
01445       ACE_DEBUG((LM_DEBUG,
01446           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01447           ACE_TEXT("END_COHERENT_CHANGES %C\n"),
01448           buffer.str().c_str()));
01449     }
01450 
01451     RcHandle<WriterInfo> writer;
01452     {
01453       ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01454 
01455       WriterMapType::iterator it =
01456           this->writers_.find(sample.header_.publication_id_);
01457 
01458       if (it == this->writers_.end()) {
01459         GuidConverter sub_id(this->subscription_id_);
01460         GuidConverter pub_id(sample.header_.publication_id_);
01461         ACE_DEBUG((LM_WARNING,
01462             ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01463             ACE_TEXT(" subscription %C failed to find ")
01464             ACE_TEXT(" publication data for %C!\n"),
01465             OPENDDS_STRING(sub_id).c_str(),
01466             OPENDDS_STRING(pub_id).c_str()));
01467         return;
01468       }
01469       else {
01470         writer = it->second;
01471       }
01472       it->second->set_group_info (control);
01473     }
01474 
01475     if (this->verify_coherent_changes_completion(writer.in())) {
01476       this->notify_read_conditions();
01477     }
01478   }
01479   break;
01480 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
01481 
01482   case DATAWRITER_LIVELINESS: {
01483     if (DCPS_debug_level >= 4) {
01484       GuidConverter reader_converter(subscription_id_);
01485       GuidConverter writer_converter(sample.header_.publication_id_);
01486       ACE_DEBUG((LM_DEBUG,
01487                  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01488                  ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
01489                  OPENDDS_STRING(reader_converter).c_str(),
01490                  OPENDDS_STRING(writer_converter).c_str()));
01491     }
01492     this->writer_activity(sample.header_);
01493 
01494     // tell all instances they got a liveliness message
01495     {
01496       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01497       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01498           iter != instances_.end();
01499           ++iter) {
01500         iter->second->instance_state_.lively(sample.header_.publication_id_);
01501       }
01502     }
01503 
01504   }
01505   break;
01506 
01507   case DISPOSE_INSTANCE: {
01508     if (!check_historic(sample)) break;
01509     this->writer_activity(sample.header_);
01510     SubscriptionInstance_rch instance;
01511 
01512     if (this->watchdog_.in()) {
01513       // Find the instance first for timer cancellation since
01514       // the instance may be deleted during dispose and can
01515       // not be accessed.
01516       ReceivedDataSample dup(sample);
01517       this->lookup_instance(dup, instance);
01518 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01519       OwnershipManagerPtr owner_manager = this->ownership_manager();
01520 
01521       if (! this->is_exclusive_ownership_
01522           || (owner_manager
01523               && (instance)
01524               && (owner_manager->is_owner (instance->instance_handle_,
01525                   sample.header_.publication_id_)))) {
01526 #endif
01527         this->watchdog_->cancel_timer(instance);
01528 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01529       }
01530 #endif
01531     }
01532     instance.reset();
01533     this->dispose_unregister(sample, instance);
01534   }
01535   this->notify_read_conditions();
01536   break;
01537 
01538   case UNREGISTER_INSTANCE: {
01539     if (!check_historic(sample)) break;
01540     this->writer_activity(sample.header_);
01541     SubscriptionInstance_rch instance;
01542 
01543     if (this->watchdog_.in()) {
01544       // Find the instance first for timer cancellation since
01545       // the instance may be deleted during dispose and can
01546       // not be accessed.
01547       ReceivedDataSample dup(sample);
01548       this->lookup_instance(dup, instance);
01549       if (instance) {
01550 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01551         if (! this->is_exclusive_ownership_
01552             || (this->is_exclusive_ownership_
01553                 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01554 #endif
01555           this->watchdog_->cancel_timer(instance);
01556 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01557         }
01558 #endif
01559       }
01560     }
01561     instance.reset();
01562     this->dispose_unregister(sample, instance);
01563   }
01564   this->notify_read_conditions();
01565   break;
01566 
01567   case DISPOSE_UNREGISTER_INSTANCE: {
01568     if (!check_historic(sample)) break;
01569     this->writer_activity(sample.header_);
01570     SubscriptionInstance_rch instance;
01571 
01572     if (this->watchdog_.in()) {
01573       // Find the instance first for timer cancellation since
01574       // the instance may be deleted during dispose and can
01575       // not be accessed.
01576       ReceivedDataSample dup(sample);
01577       this->lookup_instance(dup, instance);
01578 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01579       OwnershipManagerPtr owner_manager = this->ownership_manager();
01580       if (! this->is_exclusive_ownership_
01581           || (owner_manager
01582               && (instance)
01583               && (owner_manager->is_owner (instance->instance_handle_,
01584                   sample.header_.publication_id_)))
01585                   || (this->is_exclusive_ownership_
01586                       && (instance)
01587                       && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01588 #endif
01589         if (instance) {
01590           this->watchdog_->cancel_timer(instance);
01591         }
01592 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01593       }
01594 #endif
01595     }
01596     instance.reset();
01597     this->dispose_unregister(sample, instance);
01598   }
01599   this->notify_read_conditions();
01600   break;
01601 
01602   case END_HISTORIC_SAMPLES: {
01603     if (sample.header_.message_length_ >= sizeof(RepoId)) {
01604       Serializer ser(sample.sample_.get());
01605       RepoId readerId = GUID_UNKNOWN;
01606       if (!(ser >> readerId)) {
01607         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
01608             ACE_TEXT("deserialization reader failed.\n")));
01609         return;
01610       }
01611       if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) {
01612         break; // not our message
01613       }
01614     }
01615     if (DCPS_debug_level > 4) {
01616       ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
01617     }
01618     // Going to acquire writers lock, release samples lock
01619     guard.release();
01620     this->resume_sample_processing(sample.header_.publication_id_);
01621     if (DCPS_debug_level > 4) {
01622       GuidConverter pub_id(sample.header_.publication_id_);
01623       ACE_DEBUG((
01624           LM_INFO,
01625           "(%P|%t) Resumed sample processing for durable writer %C\n",
01626           OPENDDS_STRING(pub_id).c_str()));
01627     }
01628     break;
01629   }
01630 
01631   default:
01632     ACE_ERROR((LM_ERROR,
01633         "(%P|%t) ERROR: DataReaderImpl::data_received"
01634         "unexpected message_id = %d\n",
01635         sample.header_.message_id_));
01636     break;
01637   }
01638 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::DataReaderImpl::dds_demarshal ( const ReceivedDataSample sample,
SubscriptionInstance_rch instance,
bool &  is_new_instance,
bool &  filtered,
MarshalingType  marshaling_type 
) [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by data_received().

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_contained_entities (  )  [virtual]

Implements DDS::DataReader.

Definition at line 838 of file DataReaderImpl.cpp.

References read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and sample_lock_.

00839 {
00840   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00841       DDS::RETCODE_OUT_OF_RESOURCES);
00842   read_conditions_.clear();
00843   return DDS::RETCODE_OK;
00844 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_readcondition ( DDS::ReadCondition_ptr  a_condition  )  [virtual]

Definition at line 828 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, and sample_lock_.

00830 {
00831   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00832       DDS::RETCODE_OUT_OF_RESOURCES);
00833   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00834   return read_conditions_.erase(rc)
00835       ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
00836 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::deliver_historic ( OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&  samples  )  [private]

deliver samples that were held by check_historic()

Definition at line 3208 of file DataReaderImpl.cpp.

References data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, and OPENDDS_MAP().

Referenced by resume_sample_processing().

03209 {
03210   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
03211   const iter_t end = samples.end();
03212   for (iter_t iter = samples.begin(); iter != end; ++iter) {
03213     iter->second.header_.historic_sample_ = true;
03214     data_received(iter->second);
03215   }
03216 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::DataReaderImpl::disable_transport (  ) 

Definition at line 33 of file DataReaderImpl.inl.

References transport_disabled_.

Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr().

00034 {
00035   this->transport_disabled_ = true;
00036 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::dispose_unregister ( const ReceivedDataSample sample,
SubscriptionInstance_rch instance 
) [virtual]

Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Definition at line 2263 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

Referenced by data_received().

02265 {
02266   if (DCPS_debug_level > 0) {
02267     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
02268   }
02269 }

Here is the caller graph for this function:

DDS::DomainId_t OpenDDS::DCPS::DataReaderImpl::domain_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 673 of file DataReaderImpl.h.

00673 { return this->domain_id_; }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable (  )  [virtual]

Implements DDS::Entity.

Definition at line 1136 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportClient::connection_info(), content_filtered_topic_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::deadline, depth_, domain_id_, dp_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), enable_specific(), OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_subscriber_servant(), OpenDDS::DCPS::GUID_UNKNOWN, DDS::DataReaderQos::history, OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::KEEP_ALL_HISTORY_QOS, last_deadline_missed_total_count_, DDS::LENGTH_UNLIMITED, DDS::DataReaderQos::liveliness, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, n_chunks_, DDS::Duration_t::nanosec, participant_servant_, qos_, rd_allocator_, OpenDDS::DCPS::ref(), DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), requested_deadline_missed_status_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::DataReaderQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, sample_lock_, DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, transport_disabled_, DDS::VOLATILE_DURABILITY_QOS, and watchdog_.

Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr(), and OpenDDS::DCPS::SubscriberImpl::create_datareader().

01137 {
01138   //According spec:
01139   // - Calling enable on an already enabled Entity returns OK and has no
01140   // effect.
01141   // - Calling enable on an Entity whose factory is not enabled will fail
01142   // and return PRECONDITION_NOT_MET.
01143 
01144   if (this->is_enabled()) {
01145     return DDS::RETCODE_OK;
01146   }
01147 
01148   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01149   if (!subscriber) {
01150     return DDS::RETCODE_ERROR;
01151   }
01152 
01153   if (!subscriber->is_enabled()) {
01154     return DDS::RETCODE_PRECONDITION_NOT_MET;
01155   }
01156 
01157   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
01158   if (participant) {
01159     dp_id_ = participant->get_id();
01160   }
01161 
01162   if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
01163     // The spec says qos_.history.depth is "has no effect"
01164     // when history.kind = KEEP_ALL so use max_samples_per_instance
01165     depth_ = qos_.resource_limits.max_samples_per_instance;
01166 
01167   } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
01168     depth_ = qos_.history.depth;
01169   }
01170 
01171   if (depth_ == DDS::LENGTH_UNLIMITED) {
01172     // DDS::LENGTH_UNLIMITED is negative so make it a positive
01173     // value that is for all intents and purposes unlimited
01174     // and we can use it for comparisons.
01175     // use 2147483647L because that is the greatest value a signed
01176     // CORBA::Long can have.
01177     // WARNING: The client risks running out of memory in this case.
01178     depth_ = 2147483647L;
01179   }
01180 
01181   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01182     n_chunks_ = qos_.resource_limits.max_samples;
01183   }
01184 
01185   //else using value from Service_Participant
01186 
01187   // enable the type specific part of this DataReader
01188   this->enable_specific();
01189 
01190   //Note: the QoS used to set n_chunks_ is Changable=No so
01191   // it is OK that we cannot change the size of our allocators.
01192   rd_allocator_.reset(new ReceivedDataAllocator(n_chunks_));
01193 
01194   if (DCPS_debug_level >= 2)
01195     ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
01196         " Cached_Allocator_With_Overflow %x with %d chunks\n",
01197         rd_allocator_.get(), n_chunks_));
01198 
01199   if ((qos_.liveliness.lease_duration.sec !=
01200       DDS::DURATION_INFINITE_SEC) &&
01201       (qos_.liveliness.lease_duration.nanosec !=
01202           DDS::DURATION_INFINITE_NSEC)) {
01203     liveliness_lease_duration_ =
01204         duration_to_time_value(qos_.liveliness.lease_duration);
01205   }
01206 
01207   // Setup the requested deadline watchdog if the configured deadline
01208   // period is not the default (infinite).
01209   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01210 
01211   if (!this->watchdog_
01212       && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01213           || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
01214     this->watchdog_ = make_rch<RequestedDeadlineWatchdog>(
01215             ref(this->sample_lock_),
01216             this->qos_.deadline,
01217             ref(*this),
01218             ref(this->requested_deadline_missed_status_),
01219             ref(this->last_deadline_missed_total_count_));
01220   }
01221 
01222   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01223   disco->pre_reader(this);
01224 
01225   this->set_enabled();
01226 
01227   if (topic_servant_ && !transport_disabled_) {
01228 
01229     try {
01230       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
01231           this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01232     } catch (const Transport::Exception&) {
01233       ACE_ERROR((LM_ERROR,
01234           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01235           ACE_TEXT("Transport Exception.\n")));
01236       return DDS::RETCODE_ERROR;
01237 
01238     }
01239 
01240     const TransportLocatorSeq& trans_conf_info = this->connection_info();
01241 
01242     CORBA::String_var filterClassName = "";
01243     CORBA::String_var filterExpression = "";
01244     DDS::StringSeq exprParams;
01245 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01246 
01247     if (content_filtered_topic_) {
01248       filterClassName = content_filtered_topic_->get_filter_class_name();
01249       filterExpression = content_filtered_topic_->get_filter_expression();
01250       content_filtered_topic_->get_expression_parameters(exprParams);
01251     }
01252 
01253 #endif
01254 
01255     DDS::SubscriberQos sub_qos;
01256     subscriber->get_qos(sub_qos);
01257 
01258     this->subscription_id_ =
01259         disco->add_subscription(this->domain_id_,
01260             this->dp_id_,
01261             this->topic_servant_->get_id(),
01262             this,
01263             this->qos_,
01264             trans_conf_info,
01265             sub_qos,
01266             filterClassName,
01267             filterExpression,
01268             exprParams);
01269 
01270     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01271       ACE_ERROR((LM_WARNING,
01272           ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::enable, ")
01273           ACE_TEXT("add_subscription returned invalid id.\n")));
01274       return DDS::RETCODE_ERROR;
01275     }
01276   }
01277 
01278   if (topic_servant_) {
01279     const CORBA::String_var name = topic_servant_->get_name();
01280     DDS::ReturnCode_t return_value =
01281         subscriber->reader_enabled(name.in(), this);
01282 
01283     if (this->monitor_) {
01284       this->monitor_->report();
01285     }
01286 
01287     return return_value;
01288   } else {
01289     return DDS::RETCODE_OK;
01290   }
01291 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::enable_filtering ( ContentFilteredTopicImpl cft  ) 

Definition at line 3130 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ContentFilteredTopicImpl::add_reader(), and content_filtered_topic_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().

03131 {
03132   cft->add_reader(*this);
03133   content_filtered_topic_ = cft;
03134 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable_specific (  )  [protected, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by enable().

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::end_access (  ) 

Definition at line 3085 of file DataReaderImpl.cpp.

References coherent_, group_coherent_ordered_data_, post_read_or_take(), OpenDDS::DCPS::GroupRakeData::reset(), and sample_lock_.

03086 {
03087   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03088   this->coherent_ = false;
03089   this->group_coherent_ordered_data_.reset();
03090   this->post_read_or_take();
03091 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::filter_sample ( const DataSampleHeader header  )  [protected]

Check if the received data sample or instance should be filtered.

Note:
Filtering will only occur if the application configured a finite duration in the Topic's LIFESPAN QoS policy or DataReader's TIME_BASED_FILTER QoS policy.

Definition at line 2571 of file DataReaderImpl.cpp.

References ACE_TEXT(), always_get_history_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, ACE_OS::gettimeofday(), OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, LM_DEBUG, qos_, ACE_Time_Value::sec(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::time_to_time_value(), ACE_Time_Value::usec(), and DDS::VOLATILE_DURABILITY_QOS.

Referenced by data_received().

02572 {
02573   ACE_Time_Value now(ACE_OS::gettimeofday());
02574 
02575   // Expire historic data if QoS indicates VOLATILE.
02576   if (!always_get_history_ && header.historic_sample_
02577       && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
02578     if (DCPS_debug_level >= 8) {
02579       ACE_DEBUG((LM_DEBUG,
02580           ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
02581           ACE_TEXT("Discarded historic data.\n")));
02582     }
02583 
02584     return true;  // Data filtered.
02585   }
02586 
02587   // The LIFESPAN_DURATION_FLAG is set when sample data is sent
02588   // with a non-default LIFESPAN duration value.
02589   if (header.lifespan_duration_) {
02590     // Finite lifespan.  Check if data has expired.
02591 
02592     DDS::Time_t const tmp = {
02593         header.source_timestamp_sec_ + header.lifespan_duration_sec_,
02594         header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
02595     };
02596 
02597     // We assume that the publisher host's clock and subcriber host's
02598     // clock are synchronized (allowed by the spec).
02599     ACE_Time_Value const expiration_time(
02600         OpenDDS::DCPS::time_to_time_value(tmp));
02601 
02602     if (now >= expiration_time) {
02603       if (DCPS_debug_level >= 8) {
02604         ACE_Time_Value const diff(now - expiration_time);
02605         ACE_DEBUG((LM_DEBUG,
02606             ACE_TEXT("OpenDDS (%P|%t) Received data ")
02607             ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
02608             diff.sec(),
02609             diff.usec()));
02610       }
02611 
02612       return true;  // Data filtered.
02613     }
02614   }
02615 
02616   return false;
02617 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ContentFilteredTopic_ptr OpenDDS::DCPS::DataReaderImpl::get_cf_topic (  )  const

Definition at line 3137 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), content_filtered_topic_, and OpenDDS::DCPS::TopicDescriptionPtr< Topic >::get().

Here is the call graph for this function:

CORBA::Long OpenDDS::DCPS::DataReaderImpl::get_depth (  )  const [inline]

Definition at line 394 of file DataReaderImpl.h.

00394                               {
00395     return depth_;
00396   }

OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_dp_id (  ) 

Definition at line 2832 of file DataReaderImpl.cpp.

References dp_id_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02833 {
02834   return dp_id_;
02835 }

Here is the caller graph for this function:

SubscriptionInstance_rch OpenDDS::DCPS::DataReaderImpl::get_handle_instance ( DDS::InstanceHandle_t  handle  )  [protected]

Definition at line 2413 of file DataReaderImpl.cpp.

References ACE_TEXT(), instances_, instances_lock_, and LM_WARNING.

Referenced by release_instance().

02414 {
02415   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, SubscriptionInstance_rch());
02416 
02417   SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
02418   if (iter == instances_.end()) {
02419     ACE_DEBUG((LM_WARNING,
02420         ACE_TEXT("(%P|%t) WARNING: ")
02421         ACE_TEXT("DataReaderImpl::get_handle_instance: ")
02422         ACE_TEXT("lookup for 0x%x failed\n"),
02423         handle));
02424     return SubscriptionInstance_rch();
02425   } // if (0 != instances_.find(handle, instance))
02426 
02427   return iter->second;
02428 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 197 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

00198 {
00199   using namespace OpenDDS::DCPS;
00200   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00201   if (participant)
00202     return participant->id_to_handle(subscription_id_);
00203   return 0;
00204 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::get_instance_handles ( InstanceHandleVec &  instance_handles  ) 

Definition at line 2838 of file DataReaderImpl.cpp.

References instances_, instances_lock_, and sample_lock_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02839 {
02840   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02841   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02842 
02843   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02844       end = instances_.end(); iter != end; ++iter) {
02845     instance_handles.push_back(iter->first);
02846   }
02847 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::get_latency_stats ( OpenDDS::DCPS::LatencyStatisticsSeq stats  )  [virtual]

Definition at line 2366 of file DataReaderImpl.cpp.

References statistics_.

02368 {
02369   stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
02370   int index = 0;
02371 
02372   for (StatsMapType::const_iterator current = this->statistics_.begin();
02373       current != this->statistics_.end();
02374       ++current, ++index) {
02375     stats[ index] = current->second.get_stats();
02376     stats[ index].publication = current->first;
02377   }
02378 }

DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::get_listener (  )  [virtual]

Implements DDS::DataReader.

Definition at line 940 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), and listener_.

00941 {
00942   return DDS::DataReaderListener::_duplicate(listener_.in());
00943 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_liveliness_changed_status ( DDS::LivelinessChangedStatus status  )  [virtual]
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publication_data ( DDS::PublicationBuiltinTopicData publication_data,
DDS::InstanceHandle_t  publication_handle 
) [virtual]

Definition at line 1097 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, publication_handle_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01100 {
01101   if (enabled_ == false) {
01102     ACE_ERROR_RETURN((LM_ERROR,
01103         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
01104         ACE_TEXT("get_matched_publication_data: ")
01105         ACE_TEXT("Entity is not enabled. \n")),
01106         DDS::RETCODE_NOT_ENABLED);
01107   }
01108 
01109 
01110   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01111       guard,
01112       this->publication_handle_lock_,
01113       DDS::RETCODE_ERROR);
01114 
01115   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01116 
01117   if (!participant)
01118     return DDS::RETCODE_ERROR;
01119 
01120   DDS::PublicationBuiltinTopicDataSeq data;
01121   const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
01122                                   participant.in(),
01123                                   BUILT_IN_PUBLICATION_TOPIC,
01124                                   publication_handle,
01125                                   data);
01126 
01127   if (ret == DDS::RETCODE_OK) {
01128     publication_data = data[0];
01129   }
01130 
01131   return ret;
01132 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publications ( DDS::InstanceHandleSeq publication_handles  )  [virtual]

Definition at line 1066 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, LM_ERROR, publication_handle_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01068 {
01069   if (enabled_ == false) {
01070     ACE_ERROR_RETURN((LM_ERROR,
01071         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
01072         ACE_TEXT(" Entity is not enabled. \n")),
01073         DDS::RETCODE_NOT_ENABLED);
01074   }
01075 
01076   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01077       guard,
01078       this->publication_handle_lock_,
01079       DDS::RETCODE_ERROR);
01080 
01081   // Copy out the handles for the current set of publications.
01082   int index = 0;
01083   publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01084 
01085   for (RepoIdToHandleMap::iterator
01086       current = this->id_to_handle_map_.begin();
01087       current != this->id_to_handle_map_.end();
01088       ++current, ++index) {
01089     publication_handles[ index] = current->second;
01090   }
01091 
01092   return DDS::RETCODE_OK;
01093 }

Here is the call graph for this function:

size_t OpenDDS::DCPS::DataReaderImpl::get_n_chunks (  )  const [inline]

Definition at line 397 of file DataReaderImpl.h.

00397                               {
00398     return n_chunks_;
00399   }

DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_next_handle ( const DDS::BuiltinTopicKey_t key  )  [protected]

Get an instance handle for a new instance.

Definition at line 2431 of file DataReaderImpl.cpp.

References domain_id_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::in(), is_bit(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, TheServiceParticipant, and topic_servant_.

02432 {
02433   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02434   if (!participant)
02435     return 0;
02436 
02437   if (is_bit()) {
02438     Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_);
02439     CORBA::String_var topic = topic_servant_->get_name();
02440 
02441     RepoId id = disc->bit_key_to_repo_id(participant.in(), topic, key);
02442     return participant->id_to_handle(id);
02443 
02444   } else {
02445     return participant->id_to_handle(GUID_UNKNOWN);
02446   }
02447 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::get_ordered_data ( GroupRakeData data,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Definition at line 3094 of file DataReaderImpl.cpp.

References group_coherent_ordered_data_, OpenDDS::DCPS::GroupRakeData::insert_sample(), instances_, instances_lock_, item(), and sample_lock_.

03098 {
03099   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03100   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03101 
03102   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
03103       iter != instances_.end(); ++iter) {
03104     SubscriptionInstance_rch ptr = iter->second;
03105     if ((ptr->instance_state_.view_state() & view_states) &&
03106         (ptr->instance_state_.instance_state() & instance_states)) {
03107       size_t i(0);
03108       for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
03109           item != 0; item = item->next_data_sample_) {
03110         if ((item->sample_state_ & sample_states) && !item->coherent_change_) {
03111           data.insert_sample(item, ptr, ++i);
03112           this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i);
03113         }
03114       }
03115     }
03116   }
03117 }

Here is the call graph for this function:

Priority OpenDDS::DCPS::DataReaderImpl::get_priority_value ( const AssociationData data  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 675 of file DataReaderImpl.h.

References OpenDDS::DCPS::AssociationData::publication_transport_priority_.

00675                                                                  {
00676     return data.publication_transport_priority_;
00677   }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_qos ( DDS::DataReaderQos qos  )  [virtual]

Definition at line 923 of file DataReaderImpl.cpp.

References qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::InstanceState::schedule_release().

00925 {
00926   qos = qos_;
00927   return DDS::RETCODE_OK;
00928 }

Here is the caller graph for this function:

ACE_Reactor_Timer_Interface * OpenDDS::DCPS::DataReaderImpl::get_reactor ( void   ) 

Definition at line 2820 of file DataReaderImpl.cpp.

References reactor_.

Referenced by OpenDDS::DCPS::InstanceState::cancel_release(), and OpenDDS::DCPS::InstanceState::schedule_release().

02821 {
02822   return this->reactor_;
02823 }

Here is the caller graph for this function:

const RepoId& OpenDDS::DCPS::DataReaderImpl::get_repo_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 672 of file DataReaderImpl.h.

Referenced by data_received().

00672 { return this->subscription_id_; }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_deadline_missed_status ( DDS::RequestedDeadlineMissedStatus status  )  [virtual]

Definition at line 989 of file DataReaderImpl.cpp.

References last_deadline_missed_total_count_, DDS::REQUESTED_DEADLINE_MISSED_STATUS, requested_deadline_missed_status_, DDS::RETCODE_OK, sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::RequestedDeadlineMissedStatus::total_count, and DDS::RequestedDeadlineMissedStatus::total_count_change.

00991 {
00992   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
00993 
00994   set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
00995       false);
00996 
00997   this->requested_deadline_missed_status_.total_count_change =
00998       this->requested_deadline_missed_status_.total_count
00999       - this->last_deadline_missed_total_count_;
01000 
01001   // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
01002   // is updated by the RequestedDeadlineWatchdog.
01003 
01004   // Update for next status check.
01005   this->last_deadline_missed_total_count_ =
01006       this->requested_deadline_missed_status_.total_count;
01007 
01008   status = requested_deadline_missed_status_;
01009 
01010   return DDS::RETCODE_OK;
01011 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_incompatible_qos_status ( DDS::RequestedIncompatibleQosStatus status  )  [virtual]
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_lost_status ( DDS::SampleLostStatus status  )  [virtual]
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_rejected_status ( DDS::SampleRejectedStatus status  )  [virtual]
DDS::Subscriber_ptr OpenDDS::DCPS::DataReaderImpl::get_subscriber (  )  [virtual]

Implements DDS::DataReader.

Definition at line 955 of file DataReaderImpl.cpp.

References get_subscriber_servant().

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::DRMonitorImpl::report().

00956 {
00957   return get_subscriber_servant()._retn();
00958 }

Here is the call graph for this function:

Here is the caller graph for this function:

RcHandle< SubscriberImpl > OpenDDS::DCPS::DataReaderImpl::get_subscriber_servant (  )  [protected]

Definition at line 1675 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and subscriber_servant_.

Referenced by coherent_changes_completed(), data_received(), enable(), get_subscriber(), listener_for(), post_read_or_take(), set_qos(), and verify_coherent_changes_completion().

01676 {
01677   return subscriber_servant_.lock();
01678 }

Here is the call graph for this function:

Here is the caller graph for this function:

RepoId OpenDDS::DCPS::DataReaderImpl::get_subscription_id (  )  const
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_subscription_matched_status ( DDS::SubscriptionMatchedStatus status  )  [virtual]
OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_topic_id (  ) 

Definition at line 2826 of file DataReaderImpl.cpp.

References topic_servant_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02827 {
02828   return this->topic_servant_->get_id();
02829 }

Here is the caller graph for this function:

DDS::TopicDescription_ptr OpenDDS::DCPS::DataReaderImpl::get_topicdescription (  )  [virtual]

Implements DDS::DataReader.

Definition at line 945 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), content_filtered_topic_, OpenDDS::DCPS::TopicDescriptionPtr< Topic >::get(), and topic_desc_.

Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().

00946 {
00947 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00948   if (content_filtered_topic_) {
00949     return DDS::TopicDescription::_duplicate(content_filtered_topic_.get());
00950   }
00951 #endif
00952   return DDS::TopicDescription::_duplicate(topic_desc_.in());
00953 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::get_writer_states ( WriterStatePairVec &  writer_states  ) 

Definition at line 2850 of file DataReaderImpl.cpp.

References writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02851 {
02852   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02853       read_guard,
02854       this->writers_lock_);
02855   for (WriterMapType::iterator iter = writers_.begin();
02856       iter != writers_.end();
02857       ++iter) {
02858     writer_states.push_back(WriterStatePair(iter->first,
02859         iter->second->get_state()));
02860   }
02861 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::has_readcondition ( DDS::ReadCondition_ptr  a_condition  )  [protected]

Definition at line 821 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), and read_conditions_.

00822 {
00823   //sample lock already held
00824   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00825   return read_conditions_.find(rc) != read_conditions_.end();
00826 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::has_zero_copies (  ) 

This method is used for a precondition check of delete_datareader.

Return values:
true We have zero-copy samples loaned out
false We have no zero-copy samples loaned out

Definition at line 2725 of file DataReaderImpl.cpp.

References instances_, instances_lock_, item(), and sample_lock_.

02726 {
02727   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02728       guard,
02729       this->sample_lock_,
02730       true /* assume we have loans */);
02731   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true);
02732 
02733   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
02734       iter != instances_.end();
02735       ++iter) {
02736     SubscriptionInstance_rch ptr = iter->second;
02737 
02738     for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
02739         item != 0; item = item->next_data_sample_) {
02740       if (item->zero_copy_cnt_ > 0) {
02741         return true;
02742       }
02743     }
02744   }
02745 
02746   return false;
02747 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::have_instance_states ( DDS::InstanceStateMask  instance_states  )  const

!!caller should have acquired sample_lock_ : determine correct failed lock return value.

Definition at line 1728 of file DataReaderImpl.cpp.

References instances_, and instances_lock_.

01730 {
01731   //!!!caller should have acquired sample_lock_
01732   /// @TODO: determine correct failed lock return value.
01733   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01734 
01735   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01736       iter != instances_.end();
01737       ++iter) {
01738     SubscriptionInstance_rch ptr = iter->second;
01739 
01740     if (ptr->instance_state_.instance_state() & instance_states) {
01741       return true;
01742     }
01743   }
01744 
01745   return false;
01746 }

bool OpenDDS::DCPS::DataReaderImpl::have_sample_states ( DDS::SampleStateMask  sample_states  )  const

!!caller should have acquired sample_lock_ : determine correct failed lock return value.

Definition at line 1685 of file DataReaderImpl.cpp.

References instances_, instances_lock_, and item().

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states().

01687 {
01688   //!!!caller should have acquired sample_lock_
01689   /// @TODO: determine correct failed lock return value.
01690   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
01691 
01692   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01693       iter != instances_.end();
01694       ++iter) {
01695     SubscriptionInstance_rch ptr = iter->second;
01696 
01697     for (ReceivedDataElement *item = ptr->rcvd_samples_.head_;
01698         item != 0; item = item->next_data_sample_) {
01699       if (item->sample_state_ & sample_states) {
01700         return true;
01701       }
01702     }
01703   }
01704 
01705   return false;
01706 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::have_view_states ( DDS::ViewStateMask  view_states  )  const

!!caller should have acquired sample_lock_ : determine correct failed lock return value.

Definition at line 1709 of file DataReaderImpl.cpp.

References instances_, and instances_lock_.

01710 {
01711   //!!!caller should have acquired sample_lock_
01712   /// @TODO: determine correct failed lock return value.
01713   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01714 
01715   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01716       iter != instances_.end();
01717       ++iter) {
01718     SubscriptionInstance_rch ptr = iter->second;
01719 
01720     if (ptr->instance_state_.view_state() & view_states) {
01721       return true;
01722     }
01723   }
01724 
01725   return false;
01726 }

void OpenDDS::DCPS::DataReaderImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 731 of file DataReaderImpl.cpp.

References topic_servant_.

00732 {
00733   topic_servant_->inconsistent_topic();
00734 }

void OpenDDS::DCPS::DataReaderImpl::init ( TopicDescriptionImpl a_topic_desc,
const DDS::DataReaderQos qos,
DDS::DataReaderListener_ptr  a_listener,
const DDS::StatusMask mask,
DomainParticipantImpl participant,
SubscriberImpl subscriber 
)

Definition at line 150 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::SubscriberImpl::get_qos(), is_bit_, is_exclusive_ownership_, listener_, listener_mask_, LM_WARNING, participant_servant_, qos_, DDS::RETCODE_OK, ACE_OS::strcmp(), subscriber_servant_, topic_desc_, and topic_servant_.

Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr(), OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00157 {
00158   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00159   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00160     topic_servant_ = a_topic;
00161   }
00162 
00163   CORBA::String_var topic_name = a_topic_desc->get_name();
00164 
00165 #if !defined (DDS_HAS_MINIMUM_BIT)
00166   is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00167       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00168       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00169       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00170 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00171 
00172   qos_ = qos;
00173 
00174 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00175   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00176 #endif
00177 
00178   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00179   listener_mask_ = mask;
00180 
00181   // Only store the participant pointer, since it is our "grand"
00182   // parent, we will exist as long as it does
00183   participant_servant_ = *participant;
00184 
00185   domain_id_ = participant->get_domain_id();
00186 
00187   subscriber_servant_ = *subscriber;
00188 
00189   if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
00190     ACE_DEBUG((LM_WARNING,
00191         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
00192         ACE_TEXT("failed to get SubscriberQos\n")));
00193   }
00194 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update ( WriterInfo info,
const ACE_Time_Value when 
) [private]

Definition at line 2235 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count, instances_, instances_lock_, liveliness_changed_status_, and OpenDDS::DCPS::WriterInfo::writer_id_.

Referenced by writer_became_dead(), and writer_removed().

02237 {
02238   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02239   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02240       next = iter; iter != instances_.end(); iter = next) {
02241     ++next;
02242     iter->second->instance_state_.writer_became_dead(
02243         info.writer_id_, liveliness_changed_status_.alive_count, when);
02244   }
02245 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::is_bit (  )  const

Definition at line 2719 of file DataReaderImpl.cpp.

References is_bit_.

Referenced by get_next_handle().

02720 {
02721   return this->is_bit_;
02722 }

Here is the caller graph for this function:

DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::listener_for ( DDS::StatusKind  kind  ) 

This is used to retrieve the listener for a certain status change. If this datareader has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/subscriber.

Definition at line 1779 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), get_subscriber_servant(), CORBA::is_nil(), listener_, and listener_mask_.

Referenced by coherent_changes_completed(), notify_liveliness_change(), remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().

01780 {
01781   // per 2.1.4.3.1 Listener Access to Plain Communication Status
01782   // use this entities factory if listener is mask not enabled
01783   // for this kind.
01784   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
01785   if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) {
01786     return subscriber->listener_for(kind);
01787 
01788   } else {
01789     return DDS::DataReaderListener::_duplicate(listener_.in());
01790   }
01791 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::liveliness_lost (  ) 
virtual void OpenDDS::DCPS::DataReaderImpl::lookup_instance ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance_rch instance 
) [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by data_received().

Here is the caller graph for this function:

virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::lookup_instance_generic ( const void *  data  )  [pure virtual]
void OpenDDS::DCPS::DataReaderImpl::lookup_instance_handles ( const WriterIdSeq ids,
DDS::InstanceHandleSeq hdls 
) [private]

Lookup the instance handles by the publication repo ids.

Definition at line 2539 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_STRING, and participant_servant_.

Referenced by notify_latency(), notify_subscription_disconnected(), notify_subscription_lost(), notify_subscription_reconnected(), and remove_associations_i().

02541 {
02542   CORBA::ULong const num_wrts = ids.length();
02543 
02544   if (DCPS_debug_level > 9) {
02545     const char* separator = "";
02546     OPENDDS_STRING guids;
02547 
02548     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02549       guids += separator;
02550       guids += OPENDDS_STRING(GuidConverter(ids[i]));
02551       separator = ", ";
02552     }
02553 
02554     ACE_DEBUG((LM_DEBUG,
02555         ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
02556         ACE_TEXT("searching for handles for writer Ids: %C.\n"),
02557         guids.c_str()));
02558   }
02559 
02560   hdls.length(num_wrts);
02561 
02562   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02563   if (participant) {
02564     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02565       hdls[i] = participant->id_to_handle(ids[i]);
02566     }
02567   }
02568 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::notify_latency ( PublicationId  writer  ) 

Definition at line 2333 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), budget_exceeded_status_, CORBA::is_nil(), OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, listener_, lookup_instance_handles(), OpenDDS::DCPS::BudgetExceededStatus::total_count, and OpenDDS::DCPS::BudgetExceededStatus::total_count_change.

Referenced by process_latency().

02334 {
02335   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02336   // is given to this DataReader then narrow() fails.
02337   DataReaderListener_var listener
02338   = DataReaderListener::_narrow(this->listener_.in());
02339 
02340   if (!CORBA::is_nil(listener.in())) {
02341     WriterIdSeq writerIds;
02342     writerIds.length(1);
02343     writerIds[ 0] = writer;
02344 
02345     DDS::InstanceHandleSeq handles;
02346     this->lookup_instance_handles(writerIds, handles);
02347 
02348     if (handles.length() >= 1) {
02349       this->budget_exceeded_status_.last_instance_handle = handles[ 0];
02350 
02351     } else {
02352       this->budget_exceeded_status_.last_instance_handle = -1;
02353     }
02354 
02355     ++this->budget_exceeded_status_.total_count;
02356     ++this->budget_exceeded_status_.total_count_change;
02357 
02358     listener->on_budget_exceeded(this, this->budget_exceeded_status_);
02359 
02360     this->budget_exceeded_status_.total_count_change = 0;
02361   }
02362 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::notify_liveliness_change (  ) 

Definition at line 2749 of file DataReaderImpl.cpp.

References ACE_TEXT(), DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DCPS_debug_level, CORBA::is_nil(), listener_, listener_for(), listener_mask_, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, LM_DEBUG, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_dds_string(), and writers_.

Referenced by writer_became_alive(), writer_became_dead(), and writer_removed().

02750 {
02751   // N.B. writers_lock_ should already be acquired when
02752   //      this method is called.
02753 
02754   DDS::DataReaderListener_var listener
02755   = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
02756 
02757   if (!CORBA::is_nil(listener.in())) {
02758     listener->on_liveliness_changed(this, liveliness_changed_status_);
02759 
02760     liveliness_changed_status_.alive_count_change = 0;
02761     liveliness_changed_status_.not_alive_count_change = 0;
02762   }
02763   notify_status_condition();
02764 
02765   if (DCPS_debug_level > 9) {
02766     OPENDDS_STRING output_str;
02767     output_str += "subscription ";
02768     output_str += OPENDDS_STRING(GuidConverter(subscription_id_));
02769     output_str += ", listener at: 0x";
02770     output_str += to_dds_string(this->listener_.in ());
02771 
02772     for (WriterMapType::iterator current = this->writers_.begin();
02773         current != this->writers_.end();
02774         ++current) {
02775       RepoId id = current->first;
02776       output_str += "\n\tNOTIFY: writer[ ";
02777       output_str += OPENDDS_STRING(GuidConverter(id));
02778       output_str += "] == ";
02779       output_str += current->second->get_state_str();
02780     }
02781 
02782     output_str + "\n";
02783     ACE_DEBUG((LM_DEBUG,
02784         ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
02785         ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
02786         ACE_TEXT("\tNOTIFY: %C\n"),
02787         listener.in (),
02788         listener_mask_,
02789         output_str.c_str()));
02790   }
02791 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::notify_read_conditions (  )  [protected]

Data has arrived into the cache, unblock waiting ReadConditions.

Definition at line 1655 of file DataReaderImpl.cpp.

References ACE_TEXT(), LM_ERROR, read_conditions_, reverse_sample_lock_, and OpenDDS::DCPS::ConditionImpl::signal_all().

Referenced by accept_sample_processing(), and data_received().

01656 {
01657   //sample lock is already held
01658   ReadConditionSet local_read_conditions = read_conditions_;
01659   ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01660 
01661   for (ReadConditionSet::iterator it = local_read_conditions.begin(),
01662       end = local_read_conditions.end(); it != end; ++it) {
01663     ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in());
01664     if (ci) {
01665       ci->signal_all();
01666     } else {
01667       ACE_ERROR((LM_ERROR,
01668         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
01669         ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n")));
01670     }
01671   }
01672 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_disconnected ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2450 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, CORBA::is_nil(), listener_, lookup_instance_handles(), OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.

02451 {
02452   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
02453 
02454   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02455   // is given to this DataReader then narrow() fails.
02456   DataReaderListener_var the_listener
02457   = DataReaderListener::_narrow(this->listener_.in());
02458 
02459   if (!CORBA::is_nil(the_listener.in())) {
02460     SubscriptionLostStatus status;
02461 
02462     // Since this callback may come after remove_association which removes
02463     // the writer from id_to_handle map, we can ignore this error.
02464     this->lookup_instance_handles(pubids, status.publication_handles);
02465     the_listener->on_subscription_disconnected(this, status);
02466   }
02467 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost ( const DDS::InstanceHandleSeq handles  )  [private]

Definition at line 2492 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), len, listener_, OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.

02493 {
02494   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02495 
02496   if (!this->is_bit_) {
02497     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02498     // is given to this DataReader then narrow() fails.
02499     DataReaderListener_var the_listener
02500     = DataReaderListener::_narrow(this->listener_.in());
02501 
02502     if (!CORBA::is_nil(the_listener.in())) {
02503       SubscriptionLostStatus status;
02504 
02505       CORBA::ULong len = handles.length();
02506       status.publication_handles.length(len);
02507 
02508       for (CORBA::ULong i = 0; i < len; ++ i) {
02509         status.publication_handles[i] = handles[i];
02510       }
02511 
02512       the_listener->on_subscription_lost(this, status);
02513     }
02514   }
02515 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2518 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, CORBA::is_nil(), listener_, lookup_instance_handles(), OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.

Referenced by remove_associations_i().

02519 {
02520   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02521 
02522   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02523   // is given to this DataReader then narrow() fails.
02524   DataReaderListener_var the_listener
02525   = DataReaderListener::_narrow(this->listener_.in());
02526 
02527   if (!CORBA::is_nil(the_listener.in())) {
02528     SubscriptionLostStatus status;
02529 
02530     // Since this callback may come after remove_association which removes
02531     // the writer from id_to_handle map, we can ignore this error.
02532     this->lookup_instance_handles(pubids, status.publication_handles);
02533     the_listener->on_subscription_lost(this, status);
02534   }
02535 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_reconnected ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2470 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.

02471 {
02472   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
02473 
02474   if (!this->is_bit_) {
02475     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02476     // is given to this DataReader then narrow() fails.
02477     DataReaderListener_var the_listener
02478     = DataReaderListener::_narrow(this->listener_.in());
02479 
02480     if (!CORBA::is_nil(the_listener.in())) {
02481       SubscriptionLostStatus status;
02482 
02483       // If it's reconnected then the reader should be in id_to_handle
02484       this->lookup_instance_handles(pubids, status.publication_handles);
02485 
02486       the_listener->on_subscription_reconnected(this,  status);
02487     }
02488   }
02489 }

Here is the call graph for this function:

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
SubscriptionInstance_rch   
)

Referenced by deliver_historic(), and resume_sample_processing().

Here is the caller graph for this function:

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( PublicationId  ,
RcHandle< WriterInfo ,
GUID_tKeyLessThan   
) [private]

publications writing to this reader.

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( PublicationId  ,
WriterStats  ,
GUID_tKeyLessThan   
)

Type of collection of statistics for writers to this reader.

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET_CMP ( DDS::ReadCondition_var  ,
RCCompLess   
) [private]
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( void *   ) 
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( WriterStatePair   ) 
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( DDS::InstanceHandle_t   ) 

Referenced by signal_liveliness().

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataReaderImpl::ownership_filter_instance ( const SubscriptionInstance_rch instance,
const PublicationId pubid 
) [protected]

Definition at line 2621 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_UNKNOWN, is_exclusive_ownership_, LM_DEBUG, OPENDDS_STRING, ownership_manager(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, and writers_.

02623 {
02624 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02625   if (this->is_exclusive_ownership_) {
02626 
02627     WriterMapType::iterator iter = writers_.find(pubid);
02628 
02629     if (iter == writers_.end()) {
02630       if (DCPS_debug_level > 4) {
02631         // This may not be an error since it could happen that the sample
02632         // is delivered to the datareader after the write is dis-associated
02633         // with this datareader.
02634         GuidConverter reader_converter(subscription_id_);
02635         GuidConverter writer_converter(pubid);
02636         ACE_DEBUG((LM_DEBUG,
02637                    ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02638                    ACE_TEXT("reader %C is not associated with writer %C.\n"),
02639                    OPENDDS_STRING(reader_converter).c_str(),
02640                    OPENDDS_STRING(writer_converter).c_str()));
02641       }
02642       return true;
02643     }
02644 
02645 
02646     // Evaulate the owner of the instance if not selected and filter
02647     // current message if it's not from owner writer.
02648     if ( instance->instance_state_.get_owner () == GUID_UNKNOWN
02649         || ! iter->second->is_owner_evaluated (instance->instance_handle_)) {
02650       OwnershipManagerPtr owner_manager = this->ownership_manager();
02651 
02652       bool is_owner = owner_manager && owner_manager->select_owner (
02653         instance->instance_handle_,
02654         iter->second->writer_id_,
02655         iter->second->writer_qos_.ownership_strength.value,
02656         &instance->instance_state_);
02657       iter->second->set_owner_evaluated (instance->instance_handle_, true);
02658 
02659       if (! is_owner) {
02660         if (DCPS_debug_level >= 1) {
02661           GuidConverter reader_converter(subscription_id_);
02662           GuidConverter writer_converter(pubid);
02663           GuidConverter owner_converter (instance->instance_state_.get_owner ());
02664           ACE_DEBUG((LM_DEBUG,
02665                      ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02666                      ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
02667                      OPENDDS_STRING(reader_converter).c_str(),
02668                      OPENDDS_STRING(writer_converter).c_str(),
02669                      OPENDDS_STRING(owner_converter).c_str()));
02670         }
02671         return true;
02672       }
02673     }
02674     else if (! (instance->instance_state_.get_owner () == pubid)) {
02675       if (DCPS_debug_level >= 1) {
02676         GuidConverter reader_converter(subscription_id_);
02677         GuidConverter writer_converter(pubid);
02678         GuidConverter owner_converter (instance->instance_state_.get_owner ());
02679         ACE_DEBUG((LM_DEBUG,
02680                    ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
02681                    ACE_TEXT("reader %C writer %C is not owner %C\n"),
02682                    OPENDDS_STRING(reader_converter).c_str(),
02683                    OPENDDS_STRING(writer_converter).c_str(),
02684                    OPENDDS_STRING(owner_converter).c_str()));
02685       }
02686       return true;
02687     }
02688   }
02689 #else
02690   ACE_UNUSED_ARG(pubid);
02691   ACE_UNUSED_ARG(instance);
02692 #endif
02693   return false;
02694 }

Here is the call graph for this function:

OwnershipManagerPtr OpenDDS::DCPS::DataReaderImpl::ownership_manager (  )  [inline]
RcHandle< EntityImpl > OpenDDS::DCPS::DataReaderImpl::parent ( void   )  const [virtual]

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 1641 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and subscriber_servant_.

01642 {
01643   return this->subscriber_servant_.lock();
01644 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::post_read_or_take (  )  [protected]

Definition at line 2793 of file DataReaderImpl.cpp.

References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, get_subscriber_servant(), and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().

Referenced by end_access().

02794 {
02795   set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02796   RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
02797   if (subscriber)
02798     subscriber->set_status_changed_flag(
02799       DDS::DATA_ON_READERS_STATUS, false);
02800 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::prepare_to_delete (  )  [protected]

Definition at line 2405 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::send_final_acks(), OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().

02406 {
02407   this->set_deleted(true);
02408   this->stop_associating();
02409   this->send_final_acks();
02410 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::process_latency ( const ReceivedDataSample sample  ) 

NB: This message is generated contemporaneously with a similar message from writer_activity(). That message is not marked as an error, so we follow that lead and leave this as an informational message, guarded by debug level. This seems to be due to late samples (samples delivered after an association has been torn down). We may want to promote this to a warning if other conditions causing this symptom are discovered.

Definition at line 2271 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, ACE_OS::gettimeofday(), OpenDDS::DCPS::ReceivedDataSample::header_, DDS::DataReaderQos::latency_budget, LM_DEBUG, ACE_Time_Value::msec(), notify_latency(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, qos_, ACE_Time_Value::sec(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, statistics_, statistics_enabled(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::time_value_to_duration().

Referenced by data_received().

02272 {
02273   StatsMapType::iterator location
02274   = this->statistics_.find(sample.header_.publication_id_);
02275 
02276   if (location != this->statistics_.end()) {
02277     const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02278 
02279     // Only when the user has specified a latency budget or statistics
02280     // are enabled we need to calculate our latency
02281     if ((this->statistics_enabled()) ||
02282         (this->qos_.latency_budget.duration > zero)) {
02283       // This starts as the current time.
02284       ACE_Time_Value latency = ACE_OS::gettimeofday();
02285 
02286       // The time interval starts at the send end.
02287       DDS::Duration_t then = {
02288           sample.header_.source_timestamp_sec_,
02289           sample.header_.source_timestamp_nanosec_
02290       };
02291 
02292       // latency delay in ACE_Time_Value format.
02293       latency -= duration_to_time_value(then);
02294 
02295       if (this->statistics_enabled()) {
02296         location->second.add_stat(latency);
02297       }
02298 
02299       if (DCPS_debug_level > 9) {
02300         ACE_DEBUG((LM_DEBUG,
02301             ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02302             ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"),
02303             latency.sec(),
02304             latency.msec()));
02305       }
02306 
02307       if (this->qos_.latency_budget.duration > zero) {
02308         // Check latency against the budget.
02309         if (time_value_to_duration(latency) > this->qos_.latency_budget.duration) {
02310           this->notify_latency(sample.header_.publication_id_);
02311         }
02312       }
02313     }
02314   } else if (DCPS_debug_level > 0) {
02315     /// NB: This message is generated contemporaneously with a similar
02316     ///     message from writer_activity().  That message is not marked
02317     ///     as an error, so we follow that lead and leave this as an
02318     ///     informational message, guarded by debug level.  This seems
02319     ///     to be due to late samples (samples delivered after an
02320     ///     association has been torn down).  We may want to promote this
02321     ///     to a warning if other conditions causing this symptom are
02322     ///     discovered.
02323     GuidConverter reader_converter(subscription_id_);
02324     GuidConverter writer_converter(sample.header_.publication_id_);
02325     ACE_DEBUG((LM_DEBUG,
02326         ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02327         ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
02328         OPENDDS_STRING(reader_converter).c_str(),
02329         OPENDDS_STRING(writer_converter).c_str()));
02330   }
02331 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::DataReaderImpl::purge_data ( SubscriptionInstance_rch  instance  )  [protected, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by release_instance().

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::qos_change ( const DDS::DataReaderQos qos  )  [protected, virtual]

Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Definition at line 896 of file DataReaderImpl.cpp.

References DDS::DataReaderQos::deadline, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), last_deadline_missed_total_count_, qos_, OpenDDS::DCPS::ref(), requested_deadline_missed_status_, OpenDDS::DCPS::RcHandle< T >::reset(), sample_lock_, and watchdog_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< MessageType >::qos_change(), and set_qos().

00897 {
00898   // Reset the deadline timer if the period has changed.
00899   if (qos_.deadline.period.sec != qos.deadline.period.sec ||
00900       qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00901     if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
00902         qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00903 
00904           this->watchdog_ = make_rch<RequestedDeadlineWatchdog>(
00905                   ref(this->sample_lock_),
00906                   qos.deadline,
00907                   ref(*this),
00908                   ref(this->requested_deadline_missed_status_),
00909                   ref(this->last_deadline_missed_total_count_));
00910 
00911     } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
00912                qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00913       watchdog_->cancel_all();
00914       watchdog_.reset();
00915     } else {
00916       watchdog_->reset_interval(
00917         duration_to_time_value(qos.deadline.period));
00918     }
00919   }
00920 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE unsigned int & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size (  ) 

Configure the size of the raw data collection buffer.

Definition at line 19 of file DataReaderImpl.inl.

References raw_latency_buffer_size_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00020 {
00021   return this->raw_latency_buffer_size_;
00022 }

Here is the caller graph for this function:

ACE_INLINE OpenDDS::DCPS::DataCollector< double >::OnFull & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type (  ) 

Configure the type of the raw data collection buffer.

Definition at line 26 of file DataReaderImpl.inl.

References raw_latency_buffer_type_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00027 {
00028   return this->raw_latency_buffer_type_;
00029 }

Here is the caller graph for this function:

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE const OpenDDS::DCPS::DataReaderImpl::StatsMapType & OpenDDS::DCPS::DataReaderImpl::raw_latency_statistics (  )  const

Expose the statistics container.

Definition at line 12 of file DataReaderImpl.inl.

References statistics_.

00013 {
00014   return this->statistics_;
00015 }

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_generic ( GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count 
) [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().

Here is the caller graph for this function:

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [pure virtual]
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_next_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  previous_instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [pure virtual]
void OpenDDS::DCPS::DataReaderImpl::register_for_writer ( const RepoId participant,
const RepoId readerid,
const RepoId writerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3242 of file DataReaderImpl.cpp.

03247 {
03248   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
03249 }

void OpenDDS::DCPS::DataReaderImpl::reject_coherent ( PublicationId writer_id,
RepoId publisher_id 
)

Definition at line 2960 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, instances_, instances_lock_, LM_DEBUG, OPENDDS_STRING, reset_coherent_info(), sample_lock_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

Referenced by verify_coherent_changes_completion().

02962 {
02963   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
02964     GuidConverter reader (this->subscription_id_);
02965     GuidConverter writer (writer_id);
02966     GuidConverter publisher (publisher_id);
02967     ACE_DEBUG((LM_DEBUG,
02968         ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
02969         ACE_TEXT(" reader %C writer %C publisher %C \n"),
02970         OPENDDS_STRING(reader).c_str(),
02971         OPENDDS_STRING(writer).c_str(),
02972         OPENDDS_STRING(publisher).c_str()));
02973   }
02974 
02975   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02976   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02977 
02978   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02979       iter != this->instances_.end(); ++iter) {
02980     iter->second->rcvd_strategy_->reject_coherent(
02981         writer_id, publisher_id);
02982   }
02983   this->reset_coherent_info (writer_id, publisher_id);
02984 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::release_instance ( DDS::InstanceHandle_t  handle  ) 

Release the instance with the handle.

Definition at line 1968 of file DataReaderImpl.cpp.

References get_handle_instance(), instances_, instances_lock_, LM_ERROR, monitor_, ownership_manager(), purge_data(), release_instance_i(), OpenDDS::DCPS::Monitor::report(), and sample_lock_.

Referenced by OpenDDS::DCPS::InstanceState::release().

01969 {
01970 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01971   OwnershipManagerPtr owner_manager = this->ownership_manager();
01972   if (owner_manager) {
01973     owner_manager->remove_writers (handle);
01974   }
01975 #endif
01976 
01977   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01978   SubscriptionInstance_rch instance = this->get_handle_instance(handle);
01979 
01980   if (!instance) {
01981     ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
01982         "could not find the instance by handle 0x%x\n", handle));
01983     return;
01984   }
01985 
01986   this->purge_data(instance);
01987 
01988   {
01989     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01990     instances_.erase(handle);
01991   }
01992 
01993   this->release_instance_i(handle);
01994   if (this->monitor_) {
01995     this->monitor_->report();
01996   }
01997 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::DataReaderImpl::release_instance_i ( DDS::InstanceHandle_t  handle  )  [protected, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by release_instance().

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::remove_all_associations (  ) 

Definition at line 647 of file DataReaderImpl.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, LM_WARNING, publication_handle_lock_, remove_associations(), size, OpenDDS::DCPS::TransportClient::stop_associating(), writers_, and writers_lock_.

00648 {
00649   DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
00650   // stop pending associations
00651   this->stop_associating();
00652 
00653   OpenDDS::DCPS::WriterIdSeq writers;
00654   int size;
00655 
00656   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00657 
00658   {
00659     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00660 
00661     size = static_cast<int>(writers_.size());
00662     writers.length(size);
00663 
00664     WriterMapType::iterator curr_writer = writers_.begin();
00665     WriterMapType::iterator end_writer = writers_.end();
00666 
00667     int i = 0;
00668 
00669     while (curr_writer != end_writer) {
00670       writers[i++] = curr_writer->first;
00671       ++curr_writer;
00672     }
00673   }
00674 
00675   try {
00676     CORBA::Boolean dont_notify_lost = 0;
00677 
00678     if (0 < size) {
00679       remove_associations(writers, dont_notify_lost);
00680     }
00681 
00682   } catch (const CORBA::Exception&) {
00683       ACE_DEBUG((LM_WARNING,
00684                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00685                  ACE_TEXT("caught exception from remove_associations.\n")));
00686   }
00687 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::remove_associations ( const WriterIdSeq writers,
bool  callback 
) [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 447 of file DataReaderImpl.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::entity_deleted_, is_bit_, LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::push_back(), remove_association_sweeper_, remove_associations_i(), OpenDDS::DCPS::TransportClient::stop_associating(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, ACE_Atomic_Op< ACE_LOCK, TYPE >::value(), writers_, and writers_lock_.

Referenced by remove_all_associations().

00449 {
00450   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
00451 
00452   if (writers.length() == 0) {
00453     return;
00454   }
00455 
00456   if (DCPS_debug_level >= 1) {
00457     GuidConverter reader_converter(subscription_id_);
00458     GuidConverter writer_converter(writers[0]);
00459     ACE_DEBUG((LM_DEBUG,
00460         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
00461         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00462         is_bit_,
00463         OPENDDS_STRING(reader_converter).c_str(),
00464         OPENDDS_STRING(writer_converter).c_str(),
00465         writers.length()));
00466   }
00467   if (!this->entity_deleted_.value()) {
00468     // stop pending associations for these writer ids
00469     this->stop_associating(writers.get_buffer(), writers.length());
00470 
00471     // writers which are considered non-active and can
00472     // be removed immediately
00473     WriterIdSeq non_active_writers;
00474     {
00475       CORBA::ULong wr_len = writers.length();
00476       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00477 
00478       for (CORBA::ULong i = 0; i < wr_len; i++) {
00479         PublicationId writer_id = writers[i];
00480 
00481         WriterMapType::iterator it = this->writers_.find(writer_id);
00482         if (it != this->writers_.end() &&
00483             it->second->active()) {
00484           remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00485         } else {
00486           push_back(non_active_writers, writer_id);
00487         }
00488       }
00489     }
00490     remove_associations_i(non_active_writers, notify_lost);
00491   } else {
00492     remove_associations_i(writers, notify_lost);
00493   }
00494 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::remove_associations_i ( const WriterIdSeq writers,
bool  callback 
) [protected, virtual]

Section 7.1.4.1: total_count will not decrement.

: Reconcile this with the verbiage in section 7.1.4.1

Definition at line 513 of file DataReaderImpl.cpp.

References ACE_TEXT(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), end_historic_sweeper_, id_to_handle_map_, is_bit_, CORBA::is_nil(), DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), LM_DEBUG, lookup_instance_handles(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), notify_subscription_lost(), OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, DDS::SubscriptionMatchedStatus::total_count_change, writers_, and writers_lock_.

Referenced by remove_associations(), and remove_publication().

00515 {
00516   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
00517 
00518   if (writers.length() == 0) {
00519     return;
00520   }
00521 
00522   if (DCPS_debug_level >= 1) {
00523     GuidConverter reader_converter(subscription_id_);
00524     GuidConverter writer_converter(writers[0]);
00525     ACE_DEBUG((LM_DEBUG,
00526         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00527         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00528         is_bit_,
00529         OPENDDS_STRING(reader_converter).c_str(),
00530         OPENDDS_STRING(writer_converter).c_str(),
00531         writers.length()));
00532   }
00533   DDS::InstanceHandleSeq handles;
00534 
00535   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00536 
00537   // This is used to hold the list of writers which were actually
00538   // removed, which is a proper subset of the writers which were
00539   // requested to be removed.
00540   WriterIdSeq updated_writers;
00541 
00542   CORBA::ULong wr_len;
00543 
00544   //Remove the writers from writer list. If the supplied writer
00545   //is not in the cached writers list then it is already removed.
00546   //We just need remove the writers in the list that have not been
00547   //removed.
00548   {
00549     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00550 
00551     wr_len = writers.length();
00552 
00553     for (CORBA::ULong i = 0; i < wr_len; i++) {
00554       PublicationId writer_id = writers[i];
00555 
00556       WriterMapType::iterator it = this->writers_.find(writer_id);
00557 
00558       if (it != this->writers_.end()) {
00559         it->second->removed();
00560         end_historic_sweeper_->cancel_timer(it->second);
00561         remove_association_sweeper_->cancel_timer(it->second);
00562       }
00563 
00564       if (this->writers_.erase(writer_id) == 0) {
00565         if (DCPS_debug_level >= 1) {
00566           GuidConverter converter(writer_id);
00567           ACE_DEBUG((LM_DEBUG,
00568               ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00569               ACE_TEXT("the writer local %C was already removed.\n"),
00570               OPENDDS_STRING(converter).c_str()));
00571         }
00572 
00573       } else {
00574         push_back(updated_writers, writer_id);
00575       }
00576     }
00577   }
00578 
00579   wr_len = updated_writers.length();
00580 
00581   // Return now if the supplied writers have been removed already.
00582   if (wr_len == 0) {
00583     return;
00584   }
00585 
00586   if (!is_bit_) {
00587     // The writer should be in the id_to_handle map at this time.
00588     this->lookup_instance_handles(updated_writers, handles);
00589 
00590     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00591       id_to_handle_map_.erase(updated_writers[i]);
00592     }
00593   }
00594 
00595   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00596     {
00597       this->disassociate(updated_writers[i]);
00598     }
00599   }
00600 
00601   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00602   if (!this->is_bit_) {
00603     // Derive the change in the number of publications writing to this reader.
00604     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00605     this->subscription_match_status_.current_count_change
00606     = matchedPublications - this->subscription_match_status_.current_count;
00607 
00608     // Only process status if the number of publications has changed.
00609     if (this->subscription_match_status_.current_count_change != 0) {
00610       this->subscription_match_status_.current_count = matchedPublications;
00611 
00612       /// Section 7.1.4.1: total_count will not decrement.
00613 
00614       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00615       this->subscription_match_status_.last_publication_handle
00616       = handles[ wr_len - 1];
00617 
00618       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00619 
00620       DDS::DataReaderListener_var listener
00621       = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00622 
00623       if (!CORBA::is_nil(listener.in())) {
00624         listener->on_subscription_matched(this, this->subscription_match_status_);
00625 
00626         // Client will look at it so next time it looks the change should be 0
00627         this->subscription_match_status_.total_count_change = 0;
00628         this->subscription_match_status_.current_count_change = 0;
00629       }
00630       notify_status_condition();
00631     }
00632   }
00633 
00634   // If this remove_association is invoked when the InfoRepo
00635   // detects a lost writer then make a callback to notify
00636   // subscription lost.
00637   if (notify_lost) {
00638     this->notify_subscription_lost(handles);
00639   }
00640 
00641   if (this->monitor_) {
00642     this->monitor_->report();
00643   }
00644 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::remove_publication ( const PublicationId pub_id  )  [protected]

Definition at line 497 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::push_back(), remove_associations_i(), writers_, and writers_lock_.

00498 {
00499   WriterIdSeq writers;
00500   bool notify = false;
00501   {
00502     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00503     WriterMapType::iterator where = writers_.find(pub_id);
00504     if (writers_.end() != where) {
00505       notify = where->second->notify_lost_;
00506       push_back(writers, pub_id);
00507     }
00508   }
00509   remove_associations_i(writers, notify);
00510 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::reschedule_deadline (  ) 

Definition at line 2802 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::RcHandle< T >::in(), instances_, instances_lock_, LM_ERROR, and watchdog_.

02803 {
02804   if (this->watchdog_.in()) {
02805     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02806     for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02807         iter != this->instances_.end();
02808         ++iter) {
02809       if (iter->second->deadline_timer_id_ != -1) {
02810         if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
02811           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"),
02812               ACE_TEXT("reset_timer_interval")));
02813         }
02814       }
02815     }
02816   }
02817 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::reset_coherent_info ( const PublicationId writer_id,
const RepoId publisher_id 
)

Definition at line 2987 of file DataReaderImpl.cpp.

References writers_, and writers_lock_.

Referenced by reject_coherent().

02989 {
02990   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
02991 
02992   WriterMapType::iterator itEnd = this->writers_.end();
02993   for (WriterMapType::iterator it = this->writers_.begin();
02994       it != itEnd; ++it) {
02995     if (it->second->writer_id_ == writer_id
02996         && it->second->publisher_id_ == publisher_id) {
02997       it->second->reset_coherent_info();
02998     }
02999   }
03000 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::reset_latency_stats (  )  [virtual]

Clear any intermediate statistical values.

Implements OpenDDS::DCPS::DataReaderEx.

Definition at line 2382 of file DataReaderImpl.cpp.

References statistics_.

02383 {
02384   for (StatsMapType::iterator current = this->statistics_.begin();
02385       current != this->statistics_.end();
02386       ++current) {
02387     current->second.reset_stats();
02388   }
02389 }

void OpenDDS::DCPS::DataReaderImpl::reset_ownership ( DDS::InstanceHandle_t  instance  ) 
void OpenDDS::DCPS::DataReaderImpl::resume_sample_processing ( const PublicationId pub_id  )  [private]

when done handling historic samples, resume

Definition at line 3168 of file DataReaderImpl.cpp.

References deliver_historic(), end_historic_sweeper_, OpenDDS::DCPS::WriterInfo::last_historic_seq_, OPENDDS_MAP(), OpenDDS::DCPS::WriterInfo::waiting_for_end_historic_samples_, writers_, and writers_lock_.

Referenced by add_link(), and data_received().

03169 {
03170   OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
03171   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03172   WriterMapType::iterator where = writers_.find(pub_id);
03173   if (writers_.end() != where) {
03174     WriterInfo& info = *where->second;
03175     // Stop filtering these
03176     if (info.waiting_for_end_historic_samples_) {
03177       end_historic_sweeper_->cancel_timer(where->second);
03178       if (!info.historic_samples_.empty()) {
03179         info.last_historic_seq_ = info.historic_samples_.rbegin()->first;
03180       }
03181       to_deliver.swap(info.historic_samples_);
03182       write_guard.release();
03183       deliver_historic(to_deliver);
03184     }
03185   }
03186 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::sample_info ( DDS::SampleInfo sample_info,
const ReceivedDataElement ptr 
) [protected]

Definition at line 1793 of file DataReaderImpl.cpp.

References DDS::SampleInfo::absolute_generation_rank, DDS::SampleInfo::disposed_generation_count, OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::SampleInfo::generation_rank, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::SampleInfo::no_writers_generation_count, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, DDS::SampleInfo::opendds_reserved_publication_seq, DDS::SampleInfo::sample_rank, and OpenDDS::DCPS::ReceivedDataElement::sequence_.

01795 {
01796 
01797   sample_info.sample_rank = 0;
01798 
01799   // generation_rank =
01800   //    (MRSIC.disposed_generation_count +
01801   //     MRSIC.no_writers_generation_count)
01802   //  - (S.disposed_generation_count +
01803   //     S.no_writers_generation_count)
01804   //
01805   sample_info.generation_rank =
01806       (sample_info.disposed_generation_count +
01807           sample_info.no_writers_generation_count) -
01808           sample_info.generation_rank;
01809 
01810   // absolute_generation_rank =
01811   //     (MRS.disposed_generation_count +
01812   //      MRS.no_writers_generation_count)
01813   //   - (S.disposed_generation_count +
01814   //      S.no_writers_generation_count)
01815   //
01816   sample_info.absolute_generation_rank =
01817       (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
01818           static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
01819           sample_info.absolute_generation_rank;
01820 
01821   sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
01822 }

Here is the call graph for this function:

virtual void OpenDDS::DCPS::DataReaderImpl::set_instance_state ( DDS::InstanceHandle_t  instance,
DDS::InstanceStateKind  state 
) [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_listener ( DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 930 of file DataReaderImpl.cpp.

References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.

Referenced by cleanup().

00933 {
00934   listener_mask_ = mask;
00935   //note: OK to duplicate  a nil object ref
00936   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00937   return DDS::RETCODE_OK;
00938 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_qos ( const DDS::DataReaderQos qos  )  [virtual]

Definition at line 846 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, dp_id_, OpenDDS::DCPS::EntityImpl::enabled_, get_subscriber_servant(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, qos_, qos_change(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

00848 {
00849 
00850   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00851   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00852   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00853 
00854   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00855     if (qos_ == qos)
00856       return DDS::RETCODE_OK;
00857 
00858     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00859       return DDS::RETCODE_IMMUTABLE_POLICY;
00860 
00861     } else {
00862       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00863       DDS::SubscriberQos subscriberQos;
00864 
00865       RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
00866       bool status = false;
00867       if (subscriber) {
00868         subscriber->get_qos(subscriberQos);
00869         status =
00870           disco->update_subscription_qos(
00871               domain_id_,
00872               dp_id_,
00873               this->subscription_id_,
00874               qos,
00875               subscriberQos);
00876       }
00877       if (!status) {
00878         ACE_ERROR_RETURN((LM_ERROR,
00879             ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
00880             ACE_TEXT("qos not updated. \n")),
00881             DDS::RETCODE_ERROR);
00882       }
00883     }
00884 
00885 
00886     qos_change(qos);
00887     qos_ = qos;
00888 
00889     return DDS::RETCODE_OK;
00890 
00891   } else {
00892     return DDS::RETCODE_INCONSISTENT_POLICY;
00893   }
00894 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::set_sample_lost_status ( const DDS::SampleLostStatus status  )  [protected]

!!caller should have acquired sample_lock_

Definition at line 2248 of file DataReaderImpl.cpp.

References sample_lost_status_.

02250 {
02251   //!!!caller should have acquired sample_lock_
02252   sample_lost_status_ = status;
02253 }

void OpenDDS::DCPS::DataReaderImpl::set_sample_rejected_status ( const DDS::SampleRejectedStatus status  )  [protected]

!!caller should have acquired sample_lock_

Definition at line 2256 of file DataReaderImpl.cpp.

References sample_rejected_status_.

02258 {
02259   //!!!caller should have acquired sample_lock_
02260   sample_rejected_status_ = status;
02261 }

void OpenDDS::DCPS::DataReaderImpl::set_subscriber_qos ( const DDS::SubscriberQos qos  ) 

Definition at line 3122 of file DataReaderImpl.cpp.

References subqos_.

03124 {
03125   this->subqos_ = qos;
03126 }

void OpenDDS::DCPS::DataReaderImpl::signal_liveliness ( const RepoId remote_participant  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 737 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::GUID_t::entityId, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, instances_, instances_lock_, OPENDDS_VECTOR(), sample_lock_, writers_, and writers_lock_.

00738 {
00739   RepoId prefix = remote_participant;
00740   prefix.entityId = EntityId_t();
00741 
00742   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00743 
00744   typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair;
00745   typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
00746   WriterSet writers;
00747 
00748   {
00749     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00750     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00751            limit = writers_.end();
00752          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00753          ++pos) {
00754       writers.push_back(std::make_pair(pos->first, pos->second));
00755     }
00756   }
00757 
00758   ACE_Time_Value when = ACE_OS::gettimeofday();
00759   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00760        pos != limit;
00761        ++pos) {
00762     pos->second->received_activity(when);
00763   }
00764 
00765   if (!writers.empty()) {
00766     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00767     for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00768          pos != limit;
00769          ++pos) {
00770       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00771            iter != instances_.end();
00772            ++iter) {
00773         SubscriptionInstance_rch ptr = iter->second;
00774         ptr->instance_state_.lively(pos->first);
00775       }
00776     }
00777   }
00778 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::statistics_enabled ( CORBA::Boolean  statistics_enabled  )  [virtual]

Definition at line 2398 of file DataReaderImpl.cpp.

References statistics_enabled_.

02400 {
02401   this->statistics_enabled_ = statistics_enabled;
02402 }

CORBA::Boolean OpenDDS::DCPS::DataReaderImpl::statistics_enabled (  )  [virtual]

Definition at line 2392 of file DataReaderImpl.cpp.

References statistics_enabled_.

Referenced by process_latency().

02393 {
02394   return this->statistics_enabled_;
02395 }

Here is the caller graph for this function:

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::take ( AbstractSamples samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [pure virtual]
bool OpenDDS::DCPS::DataReaderImpl::time_based_filter_instance ( const SubscriptionInstance_rch instance,
ACE_Time_Value filter_time_expired 
) [protected]

Definition at line 2697 of file DataReaderImpl.cpp.

References DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, ACE_OS::gettimeofday(), qos_, DDS::DataReaderQos::time_based_filter, and OpenDDS::DCPS::time_value_to_duration().

02698 {
02699   ACE_Time_Value now(ACE_OS::gettimeofday());
02700 
02701   // TIME_BASED_FILTER processing; expire data samples
02702   // if minimum separation is not met for instance.
02703   const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02704 
02705   if (qos_.time_based_filter.minimum_separation > zero) {
02706     filter_time_expired = now - instance->last_accepted_;
02707     DDS::Duration_t separation = time_value_to_duration(filter_time_expired);
02708 
02709     if (separation < qos_.time_based_filter.minimum_separation) {
02710       return true;  // Data filtered.
02711     }
02712   }
02713 
02714   instance->last_accepted_ = now;
02715 
02716   return false;
02717 }

Here is the call graph for this function:

CORBA::Long OpenDDS::DCPS::DataReaderImpl::total_samples (  )  const [protected]

!!caller should have acquired sample_lock_

Definition at line 1824 of file DataReaderImpl.cpp.

References instances_, and instances_lock_.

01825 {
01826   //!!!caller should have acquired sample_lock_
01827   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
01828 
01829   CORBA::Long count(0);
01830 
01831   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01832       iter != instances_.end();
01833       ++iter) {
01834     SubscriptionInstance_rch ptr = iter->second;
01835 
01836     count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_);
01837   }
01838 
01839   return count;
01840 }

void OpenDDS::DCPS::DataReaderImpl::transport_assoc_done ( int  flags,
const RepoId remote_id 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 324 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, OpenDDS::DCPS::TransportClient::ASSOC_OK, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dp_id_, id_to_handle_map_, is_bit_, CORBA::is_nil(), DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, liveliness_timer_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_handle_lock_, OpenDDS::DCPS::Monitor::report(), sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, writers_, writers_lock_, and ACE_Time_Value::zero.

00325 {
00326   if (!(flags & ASSOC_OK)) {
00327     if (DCPS_debug_level) {
00328       const GuidConverter conv(remote_id);
00329       ACE_ERROR((LM_ERROR,
00330           ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00331           ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00332           OPENDDS_STRING(conv).c_str()));
00333     }
00334     return;
00335   }
00336 
00337   const bool active = flags & ASSOC_ACTIVE;
00338   {
00339 
00340     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00341 
00342     // LIVELINESS policy timers are managed here.
00343     if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00344       if (DCPS_debug_level >= 5) {
00345         GuidConverter converter(subscription_id_);
00346         ACE_DEBUG((LM_DEBUG,
00347             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00348             ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00349             OPENDDS_STRING(converter).c_str()));
00350       }
00351       // this call will start the timer if it is not already set
00352       liveliness_timer_->check_liveliness();
00353     }
00354   }
00355   // We no longer hold the publication_handle_lock_.
00356 
00357   if (!is_bit_) {
00358 
00359     RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00360 
00361     if (!participant)
00362       return;
00363 
00364     DDS::InstanceHandle_t handle = participant->id_to_handle(remote_id);
00365 
00366     // We acquire the publication_handle_lock_ for the remainder of our
00367     // processing.
00368     {
00369       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00370 
00371       // This insertion is idempotent.
00372       id_to_handle_map_.insert(
00373           RepoIdToHandleMap::value_type(remote_id, handle));
00374 
00375       if (DCPS_debug_level > 4) {
00376         GuidConverter converter(remote_id);
00377         ACE_DEBUG((LM_DEBUG,
00378             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00379             ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00380             OPENDDS_STRING(converter).c_str(),
00381             handle));
00382       }
00383 
00384       // We need to adjust these after the insertions have all completed
00385       // since insertions are not guaranteed to increase the number of
00386       // currently matched publications.
00387       const int matchedPublications = static_cast<int>(id_to_handle_map_.size());
00388       subscription_match_status_.current_count_change =
00389           matchedPublications - subscription_match_status_.current_count;
00390       subscription_match_status_.current_count = matchedPublications;
00391 
00392       ++subscription_match_status_.total_count;
00393       ++subscription_match_status_.total_count_change;
00394 
00395       subscription_match_status_.last_publication_handle = handle;
00396 
00397       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00398 
00399       DDS::DataReaderListener_var listener =
00400           listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00401 
00402       if (!CORBA::is_nil(listener)) {
00403         listener->on_subscription_matched(this, subscription_match_status_);
00404 
00405         // TBD - why does the spec say to change this but not change
00406         //       the ChangeFlagStatus after a listener call?
00407 
00408         // Client will look at it so next time it looks the change should be 0
00409         subscription_match_status_.total_count_change = 0;
00410         subscription_match_status_.current_count_change = 0;
00411       }
00412 
00413       notify_status_condition();
00414     }
00415 
00416     {
00417       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00418       ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00419 
00420       if(!writers_.count(remote_id)){
00421         return;
00422       }
00423       writers_[remote_id]->handle_ = handle;
00424     }
00425   }
00426 
00427   if (!active) {
00428     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00429 
00430     disco->association_complete(domain_id_, dp_id_,
00431         subscription_id_, remote_id);
00432   }
00433 
00434   if (monitor_) {
00435     monitor_->report();
00436   }
00437 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::unregister_for_writer ( const RepoId participant,
const RepoId readerid,
const RepoId writerid 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3252 of file DataReaderImpl.cpp.

03255 {
03256   TransportClient::unregister_for_writer(participant, readerid, writerid);
03257 }

void OpenDDS::DCPS::DataReaderImpl::update_incompatible_qos ( const IncompatibleQosStatus status  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 690 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, CORBA::is_nil(), OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, publication_handle_lock_, DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.

00691 {
00692   DDS::DataReaderListener_var listener =
00693       listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
00694 
00695   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00696       guard,
00697       this->publication_handle_lock_);
00698 
00699 
00700   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00701     // This test should make the method idempotent.
00702     return;
00703   }
00704 
00705   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00706       true);
00707 
00708   // copy status and increment change
00709   requested_incompatible_qos_status_.total_count = status.total_count;
00710   requested_incompatible_qos_status_.total_count_change +=
00711       status.count_since_last_send;
00712   requested_incompatible_qos_status_.last_policy_id =
00713       status.last_policy_id;
00714   requested_incompatible_qos_status_.policies = status.policies;
00715 
00716   if (!CORBA::is_nil(listener.in())) {
00717     listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_);
00718 
00719     // TBD - why does the spec say to change total_count_change but not
00720     // change the ChangeFlagStatus after a listener call?
00721 
00722     // client just looked at it so next time it looks the
00723     // change should be 0
00724     requested_incompatible_qos_status_.total_count_change = 0;
00725   }
00726 
00727   notify_status_condition();
00728 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::update_ownership_strength ( const PublicationId pub_id,
const CORBA::Long ownership_strength 
)

Definition at line 2865 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::WriterInfoListener::subscription_id_, writers_, and writers_lock_.

02867 {
02868   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02869       read_guard,
02870       this->writers_lock_);
02871   for (WriterMapType::iterator iter = writers_.begin();
02872       iter != writers_.end();
02873       ++iter) {
02874     if (iter->second->writer_id_ == pub_id) {
02875       if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) {
02876         if (DCPS_debug_level >= 1) {
02877           GuidConverter reader_converter(this->subscription_id_);
02878           GuidConverter writer_converter(pub_id);
02879           ACE_DEBUG((LM_DEBUG,
02880               ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
02881               ACE_TEXT("local %C update remote %C strength from %d to %d \n"),
02882               OPENDDS_STRING(reader_converter).c_str(),
02883               OPENDDS_STRING(writer_converter).c_str(),
02884               iter->second->writer_qos_.ownership_strength, ownership_strength));
02885         }
02886         iter->second->writer_qos_.ownership_strength.value = ownership_strength;
02887         iter->second->clear_owner_evaluated ();
02888       }
02889       break;
02890     }
02891   }
02892 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::update_subscription_params ( const DDS::StringSeq params  )  const

Definition at line 3146 of file DataReaderImpl.cpp.

References domain_id_, dp_id_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and TheServiceParticipant.

03147 {
03148   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
03149   disco->update_subscription_params(domain_id_,
03150       dp_id_,
03151       subscription_id_,
03152       params);
03153 }

bool OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion ( WriterInfo writer  )  [private]

Definition at line 2896 of file DataReaderImpl.cpp.

References accept_coherent(), OpenDDS::DCPS::WriterInfo::coherent_change_received(), coherent_changes_completed(), OpenDDS::DCPS::COMPLETED, get_subscriber_servant(), OpenDDS::DCPS::WriterInfo::group_coherent_, DDS::INSTANCE_PRESENTATION_QOS, OpenDDS::DCPS::NOT_COMPLETED_YET, DDS::SubscriberQos::presentation, OpenDDS::DCPS::WriterInfo::publisher_id_, reject_coherent(), OpenDDS::DCPS::REJECTED, OpenDDS::DCPS::WriterInfo::reset_coherent_info(), state, subqos_, and OpenDDS::DCPS::WriterInfo::writer_id_.

Referenced by accept_sample_processing(), and data_received().

02897 {
02898   if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS
02899       || ! this->subqos_.presentation.coherent_access) {
02900     this->accept_coherent (writer->writer_id_, writer->publisher_id_);
02901     this->coherent_changes_completed (this);
02902     return true;
02903   }
02904 
02905   // verify current coherent changes from single writer
02906   Coherent_State state = writer->coherent_change_received();
02907   if (writer->group_coherent_) { // GROUP coherent
02908     RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
02909     if (subscriber && state != NOT_COMPLETED_YET) {
02910       // verify if all readers received complete coherent changes in a group.
02911       subscriber->coherent_change_received (
02912           writer->publisher_id_, this, state);
02913     }
02914   }
02915   else {  // TOPIC coherent
02916     if (state == COMPLETED) {
02917       this->accept_coherent (writer->writer_id_, writer->publisher_id_);
02918     }
02919     else if (state == REJECTED) {
02920       this->reject_coherent (writer->writer_id_, writer->publisher_id_);
02921     }
02922     else {// NOT_COMPLETED
02923       return false;
02924     }
02925 
02926     // decision made: either COMPLETED or REJECTED
02927     writer->reset_coherent_info ();
02928   }
02929 
02930   return state == COMPLETED;
02931 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::wait_for_historical_data ( const DDS::Duration_t max_wait  )  [virtual]

Definition at line 1058 of file DataReaderImpl.cpp.

01060 {
01061   // Add your implementation here
01062   return 0;
01063 }

void OpenDDS::DCPS::DataReaderImpl::writer_activity ( const DataSampleHeader header  ) 

update liveliness info for this writer.

Definition at line 1294 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, ACE_OS::gettimeofday(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::UNREGISTER_INSTANCE, writers_, and writers_lock_.

Referenced by data_received().

01295 {
01296   // caller should have the sample_lock_ !!!
01297 
01298   RcHandle<WriterInfo> writer;
01299 
01300   // The received_activity() has to be called outside the writers_lock_
01301   // because it probably acquire writers_lock_ read lock recursively
01302   // (in handle_timeout). This could cause deadlock when there are writers
01303   // waiting.
01304   {
01305     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01306     WriterMapType::iterator iter = writers_.find(header.publication_id_);
01307 
01308     if (iter != writers_.end()) {
01309       writer = iter->second;
01310 
01311     } else if (DCPS_debug_level > 4) {
01312       // This may not be an error since it could happen that the sample
01313       // is delivered to the datareader after the write is dis-associated
01314       // with this datareader.
01315       GuidConverter reader_converter(subscription_id_);
01316       GuidConverter writer_converter(header.publication_id_);
01317       ACE_DEBUG((LM_DEBUG,
01318           ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
01319           ACE_TEXT("reader %C is not associated with writer %C.\n"),
01320           OPENDDS_STRING(reader_converter).c_str(),
01321           OPENDDS_STRING(writer_converter).c_str()));
01322     }
01323   }
01324 
01325   if (!writer.is_nil()) {
01326     ACE_Time_Value when = ACE_OS::gettimeofday();
01327     writer->received_activity(when);
01328 
01329     if ((header.message_id_ == SAMPLE_DATA) ||
01330         (header.message_id_ == INSTANCE_REGISTRATION) ||
01331         (header.message_id_ == UNREGISTER_INSTANCE) ||
01332         (header.message_id_ == DISPOSE_INSTANCE) ||
01333         (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
01334 
01335       const SequenceNumber defaultSN;
01336       SequenceRange resetRange(defaultSN, header.sequence_);
01337 
01338 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01339       if (header.coherent_change_) {
01340         if (writer->coherent_samples_ == 0) {
01341           writer->coherent_sample_sequence_.reset();
01342           writer->coherent_sample_sequence_.insert(resetRange);
01343         }
01344         else {
01345           writer->coherent_sample_sequence_.insert(header.sequence_);
01346         }
01347       }
01348 #endif
01349     }
01350   }
01351 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::writer_became_alive ( WriterInfo info,
const ACE_Time_Value when 
) [virtual]

tell instances when a DataWriter transitions to being alive The writer state is inout parameter, it has to be set ALIVE before handle_timeout is called since some subroutine use the state.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2086 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, liveliness_timer_, LM_DEBUG, LM_ERROR, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, OpenDDS::DCPS::Monitor::report(), reverse_sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.

02088 {
02089   if (DCPS_debug_level >= 5) {
02090     GuidConverter reader_converter(subscription_id_);
02091     GuidConverter writer_converter(info.writer_id_);
02092     ACE_DEBUG((LM_DEBUG,
02093         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
02094         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02095         OPENDDS_STRING(reader_converter).c_str(),
02096         OPENDDS_STRING(writer_converter).c_str(),
02097         info.get_state_str().c_str()));
02098   }
02099 
02100   // caller should already have the samples_lock_ !!!
02101 
02102   // NOTE: each instance will change to ALIVE_STATE when they receive a sample
02103 
02104   bool liveliness_changed = false;
02105 
02106   if (info.state_ != WriterInfo::ALIVE) {
02107     liveliness_changed_status_.alive_count++;
02108     liveliness_changed_status_.alive_count_change++;
02109     liveliness_changed = true;
02110   }
02111 
02112   if (info.state_ == WriterInfo::DEAD) {
02113     liveliness_changed_status_.not_alive_count--;
02114     liveliness_changed_status_.not_alive_count_change--;
02115     liveliness_changed = true;
02116   }
02117 
02118   liveliness_changed_status_.last_publication_handle = info.handle_;
02119 
02120   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02121 
02122   if (liveliness_changed_status_.alive_count < 0) {
02123     ACE_ERROR((LM_ERROR,
02124         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02125         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02126         liveliness_changed_status_.alive_count));
02127     return;
02128   }
02129 
02130   if (liveliness_changed_status_.not_alive_count < 0) {
02131     ACE_ERROR((LM_ERROR,
02132         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02133         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"),
02134         liveliness_changed_status_.not_alive_count));
02135     return;
02136   }
02137 
02138   // Change the state to ALIVE since handle_timeout may call writer_became_dead
02139   // which need the current state info.
02140   info.state_ = WriterInfo::ALIVE;
02141 
02142   if (this->monitor_) {
02143     this->monitor_->report();
02144   }
02145 
02146   // Call listener only when there are liveliness status changes.
02147   if (liveliness_changed) {
02148     // Avoid possible deadlock by releasing sample_lock_.
02149     // See comments in <Topic>DataDataReaderImpl::notify_status_condition_no_sample_lock()
02150     // for information about the locks involved.
02151     ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02152     this->notify_liveliness_change();
02153   }
02154 
02155   // this call will start the liveliness timer if it is not already set
02156   liveliness_timer_->check_liveliness();
02157 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::writer_became_dead ( WriterInfo info,
const ACE_Time_Value when 
) [virtual]

tell instances when a DataWriter transitions to DEAD The writer state is inout parameter, the state is set to DEAD when it returns.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2160 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, LM_DEBUG, LM_ERROR, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::WriterInfo::NOT_SET, notify_liveliness_change(), OPENDDS_STRING, ownership_manager(), OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.

02162 {
02163   if (DCPS_debug_level >= 5) {
02164     GuidConverter reader_converter(subscription_id_);
02165     GuidConverter writer_converter(info.writer_id_);
02166     ACE_DEBUG((LM_DEBUG,
02167         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
02168         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02169 
02170         OPENDDS_STRING(reader_converter).c_str(),
02171         OPENDDS_STRING(writer_converter).c_str(),
02172         info.get_state_str().c_str()));
02173   }
02174 
02175 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02176   OwnershipManagerPtr owner_manager = this->ownership_manager();
02177   if (owner_manager) {
02178     owner_manager->remove_writer (info.writer_id_);
02179     info.clear_owner_evaluated ();
02180   }
02181 #endif
02182 
02183   // caller should already have the samples_lock_ !!!
02184   bool liveliness_changed = false;
02185 
02186   if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) {
02187     liveliness_changed_status_.not_alive_count++;
02188     liveliness_changed_status_.not_alive_count_change++;
02189     liveliness_changed = true;
02190   }
02191 
02192   if (info.state_ == WriterInfo::ALIVE) {
02193     liveliness_changed_status_.alive_count--;
02194     liveliness_changed_status_.alive_count_change--;
02195     liveliness_changed_status_.not_alive_count++;
02196     liveliness_changed_status_.not_alive_count_change++;
02197     liveliness_changed = true;
02198   }
02199 
02200   liveliness_changed_status_.last_publication_handle = info.handle_;
02201 
02202   //update the state to DEAD.
02203   info.state_ = WriterInfo::DEAD;
02204 
02205   if (this->monitor_) {
02206     this->monitor_->report();
02207   }
02208 
02209   if (liveliness_changed_status_.alive_count < 0) {
02210     ACE_ERROR((LM_ERROR,
02211         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02212         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02213         liveliness_changed_status_.alive_count));
02214     return;
02215   }
02216 
02217   if (liveliness_changed_status_.not_alive_count < 0) {
02218     ACE_ERROR((LM_ERROR,
02219         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02220         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"),
02221         liveliness_changed_status_.not_alive_count));
02222     return;
02223   }
02224 
02225   instances_liveliness_update(info, when);
02226 
02227   // Call listener only when there are liveliness status changes.
02228   if (liveliness_changed) {
02229     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02230     this->notify_liveliness_change();
02231   }
02232 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataReaderImpl::writer_removed ( WriterInfo info  )  [virtual]

tell instance when a DataWriter is removed. The liveliness status need update.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2042 of file DataReaderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, ACE_OS::gettimeofday(), OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, LM_DEBUG, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, ownership_manager(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.

02043 {
02044   if (DCPS_debug_level >= 5) {
02045     GuidConverter reader_converter(subscription_id_);
02046     GuidConverter writer_converter(info.writer_id_);
02047     ACE_DEBUG((LM_DEBUG,
02048         ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
02049         ACE_TEXT("reader %C from writer %C.\n"),
02050         OPENDDS_STRING(reader_converter).c_str(),
02051         OPENDDS_STRING(writer_converter).c_str()));
02052   }
02053 
02054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02055   OwnershipManagerPtr owner_manager = this->ownership_manager();
02056   if (owner_manager) {
02057     owner_manager->remove_writer (info.writer_id_);
02058     info.clear_owner_evaluated ();
02059   }
02060 #endif
02061 
02062   bool liveliness_changed = false;
02063 
02064   if (info.state_ == WriterInfo::ALIVE) {
02065     -- liveliness_changed_status_.alive_count;
02066     -- liveliness_changed_status_.alive_count_change;
02067     liveliness_changed = true;
02068   }
02069 
02070   if (info.state_ == WriterInfo::DEAD) {
02071     -- liveliness_changed_status_.not_alive_count;
02072     -- liveliness_changed_status_.not_alive_count_change;
02073     liveliness_changed = true;
02074   }
02075 
02076   liveliness_changed_status_.last_publication_handle = info.handle_;
02077   instances_liveliness_update(info, ACE_OS::gettimeofday());
02078 
02079   if (liveliness_changed) {
02080     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02081     this->notify_liveliness_change();
02082   }
02083 }

Here is the call graph for this function:


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 697 of file DataReaderImpl.h.

friend class EndHistoricSamplesMissedSweeper [friend]

Definition at line 694 of file DataReaderImpl.h.

friend class InstanceState [friend]

Definition at line 693 of file DataReaderImpl.h.

friend class OwnershipManagerPtr [friend]

Definition at line 460 of file DataReaderImpl.h.

friend class QueryConditionImpl [friend]

Definition at line 199 of file DataReaderImpl.h.

Referenced by create_querycondition().

friend class RemoveAssociationSweeper< DataReaderImpl > [friend]

Definition at line 695 of file DataReaderImpl.h.

friend class RequestedDeadlineWatchdog [friend]

Definition at line 198 of file DataReaderImpl.h.

friend class SubscriberImpl [friend]

Definition at line 200 of file DataReaderImpl.h.


Member Data Documentation

Definition at line 829 of file DataReaderImpl.h.

Referenced by filter_sample().

Definition at line 728 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and notify_latency().

Is accessing to Group coherent changes ?

Definition at line 646 of file DataReaderImpl.h.

Referenced by begin_access(), and end_access().

Definition at line 641 of file DataReaderImpl.h.

Referenced by enable(), enable_filtering(), get_cf_topic(), and get_topicdescription().

Definition at line 711 of file DataReaderImpl.h.

Referenced by enable().

Definition at line 708 of file DataReaderImpl.h.

Referenced by add_link(), remove_associations_i(), and resume_sample_processing().

Ordered group samples.

Definition at line 649 of file DataReaderImpl.h.

Referenced by end_access(), and get_ordered_data().

SubscriptionInstanceMapType OpenDDS::DCPS::DataReaderImpl::instances_ [mutable, protected]

Assume since the container is mutable(?!!?) it may need to use the lock while const. : remove the recursive nature of the instances_lock if not needed.

Definition at line 596 of file DataReaderImpl.h.

Referenced by accept_coherent(), contains_sample(), data_received(), get_handle_instance(), get_instance_handles(), get_ordered_data(), has_zero_copies(), have_instance_states(), have_sample_states(), have_view_states(), instances_liveliness_update(), reject_coherent(), release_instance(), reschedule_deadline(), signal_liveliness(), and total_samples().

Flag indicates that this datareader is a builtin topic datareader.

Definition at line 827 of file DataReaderImpl.h.

Referenced by add_association(), init(), is_bit(), notify_subscription_lost(), notify_subscription_reconnected(), remove_associations(), remove_associations_i(), and transport_assoc_done().

Definition at line 636 of file DataReaderImpl.h.

Referenced by data_received(), init(), and ownership_filter_instance().

Definition at line 820 of file DataReaderImpl.h.

Referenced by enable(), get_requested_deadline_missed_status(), and qos_change().

DDS::DataReaderListener_var OpenDDS::DCPS::DataReaderImpl::listener_ [private]

Definition at line 700 of file DataReaderImpl.h.

Referenced by init(), listener_for(), notify_liveliness_change(), and set_listener().

Definition at line 818 of file DataReaderImpl.h.

Referenced by transport_assoc_done(), and writer_became_alive().

Definition at line 712 of file DataReaderImpl.h.

Referenced by enable().

Periodic Monitor object for this entity.

Definition at line 860 of file DataReaderImpl.h.

Referenced by DataReaderImpl().

Bound (or initial reservation) of raw latency buffer.

Definition at line 847 of file DataReaderImpl.h.

Referenced by add_association(), and raw_latency_buffer_size().

Type of raw latency data buffer.

Definition at line 850 of file DataReaderImpl.h.

Referenced by add_association(), and raw_latency_buffer_type().

Definition at line 619 of file DataReaderImpl.h.

Referenced by enable().

The orb's reactor to be used to register the liveliness timer.

Definition at line 744 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and get_reactor().

Definition at line 709 of file DataReaderImpl.h.

Referenced by remove_associations(), and remove_associations_i().

Definition at line 716 of file DataReaderImpl.h.

Statistics for this reader, collected for each writer.

Definition at line 844 of file DataReaderImpl.h.

Referenced by add_association(), get_latency_stats(), process_latency(), raw_latency_statistics(), and reset_latency_stats().

Flag indicating status of statistics gathering.

Definition at line 832 of file DataReaderImpl.h.

Referenced by statistics_enabled().

Definition at line 651 of file DataReaderImpl.h.

Referenced by set_subscriber_qos(), and verify_coherent_changes_completion().

Definition at line 707 of file DataReaderImpl.h.

Referenced by get_subscriber_servant(), init(), and parent().

Todo:
The subscription_lost_status_ and subscription_reconnecting_status_ are left here for future use when we add get_subscription_lost_status() and get_subscription_reconnecting_status() methods.

Definition at line 737 of file DataReaderImpl.h.

DDS::TopicDescription_var OpenDDS::DCPS::DataReaderImpl::topic_desc_ [private]

Definition at line 699 of file DataReaderImpl.h.

Referenced by get_topicdescription(), and init().

Definition at line 862 of file DataReaderImpl.h.

Referenced by disable_transport(), and enable().

Watchdog responsible for reporting missed offered deadlines.

Definition at line 823 of file DataReaderImpl.h.

Referenced by accept_sample_processing(), data_received(), enable(), qos_change(), and reschedule_deadline().

WriterMapType OpenDDS::DCPS::DataReaderImpl::writers_ [private]

The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1