OpenDDS::DCPS::DataReaderImpl Class Reference

Implements the DDS::DataReader interface. More...

#include <DataReaderImpl.h>

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl:

Collaboration graph
[legend]
List of all members.

Public Types

typedef std::pair< PublicationId,
WriterInfo::WriterState
WriterStatePair

Public Member Functions

typedef OPENDDS_MAP (DDS::InstanceHandle_t, SubscriptionInstance *) SubscriptionInstanceMapType
typedef OPENDDS_MAP_CMP (PublicationId, WriterStats, GUID_tKeyLessThan) StatsMapType
 Type of collection of statistics for writers to this reader.
 DataReaderImpl ()
virtual ~DataReaderImpl ()
virtual DDS::InstanceHandle_t get_instance_handle ()
virtual void add_association (const RepoId &yourId, const WriterAssociation &writer, bool active)
virtual void transport_assoc_done (int flags, const RepoId &remote_id)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const WriterIdSeq &writers, bool callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void inconsistent_topic ()
virtual void signal_liveliness (const RepoId &remote_participant)
DDS::DataReaderListener_ptr listener_for (DDS::StatusKind kind)
void writer_became_alive (WriterInfo &info, const ACE_Time_Value &when)
void writer_became_dead (WriterInfo &info, const ACE_Time_Value &when)
void writer_removed (WriterInfo &info)
void cleanup ()
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber, DDS::DataReader_ptr dr_objref)
virtual DDS::ReadCondition_ptr create_readcondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::QueryCondition_ptr create_querycondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
virtual DDS::ReturnCode_t delete_readcondition (DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t delete_contained_entities ()
virtual DDS::ReturnCode_t set_qos (const DDS::DataReaderQos &qos)
virtual DDS::ReturnCode_t get_qos (DDS::DataReaderQos &qos)
virtual DDS::ReturnCode_t set_listener (DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::DataReaderListener_ptr get_listener ()
virtual DDS::TopicDescription_ptr get_topicdescription ()
virtual DDS::Subscriber_ptr get_subscriber ()
virtual DDS::ReturnCode_t get_sample_rejected_status (DDS::SampleRejectedStatus &status)
virtual DDS::ReturnCode_t get_liveliness_changed_status (DDS::LivelinessChangedStatus &status)
virtual DDS::ReturnCode_t get_requested_deadline_missed_status (DDS::RequestedDeadlineMissedStatus &status)
virtual DDS::ReturnCode_t get_requested_incompatible_qos_status (DDS::RequestedIncompatibleQosStatus &status)
virtual DDS::ReturnCode_t get_subscription_matched_status (DDS::SubscriptionMatchedStatus &status)
virtual DDS::ReturnCode_t get_sample_lost_status (DDS::SampleLostStatus &status)
virtual DDS::ReturnCode_t wait_for_historical_data (const DDS::Duration_t &max_wait)
virtual DDS::ReturnCode_t get_matched_publications (DDS::InstanceHandleSeq &publication_handles)
virtual DDS::ReturnCode_t get_matched_publication_data (DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
virtual DDS::ReturnCode_t enable ()
virtual void get_latency_stats (OpenDDS::DCPS::LatencyStatisticsSeq &stats)
virtual void reset_latency_stats ()
virtual CORBA::Boolean statistics_enabled ()
virtual void statistics_enabled (CORBA::Boolean statistics_enabled)
void writer_activity (const DataSampleHeader &header)
 update liveliness info for this writer.
virtual void data_received (const ReceivedDataSample &sample)
 process a message that has been received - could be control or a data sample.
virtual bool check_transport_qos (const TransportInst &inst)
RepoId get_subscription_id () const
DDS::DataReader_ptr get_dr_obj_ref ()
char * get_topic_name () const
bool have_sample_states (DDS::SampleStateMask sample_states) const
bool have_view_states (DDS::ViewStateMask view_states) const
bool have_instance_states (DDS::InstanceStateMask instance_states) const
bool contains_sample (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual bool contains_sample_filtered (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator &evaluator, const DDS::StringSeq &params)=0
virtual void dds_demarshal (const ReceivedDataSample &sample, SubscriptionInstance *&instance, bool &is_new_instance, bool &filtered, MarshalingType marshaling_type)=0
virtual void dispose_unregister (const ReceivedDataSample &sample, SubscriptionInstance *&instance)
void process_latency (const ReceivedDataSample &sample)
void notify_latency (PublicationId writer)
CORBA::Long get_depth () const
size_t get_n_chunks () const
void liveliness_lost ()
void remove_all_associations ()
void notify_subscription_disconnected (const WriterIdSeq &pubids)
void notify_subscription_reconnected (const WriterIdSeq &pubids)
void notify_subscription_lost (const WriterIdSeq &pubids)
virtual void notify_connection_deleted (const RepoId &peerId)
void notify_liveliness_change ()
bool is_bit () const
virtual DDS::ReturnCode_t auto_return_loan (void *seq)=0
virtual int num_zero_copies ()
virtual void dec_ref_data_element (ReceivedDataElement *r)=0
void release_instance (DDS::InstanceHandle_t handle)
 Release the instance with the handle.
void reschedule_deadline ()
ACE_Reactor_Timer_Interface * get_reactor ()
RepoId get_topic_id ()
RepoId get_dp_id ()
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
void get_instance_handles (InstanceHandleVec &instance_handles)
typedef OPENDDS_VECTOR (WriterStatePair) WriterStatePairVec
void get_writer_states (WriterStatePairVec &writer_states)
void update_ownership_strength (const PublicationId &pub_id, const CORBA::Long &ownership_strength)
OwnershipManagerownership_manager () const
virtual void delete_instance_map (void *map)=0
virtual void lookup_instance (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance *&instance)=0
void enable_filtering (ContentFilteredTopicImpl *cft)
DDS::ContentFilteredTopic_ptr get_cf_topic () const
void update_subscription_params (const DDS::StringSeq &params) const
typedef OPENDDS_VECTOR (void *) GenericSeq
virtual DDS::ReturnCode_t read_generic (GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count)=0
virtual DDS::ReturnCode_t take (AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
virtual DDS::InstanceHandle_t lookup_instance_generic (const void *data)=0
virtual DDS::ReturnCode_t read_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
virtual DDS::ReturnCode_t read_next_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
virtual void set_instance_state (DDS::InstanceHandle_t instance, DDS::InstanceStateKind state)=0
void begin_access ()
void end_access ()
void get_ordered_data (GroupRakeData &data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
void accept_coherent (PublicationId &writer_id, RepoId &publisher_id)
void reject_coherent (PublicationId &writer_id, RepoId &publisher_id)
void coherent_change_received (RepoId publisher_id, Coherent_State &result)
void coherent_changes_completed (DataReaderImpl *reader)
void reset_coherent_info (const PublicationId &writer_id, const RepoId &publisher_id)
void set_subscriber_qos (const DDS::SubscriberQos &qos)
void reset_ownership (::DDS::InstanceHandle_t instance)
virtual EntityImplparent () const
void disable_transport ()
virtual void register_for_writer (const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *)
virtual void unregister_for_writer (const RepoId &, const RepoId &, const RepoId &)
Raw Latency Statistics Interfaces
const StatsMapType & raw_latency_statistics () const
 Expose the statistics container.
unsigned int & raw_latency_buffer_size ()
 Configure the size of the raw data collection buffer.
DataCollector< double >::OnFull & raw_latency_buffer_type ()
 Configure the type of the raw data collection buffer.

Protected Types

typedef ACE_Reverse_Lock<
ACE_Recursive_Thread_Mutex > 
Reverse_Lock_t

Protected Member Functions

virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
void remove_or_reschedule (const PublicationId &pub_id)
void prepare_to_delete ()
SubscriberImplget_subscriber_servant ()
void post_read_or_take ()
virtual DDS::ReturnCode_t enable_specific ()=0
void sample_info (DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
CORBA::Long total_samples () const
void set_sample_lost_status (const DDS::SampleLostStatus &status)
void set_sample_rejected_status (const DDS::SampleRejectedStatus &status)
SubscriptionInstanceget_handle_instance (DDS::InstanceHandle_t handle)
DDS::InstanceHandle_t get_next_handle (const DDS::BuiltinTopicKey_t &key)
virtual void purge_data (SubscriptionInstance *instance)=0
virtual void release_instance_i (DDS::InstanceHandle_t handle)=0
bool has_readcondition (DDS::ReadCondition_ptr a_condition)
bool filter_sample (const DataSampleHeader &header)
bool filter_instance (SubscriptionInstance *instance, const PublicationId &pubid)
void notify_read_conditions ()
 Data has arrived into the cache, unblock waiting ReadConditions.
virtual void add_link (const DataLink_rch &link, const RepoId &peer)

Protected Attributes

SubscriptionInstanceMapType instances_
 : document why the instances_ container is mutable.
ACE_Recursive_Thread_Mutex instances_lock_
ReceivedDataAllocatorrd_allocator_
DDS::DataReaderQos qos_
DDS::SampleRejectedStatus sample_rejected_status_
DDS::SampleLostStatus sample_lost_status_
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses.
Reverse_Lock_t reverse_sample_lock_
DomainParticipantImplparticipant_servant_
TopicImpltopic_servant_
bool is_exclusive_ownership_
OwnershipManagerowner_manager_
DDS::ContentFilteredTopic_var content_filtered_topic_
bool coherent_
 Is accessing to Group coherent changes ?
GroupRakeData group_coherent_ordered_data_
 Ordered group samples.
DDS::SubscriberQos subqos_

Private Types

typedef VarLess< DDS::ReadConditionRCCompLess

Private Member Functions

void notify_subscription_lost (const DDS::InstanceHandleSeq &handles)
bool lookup_instance_handles (const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the publication repo ids.
void instances_liveliness_update (WriterInfo &info, const ACE_Time_Value &when)
bool verify_coherent_changes_completion (WriterInfo *writer)
bool coherent_change_received (WriterInfo *writer)
const RepoIdget_repo_id () const
DDS::DomainId_t domain_id () const
Priority get_priority_value (const AssociationData &data) const
void resume_sample_processing (const PublicationId &pub_id)
 when done handling historic samples, resume
bool check_historic (const ReceivedDataSample &sample)
void deliver_historic (OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&samples)
 deliver samples that were held by check_historic()
void listener_add_ref ()
void listener_remove_ref ()
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
typedef OPENDDS_MAP_CMP (PublicationId, RcHandle< WriterInfo >, GUID_tKeyLessThan) WriterMapType
 publications writing to this reader.
typedef OPENDDS_SET_CMP (DDS::ReadCondition_var, RCCompLess) ReadConditionSet

Private Attributes

DDS::TopicDescription_var topic_desc_
DDS::StatusMask listener_mask_
DDS::DataReaderListener_var listener_
DDS::DomainId_t domain_id_
SubscriberImplsubscriber_servant_
DDS::DataReader_var dr_local_objref_
EndHistoricSamplesMissedSweeperend_historic_sweeper_
RemoveAssociationSweeper<
DataReaderImpl > * 
remove_association_sweeper_
CORBA::Long depth_
size_t n_chunks_
ACE_Recursive_Thread_Mutex publication_handle_lock_
Reverse_Lock_t reverse_pub_handle_lock_
RepoIdToHandleMap id_to_handle_map_
DDS::LivelinessChangedStatus liveliness_changed_status_
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
DDS::SubscriptionMatchedStatus subscription_match_status_
BudgetExceededStatus budget_exceeded_status_
SubscriptionLostStatus subscription_lost_status_
ACE_Reactor_Timer_Interface * reactor_
LivelinessTimerliveliness_timer_
CORBA::Long last_deadline_missed_total_count_
RequestedDeadlineWatchdogwatchdog_
bool is_bit_
bool initialized_
 Flag indicates that the init() is called.
bool always_get_history_
bool statistics_enabled_
 Flag indicating status of statistics gathering.
WriterMapType writers_
ACE_RW_Thread_Mutex writers_lock_
 RW lock for reading/writing publications.
StatsMapType statistics_
 Statistics for this reader, collected for each writer.
unsigned int raw_latency_buffer_size_
 Bound (or initial reservation) of raw latency buffer.
DataCollector< double >::OnFull raw_latency_buffer_type_
 Type of raw latency data buffer.
ReadConditionSet read_conditions_
Monitormonitor_
 Monitor object for this entity.
Monitorperiodic_monitor_
 Periodic Monitor object for this entity.
bool transport_disabled_

Friends

class RequestedDeadlineWatchdog
class QueryConditionImpl
class SubscriberImpl
class InstanceState
class EndHistoricSamplesMissedSweeper
class RemoveAssociationSweeper< DataReaderImpl >
class ::DDS_TEST

Classes

struct  GenericBundle
class  LivelinessTimer

Detailed Description

Implements the DDS::DataReader interface.

See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.

This class must be inherited by the type-specific datareader which is specific to the data-type associated with the topic.

Definition at line 183 of file DataReaderImpl.h.


Member Typedef Documentation

typedef VarLess<DDS::ReadCondition> OpenDDS::DCPS::DataReaderImpl::RCCompLess [private]

Definition at line 841 of file DataReaderImpl.h.

typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::DataReaderImpl::Reverse_Lock_t [protected]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 619 of file DataReaderImpl.h.

typedef std::pair<PublicationId, WriterInfo::WriterState> OpenDDS::DCPS::DataReaderImpl::WriterStatePair

Definition at line 451 of file DataReaderImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataReaderImpl::DataReaderImpl (  ) 

Definition at line 51 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, budget_exceeded_status_, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, DDS::SampleRejectedStatus::last_instance_handle, DDS::RequestedDeadlineMissedStatus::last_instance_handle, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::LivelinessChangedStatus::last_publication_handle, DDS::SampleRejectedStatus::last_reason, liveliness_changed_status_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::NOT_REJECTED, periodic_monitor_, DDS::RequestedIncompatibleQosStatus::policies, reactor_, requested_deadline_missed_status_, requested_incompatible_qos_status_, sample_lost_status_, sample_rejected_status_, subscription_match_status_, TheServiceParticipant, OpenDDS::DCPS::BudgetExceededStatus::total_count, DDS::SampleRejectedStatus::total_count, DDS::SampleLostStatus::total_count, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::RequestedDeadlineMissedStatus::total_count, OpenDDS::DCPS::BudgetExceededStatus::total_count_change, DDS::SampleRejectedStatus::total_count_change, DDS::SampleLostStatus::total_count_change, DDS::SubscriptionMatchedStatus::total_count_change, DDS::RequestedIncompatibleQosStatus::total_count_change, and DDS::RequestedDeadlineMissedStatus::total_count_change.

00052 : rd_allocator_(0),
00053   qos_(TheServiceParticipant->initial_DataReaderQos()),
00054   reverse_sample_lock_(sample_lock_),
00055   participant_servant_(0),
00056   topic_servant_(0),
00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00058   is_exclusive_ownership_ (false),
00059 #endif
00060 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00061   owner_manager_ (0),
00062 #endif
00063     coherent_(false),
00064     subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00065     topic_desc_(0),
00066     listener_mask_(DEFAULT_STATUS_MASK),
00067     domain_id_(0),
00068     subscriber_servant_(0),
00069     end_historic_sweeper_(new EndHistoricSamplesMissedSweeper(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00070     remove_association_sweeper_(new RemoveAssociationSweeper<DataReaderImpl>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00071     n_chunks_(TheServiceParticipant->n_chunks()),
00072     reverse_pub_handle_lock_(publication_handle_lock_),
00073     reactor_(0),
00074     liveliness_timer_(new LivelinessTimer(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)),
00075     last_deadline_missed_total_count_(0),
00076     watchdog_(),
00077     is_bit_(false),
00078     initialized_(false),
00079     always_get_history_(false),
00080     statistics_enabled_(false),
00081     raw_latency_buffer_size_(0),
00082     raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00083     monitor_(0),
00084     periodic_monitor_(0),
00085     transport_disabled_(false)
00086 {
00087   reactor_ = TheServiceParticipant->timer();
00088 
00089   liveliness_changed_status_.alive_count = 0;
00090   liveliness_changed_status_.not_alive_count = 0;
00091   liveliness_changed_status_.alive_count_change = 0;
00092   liveliness_changed_status_.not_alive_count_change = 0;
00093   liveliness_changed_status_.last_publication_handle =
00094       DDS::HANDLE_NIL;
00095 
00096   requested_deadline_missed_status_.total_count = 0;
00097   requested_deadline_missed_status_.total_count_change = 0;
00098   requested_deadline_missed_status_.last_instance_handle =
00099       DDS::HANDLE_NIL;
00100 
00101   requested_incompatible_qos_status_.total_count = 0;
00102   requested_incompatible_qos_status_.total_count_change = 0;
00103   requested_incompatible_qos_status_.last_policy_id = 0;
00104   requested_incompatible_qos_status_.policies.length(0);
00105 
00106   subscription_match_status_.total_count = 0;
00107   subscription_match_status_.total_count_change = 0;
00108   subscription_match_status_.current_count = 0;
00109   subscription_match_status_.current_count_change = 0;
00110   subscription_match_status_.last_publication_handle =
00111       DDS::HANDLE_NIL;
00112 
00113   sample_lost_status_.total_count = 0;
00114   sample_lost_status_.total_count_change = 0;
00115 
00116   sample_rejected_status_.total_count = 0;
00117   sample_rejected_status_.total_count_change = 0;
00118   sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
00119   sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
00120 
00121   this->budget_exceeded_status_.total_count = 0;
00122   this->budget_exceeded_status_.total_count_change = 0;
00123   this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
00124 
00125   monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this);
00126   periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this);
00127 }

OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl (  )  [virtual]

Definition at line 131 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), DBG_ENTRY_LVL, OpenDDS::DCPS::ReactorInterceptor::destroy(), end_historic_sweeper_, initialized_, liveliness_timer_, rd_allocator_, remove_association_sweeper_, OpenDDS::DCPS::ReactorInterceptor::wait(), and writers_.

00132 {
00133   DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6);
00134 
00135   {
00136     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00137                    read_guard,
00138                    this->writers_lock_);
00139     // Cancel any uncancelled sweeper timers to decrement reference count.
00140     WriterMapType::iterator writer;
00141     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00142       end_historic_sweeper_->cancel_timer(writer->second);
00143       remove_association_sweeper_->cancel_timer(writer->second);
00144     }
00145   }
00146 
00147   end_historic_sweeper_->wait();
00148   end_historic_sweeper_->destroy();
00149 
00150   remove_association_sweeper_->wait();
00151   remove_association_sweeper_->destroy();
00152 
00153   liveliness_timer_->cancel_timer();
00154   liveliness_timer_->wait();
00155   liveliness_timer_->destroy();
00156 
00157   if (initialized_) {
00158     delete rd_allocator_;
00159   }
00160 }


Member Function Documentation

void OpenDDS::DCPS::DataReaderImpl::accept_coherent ( PublicationId writer_id,
RepoId publisher_id 
)

Definition at line 3061 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, instances_, OPENDDS_STRING, and sample_lock_.

Referenced by verify_coherent_changes_completion().

03063 {
03064   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
03065     GuidConverter reader (this->subscription_id_);
03066     GuidConverter writer (writer_id);
03067     GuidConverter publisher (publisher_id);
03068     ACE_DEBUG((LM_DEBUG,
03069         ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
03070         ACE_TEXT(" reader %C writer %C publisher %C \n"),
03071         OPENDDS_STRING(reader).c_str(),
03072         OPENDDS_STRING(writer).c_str(),
03073         OPENDDS_STRING(publisher).c_str()));
03074   }
03075 
03076   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03077   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03078 
03079   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
03080       iter != this->instances_.end(); ++iter) {
03081     iter->second->rcvd_strategy_->accept_coherent(
03082         writer_id, publisher_id);
03083   }
03084 }

void OpenDDS::DCPS::DataReaderImpl::add_association ( const RepoId yourId,
const WriterAssociation writer,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 295 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::associate(), OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, raw_latency_buffer_size_, raw_latency_buffer_type_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, statistics_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, DDS::DataWriterQos::transport_priority, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

00298 {
00299   if (DCPS_debug_level) {
00300     GuidConverter reader_converter(yourId);
00301     GuidConverter writer_converter(writer.writerId);
00302     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
00303         ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00304         OPENDDS_STRING(reader_converter).c_str(),
00305         OPENDDS_STRING(writer_converter).c_str()));
00306   }
00307 
00308   if (entity_deleted_.value()) {
00309     if (DCPS_debug_level) {
00310       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
00311           ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00312     }
00313     return;
00314   }
00315 
00316   // We are being called back from the repository before we are done
00317   // processing after our call to the repository that caused this call
00318   // (from the repository) to be made.
00319   if (GUID_UNKNOWN == subscription_id_) {
00320     subscription_id_ = yourId;
00321   }
00322 
00323   //Why do we need the publication_handle_lock_ here?  No access to id_to_handle_map_...
00324   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00325 
00326 
00327   // For each writer in the list of writers to associate with, we
00328   // create a WriterInfo and a WriterStats object and store them in
00329   // our internal maps.
00330   //
00331   {
00332 
00333     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
00334 
00335     const PublicationId& writer_id = writer.writerId;
00336     RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos);
00337     std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
00338         // This insertion is idempotent.
00339         WriterMapType::value_type(
00340           writer_id,
00341           info));
00342 
00343       // Schedule timer if necessary
00344       //   - only need to check reader qos - we know the writer must be >= reader
00345       if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00346         info->waiting_for_end_historic_samples_ = true;
00347       }
00348 
00349       this->statistics_.insert(
00350         StatsMapType::value_type(
00351             writer_id,
00352             WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
00353 
00354     // If this is a durable reader
00355     if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
00356       // TODO schedule timer for removing flag from writers
00357     }
00358 
00359     if (DCPS_debug_level > 4) {
00360       GuidConverter converter(writer_id);
00361       ACE_DEBUG((LM_DEBUG,
00362           "(%P|%t) DataReaderImpl::add_association: "
00363           "inserted writer %C.return %d \n",
00364           OPENDDS_STRING(converter).c_str(), bpair.second));
00365 
00366       WriterMapType::iterator iter = writers_.find(writer_id);
00367       if (iter != writers_.end()) {
00368         // This may not be an error since it could happen that the sample
00369         // is delivered to the datareader after the write is dis-associated
00370         // with this datareader.
00371         GuidConverter reader_converter(subscription_id_);
00372         GuidConverter writer_converter(writer_id);
00373         ACE_DEBUG((LM_DEBUG,
00374             ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00375             ACE_TEXT("reader %C is associated with writer %C.\n"),
00376             OPENDDS_STRING(reader_converter).c_str(),
00377             OPENDDS_STRING(writer_converter).c_str()));
00378       }
00379     }
00380   }
00381 
00382   // Propagate the add_associations processing down into the Transport
00383   // layer here.  This will establish the transport support and reserve
00384   // usage of an existing connection or initiate creation of a new
00385   // connection if no suitable connection is available.
00386   AssociationData data;
00387   data.remote_id_ = writer.writerId;
00388   data.remote_data_ = writer.writerTransInfo;
00389   data.publication_transport_priority_ =
00390       writer.writerQos.transport_priority.value;
00391   data.remote_reliable_ =
00392       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00393   data.remote_durable_ =
00394       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00395 
00396   //Do not hold publication_handle_lock_ when calling associate due to possible reactor
00397   //deadlock on passive side completion
00398   //associate does not access id_to_handle_map_, thus not clear why publication_handle_lock_
00399   //is held here anyway
00400   guard.release();
00401 
00402   if (!associate(data, active)) {
00403     if (DCPS_debug_level) {
00404       ACE_DEBUG((LM_ERROR,
00405           ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
00406           ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00407     }
00408   }
00409 }

void OpenDDS::DCPS::DataReaderImpl::add_link ( const DataLink_rch link,
const RepoId peer 
) [protected, virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 3344 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::add_link(), end_historic_sweeper_, OPENDDS_STRING, resume_sample_processing(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::schedule_timer(), DDS::VOLATILE_DURABILITY_QOS, writers_, and writers_lock_.

03345 {
03346   if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
03347 
03348     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
03349 
03350     WriterMapType::iterator it = writers_.find(peer);
03351     if (it != writers_.end()) {
03352       // Schedule timer if necessary
03353       //   - only need to check reader qos - we know the writer must be >= reader
03354       end_historic_sweeper_->schedule_timer(it->second);
03355     }
03356   }
03357   TransportClient::add_link(link, peer);
03358   TransportImpl_rch impl = link->impl();
03359   OPENDDS_STRING type = impl->transport_type();
03360 
03361   if (type == "rtps_udp" || type == "multicast") {
03362     resume_sample_processing(peer);
03363   }
03364 }

void OpenDDS::DCPS::DataReaderImpl::association_complete ( const RepoId remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 524 of file DataReaderImpl.cpp.

00525 {
00526   // For the current DCPSInfoRepo implementation, the DataReader side will
00527   // always be passive, so association_complete() will not be called.
00528 }

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::auto_return_loan ( void *  seq  )  [pure virtual]

This method provides virtual access to type specific code that is used when loans are automatically returned. The destructor of the sequence supporing zero-copy read calls this method on the datareader that provided the loan.

Parameters:
seq - The sequence of loaned values.
Returns:
Always RETCODE_OK.
thows NONE.

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by TAO::DCPS::ZeroCopyDataSeq< Sample_T, DEF_MAX >::~ZeroCopyDataSeq().

void OpenDDS::DCPS::DataReaderImpl::begin_access (  ) 

Definition at line 3202 of file DataReaderImpl.cpp.

References coherent_, and sample_lock_.

03203 {
03204   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03205   this->coherent_ = true;
03206 }

bool OpenDDS::DCPS::DataReaderImpl::check_historic ( const ReceivedDataSample sample  )  [private]

collect samples received before END_HISTORIC_SAMPLES returns false if normal processing of this sample should be skipped

Definition at line 3313 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), writers_, and writers_lock_.

Referenced by data_received().

03314 {
03315   ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
03316   WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
03317   if (iter != writers_.end()) {
03318     const SequenceNumber& seq = sample.header_.sequence_;
03319     if (iter->second->waiting_for_end_historic_samples_) {
03320       iter->second->historic_samples_.insert(std::make_pair(seq, sample));
03321       return false;
03322     }
03323     if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
03324         && !sample.header_.historic_sample_
03325         && seq <= iter->second->last_historic_seq_) {
03326       // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
03327       return false;
03328     }
03329   }
03330   return true;
03331 }

bool OpenDDS::DCPS::DataReaderImpl::check_transport_qos ( const TransportInst inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 1791 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportInst::is_reliable(), and DDS::RELIABLE_RELIABILITY_QOS.

01792 {
01793   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01794     return ti.is_reliable();
01795   }
01796   return true;
01797 }

void OpenDDS::DCPS::DataReaderImpl::cleanup (  ) 

cleanup the DataWriter.

Definition at line 164 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(), OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer(), content_filtered_topic_, dr_local_objref_, end_historic_sweeper_, instances_, liveliness_timer_, owner_manager_, remove_association_sweeper_, OpenDDS::DCPS::TopicImpl::remove_entity_ref(), OpenDDS::DCPS::ContentFilteredTopicImpl::remove_reader(), topic_servant_, OpenDDS::DCPS::TopicImpl::type_name(), OpenDDS::DCPS::OwnershipManager::unregister_reader(), OpenDDS::DCPS::TopicDescriptionImpl::update_reader_count(), OpenDDS::DCPS::ReactorInterceptor::wait(), watchdog_, and writers_.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().

00165 {
00166   {
00167     // Is this lock necessary?
00168     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00169         guard,
00170         this->sample_lock_);
00171 
00172     liveliness_timer_->cancel_timer();
00173   }
00174   liveliness_timer_->wait();
00175 
00176   // Cancel any watchdog timers
00177   { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00178   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00179       iter != instances_.end();
00180       ++iter) {
00181     SubscriptionInstance *ptr = iter->second;
00182     if (this->watchdog_ && ptr->deadline_timer_id_ != -1) {
00183       this->watchdog_->cancel_timer(ptr);
00184     }
00185   }
00186   }
00187 
00188 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00189   if (owner_manager_) {
00190     owner_manager_->unregister_reader(topic_servant_->type_name(), this);
00191   }
00192 #endif
00193 
00194   if (topic_servant_) {
00195     topic_servant_->remove_entity_ref();
00196     topic_servant_->_remove_ref();
00197   }
00198 
00199   dr_local_objref_ = DDS::DataReader::_nil();
00200 
00201 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00202   if (!CORBA::is_nil(content_filtered_topic_.in())) {
00203     ContentFilteredTopicImpl* cft =
00204         dynamic_cast<ContentFilteredTopicImpl*>(content_filtered_topic_.in());
00205     cft->remove_reader(*this);
00206     cft->update_reader_count(false);
00207     content_filtered_topic_ = DDS::ContentFilteredTopic::_nil ();
00208   }
00209 #endif
00210 
00211   {
00212     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00213                    read_guard,
00214                    this->writers_lock_);
00215     // Cancel any uncancelled sweeper timers
00216     WriterMapType::iterator writer;
00217     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00218       end_historic_sweeper_->cancel_timer(writer->second);
00219       remove_association_sweeper_->cancel_timer(writer->second);
00220     }
00221   }
00222 
00223   end_historic_sweeper_->wait();
00224   remove_association_sweeper_->wait();
00225 }

bool OpenDDS::DCPS::DataReaderImpl::coherent_change_received ( WriterInfo writer  )  [private]

void OpenDDS::DCPS::DataReaderImpl::coherent_change_received ( RepoId  publisher_id,
Coherent_State result 
)

Definition at line 3131 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, state, and writers_.

03132 {
03133   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03134 
03135   result = COMPLETED;
03136   for (WriterMapType::iterator iter = writers_.begin();
03137       iter != writers_.end();
03138       ++iter) {
03139 
03140     if (iter->second->publisher_id_ == publisher_id) {
03141       Coherent_State state = iter->second->coherent_change_received();
03142       if (state == NOT_COMPLETED_YET) {
03143         result = state;
03144         break;
03145       }
03146       else if (state == REJECTED) {
03147         result = REJECTED;
03148       }
03149     }
03150   }
03151 }

void OpenDDS::DCPS::DataReaderImpl::coherent_changes_completed ( DataReaderImpl reader  ) 

Definition at line 3155 of file DataReaderImpl.cpp.

References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, dr_local_objref_, listener_for(), OpenDDS::DCPS::SubscriberImpl::listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), reverse_sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and subscriber_servant_.

Referenced by verify_coherent_changes_completion().

03156 {
03157   this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
03158   this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
03159 
03160   ::DDS::SubscriberListener_var sub_listener =
03161       this->subscriber_servant_->listener_for(::DDS::DATA_ON_READERS_STATUS);
03162   if (!CORBA::is_nil(sub_listener.in()))
03163   {
03164     if (reader == this) {
03165       // Release the sample_lock before listener callback.
03166       ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03167       sub_listener->on_data_on_readers(this->subscriber_servant_);
03168     }
03169 
03170     this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03171     this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03172   }
03173   else
03174   {
03175     this->subscriber_servant_->notify_status_condition();
03176 
03177     ::DDS::DataReaderListener_var listener =
03178         this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
03179 
03180     if (!CORBA::is_nil(listener.in()))
03181     {
03182       if (reader == this) {
03183         // Release the sample_lock before listener callback.
03184         ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
03185         listener->on_data_available(dr_local_objref_.in ());
03186       }
03187       else {
03188         listener->on_data_available(dr_local_objref_.in ());
03189       }
03190 
03191       set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
03192       this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
03193     }
03194     else
03195     {
03196       this->notify_status_condition();
03197     }
03198   }
03199 }

bool OpenDDS::DCPS::DataReaderImpl::contains_sample ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Fold-in the three separate loops of have_sample_states(), have_view_states(), and have_instance_states(). Takes the sample_lock_.

Definition at line 1892 of file DataReaderImpl.cpp.

References instances_, and sample_lock_.

Referenced by OpenDDS::DCPS::ReadConditionImpl::get_trigger_value().

01894 {
01895   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
01896   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01897 
01898   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
01899       end = instances_.end(); iter != end; ++iter) {
01900     SubscriptionInstance& inst = *iter->second;
01901 
01902     if ((inst.instance_state_.view_state() & view_states) &&
01903         (inst.instance_state_.instance_state() & instance_states)) {
01904       for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
01905           item = item->next_data_sample_) {
01906         if (item->sample_state_ & sample_states
01907 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01908             && !item->coherent_change_
01909 #endif
01910         ) {
01911           return true;
01912         }
01913       }
01914     }
01915   }
01916 
01917   return false;
01918 }

virtual bool OpenDDS::DCPS::DataReaderImpl::contains_sample_filtered ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const FilterEvaluator evaluator,
const DDS::StringSeq params 
) [pure virtual]

Referenced by OpenDDS::DCPS::QueryConditionImpl::get_trigger_value().

DDS::QueryCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_querycondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const char *  query_expression,
const DDS::StringSeq query_parameters 
) [virtual]

Definition at line 891 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, QueryConditionImpl, and read_conditions_.

00897 {
00898   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00899   try {
00900     DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
00901         view_states, instance_states, query_expression, query_parameters);
00902     DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
00903     read_conditions_.insert(rc);
00904     return qc._retn();
00905   } catch (const std::exception& e) {
00906     if (DCPS_debug_level) {
00907       ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ")
00908           ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
00909           e.what()));
00910     }
00911     return 0;
00912   }
00913 }

DDS::ReadCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_readcondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [virtual]

Definition at line 878 of file DataReaderImpl.cpp.

References read_conditions_.

00882 {
00883   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
00884   DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
00885       view_states, instance_states);
00886   read_conditions_.insert(rc);
00887   return rc._retn();
00888 }

void OpenDDS::DCPS::DataReaderImpl::data_received ( const ReceivedDataSample sample  )  [virtual]

process a message that has been received - could be control or a data sample.

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 1454 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(), check_historic(), OpenDDS::DCPS::SubscriptionInstance::cur_sample_tv_, OpenDDS::DCPS::SubscriberImpl::data_received(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_demarshal(), OpenDDS::DCPS::DISPOSE_INSTANCE, dispose_unregister(), OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER, OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(), OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::EntityImpl::get_deleted(), get_repo_id(), OpenDDS::DCPS::GUID_UNKNOWN, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::SubscriptionInstance::instance_state_, instances_, is_exclusive_ownership_, OpenDDS::DCPS::InstanceState::is_last(), OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, OpenDDS::DCPS::SubscriptionInstance::last_sample_tv_, lookup_instance(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, notify_read_conditions(), OPENDDS_STRING, process_latency(), OpenDDS::DCPS::DataSampleHeader::publication_id_, resume_sample_processing(), reverse_sample_lock_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer(), subscriber_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_string(), OpenDDS::DCPS::UNREGISTER_INSTANCE, verify_coherent_changes_completion(), watchdog_, writer_activity(), and writers_.

Referenced by deliver_historic().

01455 {
01456   DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
01457 
01458   // ensure some other thread is not changing the sample container
01459   // or statuses related to samples.
01460   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
01461 
01462   if (get_deleted()) return;
01463 
01464   if (DCPS_debug_level > 9) {
01465     GuidConverter converter(subscription_id_);
01466     ACE_DEBUG((LM_DEBUG,
01467         ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01468         ACE_TEXT("%C received sample: %C.\n"),
01469         OPENDDS_STRING(converter).c_str(),
01470         to_string(sample.header_).c_str()));
01471   }
01472 
01473   switch (sample.header_.message_id_) {
01474   case SAMPLE_DATA:
01475   case INSTANCE_REGISTRATION: {
01476     if (!check_historic(sample)) break;
01477 
01478     DataSampleHeader const & header = sample.header_;
01479 
01480     this->writer_activity(header);
01481 
01482     // Verify data has not exceeded its lifespan.
01483     if (this->filter_sample(header)) break;
01484 
01485     // This adds the reader to the set/list of readers with data.
01486     this->subscriber_servant_->data_received(this);
01487 
01488     // Only gather statistics about real samples, not registration data, etc.
01489     if (header.message_id_ == SAMPLE_DATA) {
01490       this->process_latency(sample);
01491     }
01492 
01493     // This also adds to the sample container and makes any callbacks
01494     // and condition modifications.
01495 
01496     SubscriptionInstance* instance = 0;
01497     bool is_new_instance = false;
01498     bool filtered = false;
01499     if (sample.header_.key_fields_only_) {
01500       dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING);
01501     } else {
01502       dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING);
01503     }
01504 
01505     // Per sample logging
01506     if (DCPS_debug_level >= 8) {
01507       GuidConverter reader_converter(subscription_id_);
01508       GuidConverter writer_converter(header.publication_id_);
01509 
01510       ACE_DEBUG ((LM_DEBUG,
01511           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
01512           ACE_TEXT("instance %d is_new_instance %d filtered %d \n"),
01513           OPENDDS_STRING(reader_converter).c_str(),
01514           OPENDDS_STRING(writer_converter).c_str(),
01515           instance ? instance->instance_handle_ : 0,
01516               is_new_instance, filtered));
01517     }
01518 
01519     if (filtered) break; // sample filtered from instance
01520     bool accepted = true;
01521 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01522     bool verify_coherent = false;
01523 #endif
01524     RcHandle<WriterInfo> writer;
01525 
01526     if (header.publication_id_.entityId.entityKind
01527         != OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER) {
01528       ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01529 
01530       WriterMapType::iterator where
01531       = this->writers_.find(header.publication_id_);
01532 
01533       if (where != this->writers_.end()) {
01534         if (header.coherent_change_) {
01535 
01536 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01537           // Received coherent change
01538           where->second->group_coherent_ = header.group_coherent_;
01539           where->second->publisher_id_ = header.publisher_id_;
01540           ++where->second->coherent_samples_;
01541           verify_coherent = true;
01542 #endif
01543           writer = where->second;
01544         }
01545       } else {
01546         GuidConverter subscriptionBuffer(this->subscription_id_);
01547         GuidConverter publicationBuffer(header.publication_id_);
01548         ACE_DEBUG((LM_WARNING,
01549             ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01550             ACE_TEXT("subscription %C failed to find ")
01551             ACE_TEXT("publication data for %C.\n"),
01552             OPENDDS_STRING(subscriptionBuffer).c_str(),
01553             OPENDDS_STRING(publicationBuffer).c_str()));
01554       }
01555     }
01556 
01557 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01558     if (verify_coherent) {
01559       accepted = this->verify_coherent_changes_completion(writer.in());
01560     }
01561 #endif
01562 
01563     if (this->watchdog_) {
01564       instance->last_sample_tv_ = instance->cur_sample_tv_;
01565       instance->cur_sample_tv_ = ACE_OS::gettimeofday();
01566 
01567       // Watchdog can't be called with sample_lock_ due to reactor deadlock
01568       ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01569       if (is_new_instance) {
01570         this->watchdog_->schedule_timer(instance);
01571 
01572       } else {
01573         this->watchdog_->execute(instance, false);
01574       }
01575     }
01576 
01577     if (accepted) {
01578       this->notify_read_conditions();
01579     }
01580   }
01581   break;
01582 
01583 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01584   case END_COHERENT_CHANGES: {
01585     CoherentChangeControl control;
01586 
01587     this->writer_activity(sample.header_);
01588 
01589     Serializer serializer(
01590         sample.sample_, sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER);
01591     serializer >> control;
01592 
01593     if (DCPS_debug_level > 0) {
01594       std::stringstream buffer;
01595       buffer << control << std::endl;
01596 
01597       ACE_DEBUG((LM_DEBUG,
01598           ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01599           ACE_TEXT("END_COHERENT_CHANGES %C\n"),
01600           buffer.str().c_str()));
01601     }
01602 
01603     RcHandle<WriterInfo> writer;
01604     {
01605       ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01606 
01607       WriterMapType::iterator it =
01608           this->writers_.find(sample.header_.publication_id_);
01609 
01610       if (it == this->writers_.end()) {
01611         GuidConverter sub_id(this->subscription_id_);
01612         GuidConverter pub_id(sample.header_.publication_id_);
01613         ACE_DEBUG((LM_WARNING,
01614             ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
01615             ACE_TEXT(" subscription %C failed to find ")
01616             ACE_TEXT(" publication data for %C!\n"),
01617             OPENDDS_STRING(sub_id).c_str(),
01618             OPENDDS_STRING(pub_id).c_str()));
01619         return;
01620       }
01621       else {
01622         writer = it->second;
01623       }
01624       it->second->set_group_info (control);
01625     }
01626 
01627     if (this->verify_coherent_changes_completion(writer.in())) {
01628       this->notify_read_conditions();
01629     }
01630   }
01631   break;
01632 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
01633 
01634   case DATAWRITER_LIVELINESS: {
01635     if (DCPS_debug_level >= 4) {
01636       GuidConverter reader_converter(subscription_id_);
01637       GuidConverter writer_converter(sample.header_.publication_id_);
01638       ACE_DEBUG((LM_DEBUG,
01639                  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
01640                  ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
01641                  OPENDDS_STRING(reader_converter).c_str(),
01642                  OPENDDS_STRING(writer_converter).c_str()));
01643     }
01644     this->writer_activity(sample.header_);
01645 
01646     // tell all instances they got a liveliness message
01647     { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01648     for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01649         iter != instances_.end();
01650         ++iter) {
01651       SubscriptionInstance *ptr = iter->second;
01652 
01653       ptr->instance_state_.lively(sample.header_.publication_id_);
01654     }
01655     }
01656 
01657   }
01658   break;
01659 
01660   case DISPOSE_INSTANCE: {
01661     if (!check_historic(sample)) break;
01662     this->writer_activity(sample.header_);
01663     SubscriptionInstance* instance = 0;
01664 
01665     if (this->watchdog_) {
01666       // Find the instance first for timer cancellation since
01667       // the instance may be deleted during dispose and can
01668       // not be accessed.
01669       ReceivedDataSample dup(sample);
01670       this->lookup_instance(dup, instance);
01671 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01672       if (! this->is_exclusive_ownership_
01673           || (this->is_exclusive_ownership_
01674               && (instance != 0 )
01675               && (this->owner_manager_->is_owner (instance->instance_handle_,
01676                   sample.header_.publication_id_)))) {
01677 #endif
01678         this->watchdog_->cancel_timer(instance);
01679 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01680       }
01681 #endif
01682     }
01683     instance = 0;
01684     this->dispose_unregister(sample, instance);
01685   }
01686   this->notify_read_conditions();
01687   break;
01688 
01689   case UNREGISTER_INSTANCE: {
01690     if (!check_historic(sample)) break;
01691     this->writer_activity(sample.header_);
01692     SubscriptionInstance* instance = 0;
01693 
01694     if (this->watchdog_) {
01695       // Find the instance first for timer cancellation since
01696       // the instance may be deleted during dispose and can
01697       // not be accessed.
01698       ReceivedDataSample dup(sample);
01699       this->lookup_instance(dup, instance);
01700       if( instance != 0) {
01701 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01702         if (! this->is_exclusive_ownership_
01703             || (this->is_exclusive_ownership_
01704                 && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01705 #endif
01706           this->watchdog_->cancel_timer(instance);
01707 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01708         }
01709 #endif
01710       }
01711     }
01712     instance = 0;
01713     this->dispose_unregister(sample, instance);
01714   }
01715   this->notify_read_conditions();
01716   break;
01717 
01718   case DISPOSE_UNREGISTER_INSTANCE: {
01719     if (!check_historic(sample)) break;
01720     this->writer_activity(sample.header_);
01721     SubscriptionInstance* instance = 0;
01722 
01723     if (this->watchdog_) {
01724       // Find the instance first for timer cancellation since
01725       // the instance may be deleted during dispose and can
01726       // not be accessed.
01727       ReceivedDataSample dup(sample);
01728       this->lookup_instance(dup, instance);
01729 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01730       if (! this->is_exclusive_ownership_
01731           || (this->is_exclusive_ownership_
01732               && (instance != 0 )
01733               && (this->owner_manager_->is_owner (instance->instance_handle_,
01734                   sample.header_.publication_id_)))
01735                   || (this->is_exclusive_ownership_
01736                       && (instance != 0 )
01737                       && instance->instance_state_.is_last (sample.header_.publication_id_))) {
01738 #endif
01739         this->watchdog_->cancel_timer(instance);
01740 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01741       }
01742 #endif
01743     }
01744     instance = 0;
01745     this->dispose_unregister(sample, instance);
01746   }
01747   this->notify_read_conditions();
01748   break;
01749 
01750   case END_HISTORIC_SAMPLES: {
01751     if (sample.header_.message_length_ >= sizeof(RepoId)) {
01752       Serializer ser(sample.sample_);
01753       RepoId readerId = GUID_UNKNOWN;
01754       ser >> readerId;
01755       if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) {
01756         break; // not our message
01757       }
01758     }
01759     if (DCPS_debug_level > 4) {
01760       ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
01761     }
01762     // Going to acquire writers lock, release samples lock
01763     guard.release();
01764     this->resume_sample_processing(sample.header_.publication_id_);
01765     if (DCPS_debug_level > 4) {
01766       GuidConverter pub_id(sample.header_.publication_id_);
01767       ACE_DEBUG((
01768           LM_INFO,
01769           "(%P|%t) Resumed sample processing for durable writer %C\n",
01770           OPENDDS_STRING(pub_id).c_str()));
01771     }
01772     break;
01773   }
01774 
01775   default:
01776     ACE_ERROR((LM_ERROR,
01777         "(%P|%t) ERROR: DataReaderImpl::data_received"
01778         "unexpected message_id = %d\n",
01779         sample.header_.message_id_));
01780     break;
01781   }
01782 }

virtual void OpenDDS::DCPS::DataReaderImpl::dds_demarshal ( const ReceivedDataSample sample,
SubscriptionInstance *&  instance,
bool &  is_new_instance,
bool &  filtered,
MarshalingType  marshaling_type 
) [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by data_received().

virtual void OpenDDS::DCPS::DataReaderImpl::dec_ref_data_element ( ReceivedDataElement r  )  [pure virtual]

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), and TAO::DCPS::ZeroCopyDataSeq< Sample_T, DEF_MAX >::length().

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_contained_entities (  )  [virtual]

Definition at line 933 of file DataReaderImpl.cpp.

References read_conditions_, DDS::RETCODE_OK, and DDS::RETCODE_OUT_OF_RESOURCES.

00934 {
00935   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00936       DDS::RETCODE_OUT_OF_RESOURCES);
00937   read_conditions_.clear();
00938   return DDS::RETCODE_OK;
00939 }

virtual void OpenDDS::DCPS::DataReaderImpl::delete_instance_map ( void *  map  )  [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by OpenDDS::DCPS::OwnershipManager::unregister_reader().

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_readcondition ( DDS::ReadCondition_ptr  a_condition  )  [virtual]

Definition at line 923 of file DataReaderImpl.cpp.

References read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and DDS::RETCODE_PRECONDITION_NOT_MET.

00925 {
00926   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00927       DDS::RETCODE_OUT_OF_RESOURCES);
00928   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00929   return read_conditions_.erase(rc)
00930       ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
00931 }

void OpenDDS::DCPS::DataReaderImpl::deliver_historic ( OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&  samples  )  [private]

deliver samples that were held by check_historic()

Definition at line 3333 of file DataReaderImpl.cpp.

References data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, and OPENDDS_MAP().

Referenced by resume_sample_processing().

03334 {
03335   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
03336   const iter_t end = samples.end();
03337   for (iter_t iter = samples.begin(); iter != end; ++iter) {
03338     iter->second.header_.historic_sample_ = true;
03339     data_received(iter->second);
03340   }
03341 }

ACE_INLINE void OpenDDS::DCPS::DataReaderImpl::disable_transport (  ) 

Definition at line 40 of file DataReaderImpl.inl.

References transport_disabled_.

Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::create_bit_dr().

00041 {
00042   this->transport_disabled_ = true;
00043 }

void OpenDDS::DCPS::DataReaderImpl::dispose_unregister ( const ReceivedDataSample sample,
SubscriptionInstance *&  instance 
) [virtual]

Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Definition at line 2393 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

Referenced by data_received().

02395 {
02396   if (DCPS_debug_level > 0) {
02397     ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
02398   }
02399 }

DDS::DomainId_t OpenDDS::DCPS::DataReaderImpl::domain_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 664 of file DataReaderImpl.h.

00664 { return this->domain_id_; }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable (  )  [virtual]

Implements DDS::Entity.

Definition at line 1226 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::connection_info(), OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::deadline, depth_, domain_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), enable_specific(), OpenDDS::DCPS::TransportClient::enable_transport(), get_cf_topic(), OpenDDS::DCPS::ContentFilteredTopicImpl::get_filter_class_name(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::SubscriberImpl::get_qos(), OpenDDS::DCPS::GUID_UNKNOWN, DDS::DataReaderQos::history, DDS::KEEP_ALL_HISTORY_QOS, last_deadline_missed_total_count_, DDS::LENGTH_UNLIMITED, DDS::DataReaderQos::liveliness, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, monitor_, n_chunks_, DDS::Duration_t::nanosec, qos_, rd_allocator_, OpenDDS::DCPS::SubscriberImpl::reader_enabled(), DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), requested_deadline_missed_status_, RequestedDeadlineWatchdog, DDS::DataReaderQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), subscriber_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, transport_disabled_, DDS::VOLATILE_DURABILITY_QOS, and watchdog_.

Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::create_bit_dr(), and OpenDDS::DCPS::SubscriberImpl::create_datareader().

01227 {
01228   //According spec:
01229   // - Calling enable on an already enabled Entity returns OK and has no
01230   // effect.
01231   // - Calling enable on an Entity whose factory is not enabled will fail
01232   // and return PRECONDITION_NOT_MET.
01233 
01234   if (this->is_enabled()) {
01235     return DDS::RETCODE_OK;
01236   }
01237 
01238   if (this->subscriber_servant_->is_enabled() == false) {
01239     return DDS::RETCODE_PRECONDITION_NOT_MET;
01240   }
01241 
01242   if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
01243     // The spec says qos_.history.depth is "has no effect"
01244     // when history.kind = KEEP_ALL so use max_samples_per_instance
01245     depth_ = qos_.resource_limits.max_samples_per_instance;
01246 
01247   } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
01248     depth_ = qos_.history.depth;
01249   }
01250 
01251   if (depth_ == DDS::LENGTH_UNLIMITED) {
01252     // DDS::LENGTH_UNLIMITED is negative so make it a positive
01253     // value that is for all intents and purposes unlimited
01254     // and we can use it for comparisons.
01255     // use 2147483647L because that is the greatest value a signed
01256     // CORBA::Long can have.
01257     // WARNING: The client risks running out of memory in this case.
01258     depth_ = 2147483647L;
01259   }
01260 
01261   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01262     n_chunks_ = qos_.resource_limits.max_samples;
01263   }
01264 
01265   //else using value from Service_Participant
01266 
01267   // enable the type specific part of this DataReader
01268   this->enable_specific();
01269 
01270   //Note: the QoS used to set n_chunks_ is Changable=No so
01271   // it is OK that we cannot change the size of our allocators.
01272   rd_allocator_ = new ReceivedDataAllocator(n_chunks_);
01273 
01274   if (DCPS_debug_level >= 2)
01275     ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
01276         " Cached_Allocator_With_Overflow %x with %d chunks\n",
01277         rd_allocator_, n_chunks_));
01278 
01279   if ((qos_.liveliness.lease_duration.sec !=
01280       DDS::DURATION_INFINITE_SEC) &&
01281       (qos_.liveliness.lease_duration.nanosec !=
01282           DDS::DURATION_INFINITE_NSEC)) {
01283     liveliness_lease_duration_ =
01284         duration_to_time_value(qos_.liveliness.lease_duration);
01285   }
01286 
01287   // Setup the requested deadline watchdog if the configured deadline
01288   // period is not the default (infinite).
01289   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01290 
01291   if (this->watchdog_ == 0
01292       && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01293           || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
01294     this->watchdog_ =
01295         new RequestedDeadlineWatchdog(
01296             this->sample_lock_,
01297             this->qos_.deadline,
01298             this,
01299             this->dr_local_objref_.in(),
01300             this->requested_deadline_missed_status_,
01301             this->last_deadline_missed_total_count_);
01302   }
01303 
01304   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01305   disco->pre_reader(this);
01306 
01307   this->set_enabled();
01308 
01309   if (topic_servant_ && !transport_disabled_) {
01310 
01311     try {
01312       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
01313           this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01314     } catch (const Transport::Exception&) {
01315       ACE_ERROR((LM_ERROR,
01316           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01317           ACE_TEXT("Transport Exception.\n")));
01318       return DDS::RETCODE_ERROR;
01319 
01320     }
01321 
01322     const TransportLocatorSeq& trans_conf_info = this->connection_info();
01323 
01324     CORBA::String_var filterClassName = "";
01325     CORBA::String_var filterExpression = "";
01326     DDS::StringSeq exprParams;
01327 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01328     DDS::ContentFilteredTopic_var cft = this->get_cf_topic();
01329     if (cft) {
01330       OpenDDS::DCPS::ContentFilteredTopicImpl* impl =
01331         dynamic_cast<OpenDDS::DCPS::ContentFilteredTopicImpl*>(cft.in());
01332       if (impl) {
01333         filterClassName = impl->get_filter_class_name();
01334       }
01335       filterExpression = cft->get_filter_expression();
01336       cft->get_expression_parameters(exprParams);
01337     }
01338 #endif
01339 
01340     DDS::SubscriberQos sub_qos;
01341     this->subscriber_servant_->get_qos(sub_qos);
01342 
01343     this->subscription_id_ =
01344         disco->add_subscription(this->domain_id_,
01345             this->participant_servant_->get_id(),
01346             this->topic_servant_->get_id(),
01347             this,
01348             this->qos_,
01349             trans_conf_info,
01350             sub_qos,
01351             filterClassName,
01352             filterExpression,
01353             exprParams);
01354 
01355     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01356       ACE_ERROR((LM_ERROR,
01357           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
01358           ACE_TEXT("add_subscription returned invalid id.\n")));
01359       return DDS::RETCODE_ERROR;
01360     }
01361   }
01362 
01363   if (topic_servant_) {
01364     const CORBA::String_var name = topic_servant_->get_name();
01365     DDS::ReturnCode_t return_value =
01366         this->subscriber_servant_->reader_enabled(name.in(), this);
01367 
01368     if (this->monitor_) {
01369       this->monitor_->report();
01370     }
01371 
01372     return return_value;
01373   } else {
01374     return DDS::RETCODE_OK;
01375   }
01376 }

void OpenDDS::DCPS::DataReaderImpl::enable_filtering ( ContentFilteredTopicImpl cft  ) 

Definition at line 3254 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ContentFilteredTopicImpl::add_reader(), content_filtered_topic_, and OpenDDS::DCPS::TopicDescriptionImpl::update_reader_count().

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().

03255 {
03256   cft->update_reader_count(true);
03257   cft->add_reader(*this);
03258   content_filtered_topic_ = DDS::ContentFilteredTopic::_duplicate(cft);
03259 }

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable_specific (  )  [protected, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by enable().

void OpenDDS::DCPS::DataReaderImpl::end_access (  ) 

Definition at line 3209 of file DataReaderImpl.cpp.

References coherent_, group_coherent_ordered_data_, post_read_or_take(), OpenDDS::DCPS::GroupRakeData::reset(), and sample_lock_.

03210 {
03211   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03212   this->coherent_ = false;
03213   this->group_coherent_ordered_data_.reset();
03214   this->post_read_or_take();
03215 }

bool OpenDDS::DCPS::DataReaderImpl::filter_instance ( SubscriptionInstance instance,
const PublicationId pubid 
) [protected]

Definition at line 2759 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, OpenDDS::DCPS::InstanceState::get_owner(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::SubscriptionInstance::last_accepted_, OPENDDS_STRING, qos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, DDS::DataReaderQos::time_based_filter, OpenDDS::DCPS::time_value_to_duration(), and writers_.

02761 {
02762 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02763   if (this->is_exclusive_ownership_) {
02764 
02765     WriterMapType::iterator iter = writers_.find(pubid);
02766 
02767     if (iter == writers_.end()) {
02768       if (DCPS_debug_level > 4) {
02769         // This may not be an error since it could happen that the sample
02770         // is delivered to the datareader after the write is dis-associated
02771         // with this datareader.
02772         GuidConverter reader_converter(subscription_id_);
02773         GuidConverter writer_converter(pubid);
02774         ACE_DEBUG((LM_DEBUG,
02775             ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02776             ACE_TEXT("reader %C is not associated with writer %C.\n"),
02777             OPENDDS_STRING(reader_converter).c_str(),
02778             OPENDDS_STRING(writer_converter).c_str()));
02779       }
02780       return true;
02781     }
02782 
02783 
02784     // Evaulate the owner of the instance if not selected and filter
02785     // current message if it's not from owner writer.
02786     if ( instance->instance_state_.get_owner () == GUID_UNKNOWN
02787         || ! iter->second->is_owner_evaluated (instance->instance_handle_)) {
02788       bool is_owner = this->owner_manager_->select_owner (
02789           instance->instance_handle_,
02790           iter->second->writer_id_,
02791           iter->second->writer_qos_.ownership_strength.value,
02792           &instance->instance_state_);
02793       iter->second->set_owner_evaluated (instance->instance_handle_, true);
02794 
02795       if (! is_owner) {
02796         if (DCPS_debug_level >= 1) {
02797           GuidConverter reader_converter(subscription_id_);
02798           GuidConverter writer_converter(pubid);
02799           GuidConverter owner_converter (instance->instance_state_.get_owner ());
02800           ACE_DEBUG((LM_DEBUG,
02801               ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02802               ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
02803               OPENDDS_STRING(reader_converter).c_str(),
02804               OPENDDS_STRING(writer_converter).c_str(),
02805               OPENDDS_STRING(owner_converter).c_str()));
02806         }
02807         return true;
02808       }
02809     }
02810     else if (! (instance->instance_state_.get_owner () == pubid)) {
02811       if (DCPS_debug_level >= 1) {
02812         GuidConverter reader_converter(subscription_id_);
02813         GuidConverter writer_converter(pubid);
02814         GuidConverter owner_converter (instance->instance_state_.get_owner ());
02815         ACE_DEBUG((LM_DEBUG,
02816             ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ")
02817             ACE_TEXT("reader %C writer %C is not owner %C\n"),
02818             OPENDDS_STRING(reader_converter).c_str(),
02819             OPENDDS_STRING(writer_converter).c_str(),
02820             OPENDDS_STRING(owner_converter).c_str()));
02821       }
02822       return true;
02823     }
02824   }
02825 #else
02826   ACE_UNUSED_ARG(pubid);
02827 #endif
02828 
02829   ACE_Time_Value now(ACE_OS::gettimeofday());
02830 
02831   // TIME_BASED_FILTER processing; expire data samples
02832   // if minimum separation is not met for instance.
02833   const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
02834 
02835   if (this->qos_.time_based_filter.minimum_separation > zero) {
02836     DDS::Duration_t separation =
02837         time_value_to_duration(now - instance->last_accepted_);
02838 
02839     if (separation < this->qos_.time_based_filter.minimum_separation) {
02840       return true;  // Data filtered.
02841     }
02842   }
02843 
02844   instance->last_accepted_ = now;
02845 
02846   return false;
02847 }

bool OpenDDS::DCPS::DataReaderImpl::filter_sample ( const DataSampleHeader header  )  [protected]

Note:
Filtering will only occur if the application configured a finite duration in the Topic's LIFESPAN QoS policy or DataReader's TIME_BASED_FILTER QoS policy.

Definition at line 2710 of file DataReaderImpl.cpp.

References always_get_history_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, header, qos_, OpenDDS::DCPS::time_to_time_value(), and DDS::VOLATILE_DURABILITY_QOS.

02711 {
02712   ACE_Time_Value now(ACE_OS::gettimeofday());
02713 
02714   // Expire historic data if QoS indicates VOLATILE.
02715   if (!always_get_history_ && header.historic_sample_
02716       && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
02717     if (DCPS_debug_level >= 8) {
02718       ACE_DEBUG((LM_DEBUG,
02719           ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
02720           ACE_TEXT("Discarded historic data.\n")));
02721     }
02722 
02723     return true;  // Data filtered.
02724   }
02725 
02726   // The LIFESPAN_DURATION_FLAG is set when sample data is sent
02727   // with a non-default LIFESPAN duration value.
02728   if (header.lifespan_duration_) {
02729     // Finite lifespan.  Check if data has expired.
02730 
02731     DDS::Time_t const tmp = {
02732         header.source_timestamp_sec_ + header.lifespan_duration_sec_,
02733         header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
02734     };
02735 
02736     // We assume that the publisher host's clock and subcriber host's
02737     // clock are synchronized (allowed by the spec).
02738     ACE_Time_Value const expiration_time(
02739         OpenDDS::DCPS::time_to_time_value(tmp));
02740 
02741     if (now >= expiration_time) {
02742       if (DCPS_debug_level >= 8) {
02743         ACE_Time_Value const diff(now - expiration_time);
02744         ACE_DEBUG((LM_DEBUG,
02745             ACE_TEXT("OpenDDS (%P|%t) Received data ")
02746             ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
02747             diff.sec(),
02748             diff.usec()));
02749       }
02750 
02751       return true;  // Data filtered.
02752     }
02753   }
02754 
02755   return false;
02756 }

DDS::ContentFilteredTopic_ptr OpenDDS::DCPS::DataReaderImpl::get_cf_topic (  )  const

Definition at line 3262 of file DataReaderImpl.cpp.

References content_filtered_topic_.

Referenced by enable(), and get_topicdescription().

03263 {
03264   return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_);
03265 }

CORBA::Long OpenDDS::DCPS::DataReaderImpl::get_depth (  )  const [inline]

Definition at line 397 of file DataReaderImpl.h.

00397                               {
00398     return depth_;
00399   }

OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_dp_id (  ) 

Definition at line 2960 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::get_id(), and participant_servant_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02961 {
02962   return this->participant_servant_->get_id();
02963 }

ACE_INLINE DDS::DataReader_ptr OpenDDS::DCPS::DataReaderImpl::get_dr_obj_ref (  ) 

Definition at line 10 of file DataReaderImpl.inl.

References dr_local_objref_.

00011 {
00012   return DDS::DataReader::_duplicate(dr_local_objref_.in()) ;
00013 }

SubscriptionInstance * OpenDDS::DCPS::DataReaderImpl::get_handle_instance ( DDS::InstanceHandle_t  handle  )  [protected]

Definition at line 2538 of file DataReaderImpl.cpp.

References instances_.

Referenced by release_instance().

02539 {
02540   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, 0);
02541 
02542   SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
02543   if (iter == instances_.end()) {
02544     ACE_DEBUG((LM_WARNING,
02545         ACE_TEXT("(%P|%t) WARNING: ")
02546         ACE_TEXT("DataReaderImpl::get_handle_instance: ")
02547         ACE_TEXT("lookup for 0x%x failed\n"),
02548         handle));
02549     return 0;
02550   } // if (0 != instances_.find(handle, instance))
02551 
02552   return iter->second;
02553 }

DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 289 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

00290 {
00291   return this->participant_servant_->id_to_handle(subscription_id_);
00292 }

void OpenDDS::DCPS::DataReaderImpl::get_instance_handles ( InstanceHandleVec &  instance_handles  ) 

Definition at line 2966 of file DataReaderImpl.cpp.

References instances_, and sample_lock_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02967 {
02968   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
02969   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02970 
02971   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02972       end = instances_.end(); iter != end; ++iter) {
02973     instance_handles.push_back(iter->first);
02974   }
02975 }

void OpenDDS::DCPS::DataReaderImpl::get_latency_stats ( OpenDDS::DCPS::LatencyStatisticsSeq stats  )  [virtual]

Definition at line 2491 of file DataReaderImpl.cpp.

References statistics_.

02493 {
02494   stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
02495   int index = 0;
02496 
02497   for (StatsMapType::const_iterator current = this->statistics_.begin();
02498       current != this->statistics_.end();
02499       ++current, ++index) {
02500     stats[ index] = current->second.get_stats();
02501     stats[ index].publication = current->first;
02502   }
02503 }

DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::get_listener (  )  [virtual]

Definition at line 1028 of file DataReaderImpl.cpp.

References listener_.

01029 {
01030   return DDS::DataReaderListener::_duplicate(listener_.in());
01031 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_liveliness_changed_status ( DDS::LivelinessChangedStatus status  )  [virtual]

Definition at line 1062 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count_change, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::RETCODE_OK, and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().

01064 {
01065   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01066 
01067   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS,
01068       false);
01069   status = liveliness_changed_status_;
01070 
01071   liveliness_changed_status_.alive_count_change = 0;
01072   liveliness_changed_status_.not_alive_count_change = 0;
01073 
01074   return DDS::RETCODE_OK;
01075 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publication_data ( DDS::PublicationBuiltinTopicData publication_data,
DDS::InstanceHandle_t  publication_handle 
) [virtual]

Definition at line 1186 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, participant_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01189 {
01190   if (enabled_ == false) {
01191     ACE_ERROR_RETURN((LM_ERROR,
01192         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
01193         ACE_TEXT("get_matched_publication_data: ")
01194         ACE_TEXT("Entity is not enabled. \n")),
01195         DDS::RETCODE_NOT_ENABLED);
01196   }
01197 
01198 
01199   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01200       guard,
01201       this->publication_handle_lock_,
01202       DDS::RETCODE_ERROR);
01203 
01204 
01205   BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader,
01206   DDS::PublicationBuiltinTopicDataDataReader_var,
01207   DDS::PublicationBuiltinTopicDataSeq > hh;
01208 
01209   DDS::PublicationBuiltinTopicDataSeq data;
01210 
01211   DDS::ReturnCode_t ret
01212   = hh.instance_handle_to_bit_data(participant_servant_,
01213       BUILT_IN_PUBLICATION_TOPIC,
01214       publication_handle,
01215       data);
01216 
01217   if (ret == DDS::RETCODE_OK) {
01218     publication_data = data[0];
01219   }
01220 
01221   return ret;
01222 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publications ( DDS::InstanceHandleSeq publication_handles  )  [virtual]

Definition at line 1155 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01157 {
01158   if (enabled_ == false) {
01159     ACE_ERROR_RETURN((LM_ERROR,
01160         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
01161         ACE_TEXT(" Entity is not enabled. \n")),
01162         DDS::RETCODE_NOT_ENABLED);
01163   }
01164 
01165   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01166       guard,
01167       this->publication_handle_lock_,
01168       DDS::RETCODE_ERROR);
01169 
01170   // Copy out the handles for the current set of publications.
01171   int index = 0;
01172   publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01173 
01174   for (RepoIdToHandleMap::iterator
01175       current = this->id_to_handle_map_.begin();
01176       current != this->id_to_handle_map_.end();
01177       ++current, ++index) {
01178     publication_handles[ index] = current->second;
01179   }
01180 
01181   return DDS::RETCODE_OK;
01182 }

size_t OpenDDS::DCPS::DataReaderImpl::get_n_chunks (  )  const [inline]

Definition at line 400 of file DataReaderImpl.h.

00400                               {
00401     return n_chunks_;
00402   }

DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_next_handle ( const DDS::BuiltinTopicKey_t key  )  [protected]

Get an instance handle for a new instance.

Definition at line 2556 of file DataReaderImpl.cpp.

References domain_id_, get_topic_name(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), is_bit(), participant_servant_, and TheServiceParticipant.

02557 {
02558   if (is_bit()) {
02559     Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_);
02560     CORBA::String_var topic = get_topic_name();
02561     RepoId id = disc->bit_key_to_repo_id(participant_servant_, topic, key);
02562     return participant_servant_->id_to_handle(id);
02563 
02564   } else {
02565     return participant_servant_->id_to_handle(GUID_UNKNOWN);
02566   }
02567 }

void OpenDDS::DCPS::DataReaderImpl::get_ordered_data ( GroupRakeData data,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Definition at line 3218 of file DataReaderImpl.cpp.

References group_coherent_ordered_data_, OpenDDS::DCPS::GroupRakeData::insert_sample(), instances_, and sample_lock_.

03222 {
03223   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03224   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03225 
03226   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
03227       iter != instances_.end(); ++iter) {
03228     SubscriptionInstance *ptr = iter->second;
03229     if ((ptr->instance_state_.view_state() & view_states) &&
03230         (ptr->instance_state_.instance_state() & instance_states)) {
03231       size_t i(0);
03232       for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
03233           item != 0; item = item->next_data_sample_) {
03234         if ((item->sample_state_ & sample_states) && !item->coherent_change_) {
03235           data.insert_sample(item, ptr, ++i);
03236           this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i);
03237         }
03238       }
03239     }
03240   }
03241 }

Priority OpenDDS::DCPS::DataReaderImpl::get_priority_value ( const AssociationData data  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 666 of file DataReaderImpl.h.

References OpenDDS::DCPS::AssociationData::publication_transport_priority_.

00666                                                                  {
00667     return data.publication_transport_priority_;
00668   }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_qos ( DDS::DataReaderQos qos  )  [virtual]

Definition at line 1011 of file DataReaderImpl.cpp.

References qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::InstanceState::schedule_release().

01013 {
01014   qos = qos_;
01015   return DDS::RETCODE_OK;
01016 }

ACE_Reactor_Timer_Interface * OpenDDS::DCPS::DataReaderImpl::get_reactor (  ) 

Definition at line 2948 of file DataReaderImpl.cpp.

References reactor_.

Referenced by OpenDDS::DCPS::InstanceState::cancel_release(), and OpenDDS::DCPS::InstanceState::schedule_release().

02949 {
02950   return this->reactor_;
02951 }

const RepoId& OpenDDS::DCPS::DataReaderImpl::get_repo_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 663 of file DataReaderImpl.h.

Referenced by data_received(), and OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::handle_timeout().

00663 { return this->subscription_id_; }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_deadline_missed_status ( DDS::RequestedDeadlineMissedStatus status  )  [virtual]

Definition at line 1078 of file DataReaderImpl.cpp.

References last_deadline_missed_total_count_, DDS::REQUESTED_DEADLINE_MISSED_STATUS, requested_deadline_missed_status_, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::RequestedDeadlineMissedStatus::total_count, and DDS::RequestedDeadlineMissedStatus::total_count_change.

01080 {
01081   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01082 
01083   set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
01084       false);
01085 
01086   this->requested_deadline_missed_status_.total_count_change =
01087       this->requested_deadline_missed_status_.total_count
01088       - this->last_deadline_missed_total_count_;
01089 
01090   // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
01091   // is updated by the RequestedDeadlineWatchdog.
01092 
01093   // Update for next status check.
01094   this->last_deadline_missed_total_count_ =
01095       this->requested_deadline_missed_status_.total_count;
01096 
01097   status = requested_deadline_missed_status_;
01098 
01099   return DDS::RETCODE_OK;
01100 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_incompatible_qos_status ( DDS::RequestedIncompatibleQosStatus status  )  [virtual]

Definition at line 1103 of file DataReaderImpl.cpp.

References DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::RequestedIncompatibleQosStatus::total_count_change.

01105 {
01106 
01107   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01108       this->publication_handle_lock_);
01109 
01110   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
01111       false);
01112   status = requested_incompatible_qos_status_;
01113   requested_incompatible_qos_status_.total_count_change = 0;
01114 
01115   return DDS::RETCODE_OK;
01116 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_lost_status ( DDS::SampleLostStatus status  )  [virtual]

Definition at line 1135 of file DataReaderImpl.cpp.

References DDS::RETCODE_OK, DDS::SAMPLE_LOST_STATUS, sample_lost_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleLostStatus::total_count_change.

01137 {
01138   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01139 
01140   set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false);
01141   status = sample_lost_status_;
01142   sample_lost_status_.total_count_change = 0;
01143   return DDS::RETCODE_OK;
01144 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_rejected_status ( DDS::SampleRejectedStatus status  )  [virtual]

Definition at line 1050 of file DataReaderImpl.cpp.

References DDS::RETCODE_OK, DDS::SAMPLE_REJECTED_STATUS, sample_rejected_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleRejectedStatus::total_count_change.

01052 {
01053   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
01054 
01055   set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false);
01056   status = sample_rejected_status_;
01057   sample_rejected_status_.total_count_change = 0;
01058   return DDS::RETCODE_OK;
01059 }

DDS::Subscriber_ptr OpenDDS::DCPS::DataReaderImpl::get_subscriber (  )  [virtual]

Definition at line 1044 of file DataReaderImpl.cpp.

References subscriber_servant_.

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::DRMonitorImpl::report().

01045 {
01046   return DDS::Subscriber::_duplicate(subscriber_servant_);
01047 }

SubscriberImpl * OpenDDS::DCPS::DataReaderImpl::get_subscriber_servant (  )  [protected]

Definition at line 1811 of file DataReaderImpl.cpp.

References subscriber_servant_.

Referenced by post_read_or_take().

01812 {
01813   return subscriber_servant_;
01814 }

RepoId OpenDDS::DCPS::DataReaderImpl::get_subscription_id (  )  const

Definition at line 1816 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WriterInfoListener::subscription_id_.

Referenced by OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i(), OpenDDS::Federator::ManagerImpl::initialize(), OpenDDS::DCPS::DRPeriodicMonitorImpl::report(), and OpenDDS::DCPS::DRMonitorImpl::report().

01817 {
01818   return subscription_id_;
01819 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_subscription_matched_status ( DDS::SubscriptionMatchedStatus status  )  [virtual]

Definition at line 1119 of file DataReaderImpl.cpp.

References DDS::SubscriptionMatchedStatus::current_count_change, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, and DDS::SubscriptionMatchedStatus::total_count_change.

01121 {
01122 
01123   ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(
01124       this->publication_handle_lock_);
01125 
01126   set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false);
01127   status = subscription_match_status_;
01128   subscription_match_status_.total_count_change = 0;
01129   subscription_match_status_.current_count_change = 0;
01130 
01131   return DDS::RETCODE_OK;
01132 }

OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_topic_id (  ) 

Definition at line 2954 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TopicImpl::get_id(), and topic_servant_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02955 {
02956   return this->topic_servant_->get_id();
02957 }

char * OpenDDS::DCPS::DataReaderImpl::get_topic_name (  )  const

Definition at line 1822 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TopicDescriptionImpl::get_name(), and topic_servant_.

Referenced by get_next_handle().

01823 {
01824   return topic_servant_->get_name();
01825 }

DDS::TopicDescription_ptr OpenDDS::DCPS::DataReaderImpl::get_topicdescription (  )  [virtual]

Definition at line 1033 of file DataReaderImpl.cpp.

References get_cf_topic(), and topic_desc_.

Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().

01034 {
01035 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01036   DDS::ContentFilteredTopic_ptr cft = this->get_cf_topic();
01037   if (cft) {
01038     return cft; // get_cf_topic has already _duplicated()
01039   }
01040 #endif
01041   return DDS::TopicDescription::_duplicate(topic_desc_.in());
01042 }

void OpenDDS::DCPS::DataReaderImpl::get_writer_states ( WriterStatePairVec &  writer_states  ) 

Definition at line 2978 of file DataReaderImpl.cpp.

References writers_.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

02979 {
02980   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02981       read_guard,
02982       this->writers_lock_);
02983   for (WriterMapType::iterator iter = writers_.begin();
02984       iter != writers_.end();
02985       ++iter) {
02986     writer_states.push_back(WriterStatePair(iter->first,
02987         iter->second->get_state()));
02988   }
02989 }

bool OpenDDS::DCPS::DataReaderImpl::has_readcondition ( DDS::ReadCondition_ptr  a_condition  )  [protected]

Definition at line 916 of file DataReaderImpl.cpp.

References read_conditions_.

00917 {
00918   //sample lock already held
00919   DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
00920   return read_conditions_.find(rc) != read_conditions_.end();
00921 }

bool OpenDDS::DCPS::DataReaderImpl::have_instance_states ( DDS::InstanceStateMask  instance_states  )  const

Definition at line 1870 of file DataReaderImpl.cpp.

References instances_.

01872 {
01873   //!!!caller should have acquired sample_lock_
01874   /// @TODO: determine correct failed lock return value.
01875   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01876 
01877   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01878       iter != instances_.end();
01879       ++iter) {
01880     SubscriptionInstance *ptr = iter->second;
01881 
01882     if (ptr->instance_state_.instance_state() & instance_states) {
01883       return true;
01884     }
01885   }
01886 
01887   return false;
01888 }

bool OpenDDS::DCPS::DataReaderImpl::have_sample_states ( DDS::SampleStateMask  sample_states  )  const

Definition at line 1827 of file DataReaderImpl.cpp.

References instances_.

01829 {
01830   //!!!caller should have acquired sample_lock_
01831   /// @TODO: determine correct failed lock return value.
01832   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
01833 
01834   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01835       iter != instances_.end();
01836       ++iter) {
01837     SubscriptionInstance *ptr = iter->second;
01838 
01839     for (ReceivedDataElement *item = ptr->rcvd_samples_.head_;
01840         item != 0; item = item->next_data_sample_) {
01841       if (item->sample_state_ & sample_states) {
01842         return true;
01843       }
01844     }
01845   }
01846 
01847   return false;
01848 }

bool OpenDDS::DCPS::DataReaderImpl::have_view_states ( DDS::ViewStateMask  view_states  )  const

Definition at line 1851 of file DataReaderImpl.cpp.

References instances_.

01852 {
01853   //!!!caller should have acquired sample_lock_
01854   /// @TODO: determine correct failed lock return value.
01855   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false);
01856 
01857   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01858       iter != instances_.end();
01859       ++iter) {
01860     SubscriptionInstance *ptr = iter->second;
01861 
01862     if (ptr->instance_state_.view_state() & view_states) {
01863       return true;
01864     }
01865   }
01866 
01867   return false;
01868 }

void OpenDDS::DCPS::DataReaderImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 829 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.

00830 {
00831   topic_servant_->inconsistent_topic();
00832 }

void OpenDDS::DCPS::DataReaderImpl::init ( TopicDescriptionImpl a_topic_desc,
const DDS::DataReaderQos qos,
DDS::DataReaderListener_ptr  a_listener,
const DDS::StatusMask mask,
DomainParticipantImpl participant,
SubscriberImpl subscriber,
DDS::DataReader_ptr  dr_objref 
)

Definition at line 227 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, domain_id_, dr_local_objref_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), initialized_, is_bit_, is_exclusive_ownership_, listener_, listener_mask_, owner_manager_, OpenDDS::DCPS::DomainParticipantImpl::ownership_manager(), participant_servant_, qos_, DDS::RETCODE_OK, subscriber_servant_, topic_desc_, and topic_servant_.

Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::create_bit_dr(), OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00235 {
00236   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00237   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00238     topic_servant_ = a_topic;
00239     topic_servant_->_add_ref();
00240 
00241     topic_servant_->add_entity_ref();
00242   }
00243 
00244   CORBA::String_var topic_name = a_topic_desc->get_name();
00245 
00246 #if !defined (DDS_HAS_MINIMUM_BIT)
00247   is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00248       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00249       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00250       || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00251 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00252 
00253   qos_ = qos;
00254 
00255 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00256   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00257 #endif
00258 
00259   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
00260   listener_mask_ = mask;
00261 
00262   // Only store the participant pointer, since it is our "grand"
00263   // parent, we will exist as long as it does
00264   participant_servant_ = participant;
00265 
00266 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00267   if (is_exclusive_ownership_) {
00268     owner_manager_ = participant_servant_->ownership_manager ();
00269   }
00270 #endif
00271 
00272   domain_id_ = participant_servant_->get_domain_id();
00273 
00274   // Only store the subscriber pointer, since it is our parent, we
00275   // will exist as long as it does.
00276   subscriber_servant_ = subscriber;
00277   dr_local_objref_    = DDS::DataReader::_duplicate(dr_objref);
00278 
00279   if (this->subscriber_servant_->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
00280     ACE_DEBUG((LM_WARNING,
00281         ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
00282         ACE_TEXT("failed to get SubscriberQos\n")));
00283   }
00284 
00285   initialized_ = true;
00286 }

void OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update ( WriterInfo info,
const ACE_Time_Value &  when 
) [private]

Definition at line 2365 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count, instances_, liveliness_changed_status_, and OpenDDS::DCPS::WriterInfo::writer_id_.

Referenced by writer_became_dead(), and writer_removed().

02367 {
02368   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02369   for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
02370       next = iter; iter != instances_.end(); iter = next) {
02371     ++next;
02372     iter->second->instance_state_.writer_became_dead(
02373         info.writer_id_, liveliness_changed_status_.alive_count, when);
02374   }
02375 }

bool OpenDDS::DCPS::DataReaderImpl::is_bit (  )  const

Definition at line 2849 of file DataReaderImpl.cpp.

References is_bit_.

Referenced by get_next_handle().

02850 {
02851   return this->is_bit_;
02852 }

void OpenDDS::DCPS::DataReaderImpl::listener_add_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 680 of file DataReaderImpl.h.

00680 { EntityImpl::_add_ref(); }

DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::listener_for ( DDS::StatusKind  kind  ) 

This is used to retrieve the listener for a certain status change. If this datareader has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/subscriber.

Definition at line 1921 of file DataReaderImpl.cpp.

References listener_, OpenDDS::DCPS::SubscriberImpl::listener_for(), listener_mask_, and subscriber_servant_.

Referenced by coherent_changes_completed(), OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(), notify_liveliness_change(), remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().

01922 {
01923   // per 2.1.4.3.1 Listener Access to Plain Communication Status
01924   // use this entities factory if listener is mask not enabled
01925   // for this kind.
01926   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
01927     return subscriber_servant_->listener_for(kind);
01928 
01929   } else {
01930     return DDS::DataReaderListener::_duplicate(listener_.in());
01931   }
01932 }

void OpenDDS::DCPS::DataReaderImpl::listener_remove_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 681 of file DataReaderImpl.h.

00681 { EntityImpl::_remove_ref(); }

void OpenDDS::DCPS::DataReaderImpl::liveliness_lost (  ) 

virtual void OpenDDS::DCPS::DataReaderImpl::lookup_instance ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance *&  instance 
) [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by data_received().

virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::lookup_instance_generic ( const void *  data  )  [pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().

bool OpenDDS::DCPS::DataReaderImpl::lookup_instance_handles ( const WriterIdSeq ids,
DDS::InstanceHandleSeq hdls 
) [private]

Lookup the instance handles by the publication repo ids.

Definition at line 2679 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.

Referenced by notify_latency(), notify_subscription_disconnected(), and notify_subscription_lost().

02681 {
02682   if (DCPS_debug_level > 9) {
02683     CORBA::ULong const size = ids.length();
02684     const char* separator = "";
02685     OPENDDS_STRING guids;
02686 
02687     for (unsigned long i = 0; i < size; ++i) {
02688       guids += separator;
02689       guids += OPENDDS_STRING(GuidConverter(ids[i]));
02690       separator = ", ";
02691     }
02692 
02693     ACE_DEBUG((LM_DEBUG,
02694         ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
02695         ACE_TEXT("searching for handles for writer Ids: %C.\n"),
02696         guids.c_str()));
02697   }
02698 
02699   CORBA::ULong const num_wrts = ids.length();
02700   hdls.length(num_wrts);
02701 
02702   for (CORBA::ULong i = 0; i < num_wrts; ++i) {
02703     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02704   }
02705 
02706   return true;
02707 }

void OpenDDS::DCPS::DataReaderImpl::notify_connection_deleted ( const RepoId peerId  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2666 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportClient::on_notification_of_connection_deletion().

02667 {
02668   DBG_ENTRY_LVL("DataReaderImpl","notify_connection_deleted",6);
02669   on_notification_of_connection_deletion(peerId);
02670   // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02671   // is given to this DataWriter then narrow() fails.
02672   DataReaderListener_var the_listener = DataReaderListener::_narrow(this->listener_.in());
02673 
02674   if (!CORBA::is_nil(the_listener.in()))
02675     the_listener->on_connection_deleted(this->dr_local_objref_.in());
02676 }

void OpenDDS::DCPS::DataReaderImpl::notify_latency ( PublicationId  writer  ) 

Definition at line 2456 of file DataReaderImpl.cpp.

References budget_exceeded_status_, OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, lookup_instance_handles(), OpenDDS::DCPS::BudgetExceededStatus::total_count, and OpenDDS::DCPS::BudgetExceededStatus::total_count_change.

Referenced by process_latency().

02457 {
02458   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02459   // is given to this DataReader then narrow() fails.
02460   DataReaderListener_var listener
02461   = DataReaderListener::_narrow(this->listener_.in());
02462 
02463   if (!CORBA::is_nil(listener.in())) {
02464     WriterIdSeq writerIds;
02465     writerIds.length(1);
02466     writerIds[ 0] = writer;
02467 
02468     DDS::InstanceHandleSeq handles;
02469     this->lookup_instance_handles(writerIds, handles);
02470 
02471     if (handles.length() >= 1) {
02472       this->budget_exceeded_status_.last_instance_handle = handles[ 0];
02473 
02474     } else {
02475       this->budget_exceeded_status_.last_instance_handle = -1;
02476     }
02477 
02478     ++this->budget_exceeded_status_.total_count;
02479     ++this->budget_exceeded_status_.total_count_change;
02480 
02481     listener->on_budget_exceeded(
02482         this->dr_local_objref_.in(),
02483         this->budget_exceeded_status_);
02484 
02485     this->budget_exceeded_status_.total_count_change = 0;
02486   }
02487 }

void OpenDDS::DCPS::DataReaderImpl::notify_liveliness_change (  ) 

Definition at line 2878 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DCPS_debug_level, dr_local_objref_, listener_for(), listener_mask_, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_dds_string(), and writers_.

Referenced by writer_became_alive(), writer_became_dead(), and writer_removed().

02879 {
02880   // N.B. writers_lock_ should already be acquired when
02881   //      this method is called.
02882 
02883   DDS::DataReaderListener_var listener
02884   = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
02885 
02886   if (!CORBA::is_nil(listener.in())) {
02887     listener->on_liveliness_changed(dr_local_objref_.in(),
02888         liveliness_changed_status_);
02889 
02890     liveliness_changed_status_.alive_count_change = 0;
02891     liveliness_changed_status_.not_alive_count_change = 0;
02892   }
02893   notify_status_condition();
02894 
02895   if (DCPS_debug_level > 9) {
02896     OPENDDS_STRING output_str;
02897     output_str += "subscription ";
02898     output_str += OPENDDS_STRING(GuidConverter(subscription_id_));
02899     output_str += ", listener at: 0x";
02900     output_str += to_dds_string(this->listener_.in ());
02901 
02902     for (WriterMapType::iterator current = this->writers_.begin();
02903         current != this->writers_.end();
02904         ++current) {
02905       RepoId id = current->first;
02906       output_str += "\n\tNOTIFY: writer[ ";
02907       output_str += OPENDDS_STRING(GuidConverter(id));
02908       output_str += "] == ";
02909       output_str += current->second->get_state_str();
02910     }
02911 
02912     output_str + "\n";
02913     ACE_DEBUG((LM_DEBUG,
02914         ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
02915         ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
02916         ACE_TEXT("\tNOTIFY: %C\n"),
02917         listener.in (),
02918         listener_mask_,
02919         output_str.c_str()));
02920   }
02921 }

void OpenDDS::DCPS::DataReaderImpl::notify_read_conditions (  )  [protected]

Data has arrived into the cache, unblock waiting ReadConditions.

Definition at line 1799 of file DataReaderImpl.cpp.

References read_conditions_, and reverse_sample_lock_.

Referenced by data_received().

01800 {
01801   //sample lock is already held
01802   ReadConditionSet local_read_conditions = read_conditions_;
01803   ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01804 
01805   for (ReadConditionSet::iterator it = local_read_conditions.begin(),
01806       end = local_read_conditions.end(); it != end; ++it) {
01807     dynamic_cast<ConditionImpl*>(it->in())->signal_all();
01808   }
01809 }

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_disconnected ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2570 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, lookup_instance_handles(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

02571 {
02572   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
02573 
02574   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02575   // is given to this DataReader then narrow() fails.
02576   DataReaderListener_var the_listener
02577   = DataReaderListener::_narrow(this->listener_.in());
02578 
02579   if (!CORBA::is_nil(the_listener.in())) {
02580     SubscriptionLostStatus status;
02581 
02582     // Since this callback may come after remove_association which removes
02583     // the writer from id_to_handle map, we can ignore this error.
02584     this->lookup_instance_handles(pubids, status.publication_handles);
02585     the_listener->on_subscription_disconnected(this->dr_local_objref_.in(),
02586         status);
02587   }
02588 }

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost ( const DDS::InstanceHandleSeq handles  )  [private]

Definition at line 2618 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

02619 {
02620   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02621 
02622   if (!this->is_bit_) {
02623     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02624     // is given to this DataReader then narrow() fails.
02625     DataReaderListener_var the_listener
02626     = DataReaderListener::_narrow(this->listener_.in());
02627 
02628     if (!CORBA::is_nil(the_listener.in())) {
02629       SubscriptionLostStatus status;
02630 
02631       CORBA::ULong len = handles.length();
02632       status.publication_handles.length(len);
02633 
02634       for (CORBA::ULong i = 0; i < len; ++ i) {
02635         status.publication_handles[i] = handles[i];
02636       }
02637 
02638       the_listener->on_subscription_lost(this->dr_local_objref_.in(),
02639           status);
02640     }
02641   }
02642 }

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2645 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, lookup_instance_handles(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

Referenced by remove_associations_i().

02646 {
02647   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
02648 
02649   // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02650   // is given to this DataReader then narrow() fails.
02651   DataReaderListener_var the_listener
02652   = DataReaderListener::_narrow(this->listener_.in());
02653 
02654   if (!CORBA::is_nil(the_listener.in())) {
02655     SubscriptionLostStatus status;
02656 
02657     // Since this callback may come after remove_association which removes
02658     // the writer from id_to_handle map, we can ignore this error.
02659     this->lookup_instance_handles(pubids, status.publication_handles);
02660     the_listener->on_subscription_lost(this->dr_local_objref_.in(),
02661         status);
02662   }
02663 }

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_reconnected ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2591 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

02592 {
02593   DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
02594 
02595   if (!this->is_bit_) {
02596     // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
02597     // is given to this DataReader then narrow() fails.
02598     DataReaderListener_var the_listener
02599     = DataReaderListener::_narrow(this->listener_.in());
02600 
02601     if (!CORBA::is_nil(the_listener.in())) {
02602       SubscriptionLostStatus status;
02603 
02604       // If it's reconnected then the reader should be in id_to_handle map otherwise
02605       // log with an error.
02606       if (this->lookup_instance_handles(pubids, status.publication_handles) == false) {
02607         ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::notify_subscription_reconnected: "
02608             "lookup_instance_handles failed.\n"));
02609       }
02610 
02611       the_listener->on_subscription_reconnected(this->dr_local_objref_.in(),
02612           status);
02613     }
02614   }
02615 }

int OpenDDS::DCPS::DataReaderImpl::num_zero_copies (  )  [virtual]

This method is used for a precondition check of delete_datareader.

Returns:
the number of outstanding zero-copy samples loaned out.

Definition at line 2855 of file DataReaderImpl.cpp.

References instances_.

02856 {
02857   int loans = 0;
02858   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02859       guard,
02860       this->sample_lock_,
02861       1 /* assume we have loans */);
02862   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,1);
02863 
02864   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
02865       iter != instances_.end();
02866       ++iter) {
02867     SubscriptionInstance *ptr = iter->second;
02868 
02869     for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
02870         item != 0; item = item->next_data_sample_) {
02871       loans += item->zero_copy_cnt_.value();
02872     }
02873   }
02874 
02875   return loans;
02876 }

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
SubscriptionInstance  
)

Referenced by deliver_historic(), and resume_sample_processing().

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( PublicationId  ,
RcHandle< WriterInfo ,
GUID_tKeyLessThan   
) [private]

publications writing to this reader.

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( PublicationId  ,
WriterStats  ,
GUID_tKeyLessThan   
)

Type of collection of statistics for writers to this reader.

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET_CMP ( DDS::ReadCondition_var  ,
RCCompLess   
) [private]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( void *   ) 

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( WriterStatePair   ) 

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( DDS::InstanceHandle_t   ) 

Referenced by signal_liveliness().

OwnershipManager* OpenDDS::DCPS::DataReaderImpl::ownership_manager (  )  const [inline]

Definition at line 458 of file DataReaderImpl.h.

Referenced by OpenDDS::DCPS::InstanceState::~InstanceState().

00458 { return owner_manager_; }

EntityImpl * OpenDDS::DCPS::DataReaderImpl::parent (  )  const [virtual]

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 1785 of file DataReaderImpl.cpp.

References subscriber_servant_.

01786 {
01787   return this->subscriber_servant_;
01788 }

void OpenDDS::DCPS::DataReaderImpl::post_read_or_take (  )  [protected]

Definition at line 2923 of file DataReaderImpl.cpp.

References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, get_subscriber_servant(), and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().

Referenced by end_access().

02924 {
02925   set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02926   get_subscriber_servant()->set_status_changed_flag(
02927       DDS::DATA_ON_READERS_STATUS, false);
02928 }

void OpenDDS::DCPS::DataReaderImpl::prepare_to_delete (  )  [protected]

Definition at line 2530 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::send_final_acks(), OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().

02531 {
02532   this->set_deleted(true);
02533   this->stop_associating();
02534   this->send_final_acks();
02535 }

void OpenDDS::DCPS::DataReaderImpl::process_latency ( const ReceivedDataSample sample  ) 

Definition at line 2401 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::ReceivedDataSample::header_, notify_latency(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, statistics_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::time_value_to_duration().

Referenced by data_received().

02402 {
02403   StatsMapType::iterator location
02404   = this->statistics_.find(sample.header_.publication_id_);
02405 
02406   if (location != this->statistics_.end()) {
02407     // This starts as the current time.
02408     ACE_Time_Value  latency = ACE_OS::gettimeofday();
02409 
02410     // The time interval starts at the send end.
02411     DDS::Duration_t then = {
02412         sample.header_.source_timestamp_sec_,
02413         sample.header_.source_timestamp_nanosec_
02414     };
02415 
02416     // latency delay in ACE_Time_Value format.
02417     latency -= duration_to_time_value(then);
02418 
02419     if (this->statistics_enabled()) {
02420       location->second.add_stat(latency);
02421     }
02422 
02423     if (DCPS_debug_level > 9) {
02424       ACE_DEBUG((LM_DEBUG,
02425           ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02426           ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"),
02427           latency.sec(),
02428           latency.msec()));
02429     }
02430 
02431     // Check latency against the budget.
02432     if (time_value_to_duration(latency)
02433         > this->qos_.latency_budget.duration) {
02434       this->notify_latency(sample.header_.publication_id_);
02435     }
02436 
02437   } else if (DCPS_debug_level > 0) {
02438     /// NB: This message is generated contemporaneously with a similar
02439     ///     message from writer_activity().  That message is not marked
02440     ///     as an error, so we follow that lead and leave this as an
02441     ///     informational message, guarded by debug level.  This seems
02442     ///     to be due to late samples (samples delivered after an
02443     ///     association has been torn down).  We may want to promote this
02444     ///     to a warning if other conditions causing this symptom are
02445     ///     discovered.
02446     GuidConverter reader_converter(subscription_id_);
02447     GuidConverter writer_converter(sample.header_.publication_id_);
02448     ACE_DEBUG((LM_DEBUG,
02449         ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
02450         ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
02451         OPENDDS_STRING(reader_converter).c_str(),
02452         OPENDDS_STRING(writer_converter).c_str()));
02453   }
02454 }

virtual void OpenDDS::DCPS::DataReaderImpl::purge_data ( SubscriptionInstance instance  )  [protected, pure virtual]

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.

Referenced by release_instance().

ACE_INLINE unsigned int & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size (  ) 

Configure the size of the raw data collection buffer.

Definition at line 26 of file DataReaderImpl.inl.

References raw_latency_buffer_size_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00027 {
00028   return this->raw_latency_buffer_size_;
00029 }

ACE_INLINE OpenDDS::DCPS::DataCollector< double >::OnFull & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type (  ) 

Configure the type of the raw data collection buffer.

Definition at line 33 of file DataReaderImpl.inl.

References raw_latency_buffer_type_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00034 {
00035   return this->raw_latency_buffer_type_;
00036 }

ACE_INLINE const OpenDDS::DCPS::DataReaderImpl::StatsMapType & OpenDDS::DCPS::DataReaderImpl::raw_latency_statistics (  )  const

Expose the statistics container.

Definition at line 19 of file DataReaderImpl.inl.

References statistics_.

00020 {
00021   return this->statistics_;
00022 }

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_generic ( GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count 
) [pure virtual]

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [pure virtual]

Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_next_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  previous_instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [pure virtual]

Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().

void OpenDDS::DCPS::DataReaderImpl::register_for_writer ( const RepoId ,
const RepoId ,
const RepoId ,
const TransportLocatorSeq ,
DiscoveryListener  
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3367 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::register_for_writer().

03372 {
03373   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
03374 }

void OpenDDS::DCPS::DataReaderImpl::reject_coherent ( PublicationId writer_id,
RepoId publisher_id 
)

Definition at line 3087 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, instances_, OPENDDS_STRING, reset_coherent_info(), and sample_lock_.

Referenced by verify_coherent_changes_completion().

03089 {
03090   if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
03091     GuidConverter reader (this->subscription_id_);
03092     GuidConverter writer (writer_id);
03093     GuidConverter publisher (publisher_id);
03094     ACE_DEBUG((LM_DEBUG,
03095         ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
03096         ACE_TEXT(" reader %C writer %C publisher %C \n"),
03097         OPENDDS_STRING(reader).c_str(),
03098         OPENDDS_STRING(writer).c_str(),
03099         OPENDDS_STRING(publisher).c_str()));
03100   }
03101 
03102   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
03103   ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
03104 
03105   for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
03106       iter != this->instances_.end(); ++iter) {
03107     iter->second->rcvd_strategy_->reject_coherent(
03108         writer_id, publisher_id);
03109   }
03110   this->reset_coherent_info (writer_id, publisher_id);
03111 }

void OpenDDS::DCPS::DataReaderImpl::release_instance ( DDS::InstanceHandle_t  handle  ) 

Release the instance with the handle.

Definition at line 2102 of file DataReaderImpl.cpp.

References get_handle_instance(), instances_, monitor_, owner_manager_, purge_data(), release_instance_i(), OpenDDS::DCPS::OwnershipManager::remove_writers(), and OpenDDS::DCPS::Monitor::report().

Referenced by OpenDDS::DCPS::InstanceState::release().

02103 {
02104 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02105   if (this->is_exclusive_ownership_) {
02106     this->owner_manager_->remove_writers (handle);
02107   }
02108 #endif
02109 
02110   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
02111   SubscriptionInstance* instance = this->get_handle_instance(handle);
02112 
02113   if (instance == 0) {
02114     ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
02115         "could not find the instance by handle 0x%x\n", handle));
02116     return;
02117   }
02118 
02119   this->purge_data(instance);
02120 
02121   { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02122   this->instances_.erase(handle);
02123   }
02124   this->release_instance_i(handle);
02125   if (this->monitor_) {
02126     this->monitor_->report();
02127   }
02128 }

virtual void OpenDDS::DCPS::DataReaderImpl::release_instance_i ( DDS::InstanceHandle_t  handle  )  [protected, pure virtual]

Referenced by release_instance().

void OpenDDS::DCPS::DataReaderImpl::remove_all_associations (  ) 

Definition at line 744 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, publication_handle_lock_, remove_associations(), OpenDDS::DCPS::TransportClient::stop_associating(), and writers_.

00745 {
00746   DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
00747   // stop pending associations
00748   this->stop_associating();
00749 
00750   OpenDDS::DCPS::WriterIdSeq writers;
00751   int size;
00752 
00753   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00754 
00755   {
00756     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00757 
00758     size = static_cast<int>(writers_.size());
00759     writers.length(size);
00760 
00761     WriterMapType::iterator curr_writer = writers_.begin();
00762     WriterMapType::iterator end_writer = writers_.end();
00763 
00764     int i = 0;
00765 
00766     while (curr_writer != end_writer) {
00767       writers[i++] = curr_writer->first;
00768       ++curr_writer;
00769     }
00770   }
00771 
00772   try {
00773     CORBA::Boolean dont_notify_lost = 0;
00774 
00775     if (0 < size) {
00776       remove_associations(writers, dont_notify_lost);
00777     }
00778 
00779   } catch (const CORBA::Exception&) {
00780       ACE_DEBUG((LM_WARNING,
00781                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00782                  ACE_TEXT("caught exception from remove_associations.\n")));
00783   }
00784 }

void OpenDDS::DCPS::DataReaderImpl::remove_associations ( const WriterIdSeq writers,
bool  callback 
) [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 531 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, OPENDDS_STRING, OpenDDS::DCPS::push_back(), remove_association_sweeper_, remove_associations_i(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, and writers_.

Referenced by remove_all_associations(), and remove_or_reschedule().

00533 {
00534   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
00535 
00536   if (writers.length() == 0) {
00537     return;
00538   }
00539 
00540   if (DCPS_debug_level >= 1) {
00541     GuidConverter reader_converter(subscription_id_);
00542     GuidConverter writer_converter(writers[0]);
00543     ACE_DEBUG((LM_DEBUG,
00544         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
00545         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00546         is_bit_,
00547         OPENDDS_STRING(reader_converter).c_str(),
00548         OPENDDS_STRING(writer_converter).c_str(),
00549         writers.length()));
00550   }
00551   if (!this->entity_deleted_.value()) {
00552     // stop pending associations for these writer ids
00553     this->stop_associating(writers.get_buffer(), writers.length());
00554 
00555     // writers which are considered non-active and can
00556     // be removed immediately
00557     WriterIdSeq non_active_writers;
00558     {
00559       CORBA::ULong wr_len = writers.length();
00560       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00561 
00562       for (CORBA::ULong i = 0; i < wr_len; i++) {
00563         PublicationId writer_id = writers[i];
00564 
00565         WriterMapType::iterator it = this->writers_.find(writer_id);
00566         if (it != this->writers_.end() &&
00567             it->second->active(TheServiceParticipant->pending_timeout())) {
00568           remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00569         } else {
00570           push_back(non_active_writers, writer_id);
00571         }
00572       }
00573     }
00574     remove_associations_i(non_active_writers, notify_lost);
00575   } else {
00576     remove_associations_i(writers, notify_lost);
00577   }
00578 }

void OpenDDS::DCPS::DataReaderImpl::remove_associations_i ( const WriterIdSeq writers,
bool  callback 
) [protected, virtual]

Definition at line 601 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dr_local_objref_, end_historic_sweeper_, id_to_handle_map_, is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), notify_subscription_lost(), OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, DDS::SubscriptionMatchedStatus::total_count_change, and writers_.

Referenced by remove_associations(), and remove_or_reschedule().

00603 {
00604   DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
00605 
00606   if (writers.length() == 0) {
00607     return;
00608   }
00609 
00610   if (DCPS_debug_level >= 1) {
00611     GuidConverter reader_converter(subscription_id_);
00612     GuidConverter writer_converter(writers[0]);
00613     ACE_DEBUG((LM_DEBUG,
00614         ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00615         ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00616         is_bit_,
00617         OPENDDS_STRING(reader_converter).c_str(),
00618         OPENDDS_STRING(writer_converter).c_str(),
00619         writers.length()));
00620   }
00621   DDS::InstanceHandleSeq handles;
00622 
00623   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00624 
00625   // This is used to hold the list of writers which were actually
00626   // removed, which is a proper subset of the writers which were
00627   // requested to be removed.
00628   WriterIdSeq updated_writers;
00629 
00630   CORBA::ULong wr_len;
00631 
00632   //Remove the writers from writer list. If the supplied writer
00633   //is not in the cached writers list then it is already removed.
00634   //We just need remove the writers in the list that have not been
00635   //removed.
00636   {
00637     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00638 
00639     wr_len = writers.length();
00640 
00641     for (CORBA::ULong i = 0; i < wr_len; i++) {
00642       PublicationId writer_id = writers[i];
00643 
00644       WriterMapType::iterator it = this->writers_.find(writer_id);
00645 
00646       if (it != this->writers_.end()) {
00647         it->second->removed();
00648         end_historic_sweeper_->cancel_timer(it->second);
00649         remove_association_sweeper_->cancel_timer(it->second);
00650       }
00651 
00652       if (this->writers_.erase(writer_id) == 0) {
00653         if (DCPS_debug_level >= 1) {
00654           GuidConverter converter(writer_id);
00655           ACE_DEBUG((LM_DEBUG,
00656               ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00657               ACE_TEXT("the writer local %C was already removed.\n"),
00658               OPENDDS_STRING(converter).c_str()));
00659         }
00660 
00661       } else {
00662         push_back(updated_writers, writer_id);
00663       }
00664     }
00665   }
00666 
00667   wr_len = updated_writers.length();
00668 
00669   // Return now if the supplied writers have been removed already.
00670   if (wr_len == 0) {
00671     return;
00672   }
00673 
00674   if (!is_bit_) {
00675     // The writer should be in the id_to_handle map at this time.  Note
00676     // it if it not there.
00677     if (this->lookup_instance_handles(updated_writers, handles) == false) {
00678       if (DCPS_debug_level > 4) {
00679         ACE_DEBUG((LM_DEBUG,
00680             ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
00681             ACE_TEXT("lookup_instance_handles failed.\n")));
00682       }
00683     }
00684 
00685     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00686       id_to_handle_map_.erase(updated_writers[i]);
00687     }
00688   }
00689 
00690   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00691     {
00692       this->disassociate(updated_writers[i]);
00693     }
00694   }
00695 
00696   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00697   if (!this->is_bit_) {
00698     // Derive the change in the number of publications writing to this reader.
00699     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00700     this->subscription_match_status_.current_count_change
00701     = matchedPublications - this->subscription_match_status_.current_count;
00702 
00703     // Only process status if the number of publications has changed.
00704     if (this->subscription_match_status_.current_count_change != 0) {
00705       this->subscription_match_status_.current_count = matchedPublications;
00706 
00707       /// Section 7.1.4.1: total_count will not decrement.
00708 
00709       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00710       this->subscription_match_status_.last_publication_handle
00711       = handles[ wr_len - 1];
00712 
00713       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00714 
00715       DDS::DataReaderListener_var listener
00716       = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00717 
00718       if (!CORBA::is_nil(listener.in())) {
00719         listener->on_subscription_matched(
00720             dr_local_objref_.in(),
00721             this->subscription_match_status_);
00722 
00723         // Client will look at it so next time it looks the change should be 0
00724         this->subscription_match_status_.total_count_change = 0;
00725         this->subscription_match_status_.current_count_change = 0;
00726       }
00727       notify_status_condition();
00728     }
00729   }
00730 
00731   // If this remove_association is invoked when the InfoRepo
00732   // detects a lost writer then make a callback to notify
00733   // subscription lost.
00734   if (notify_lost) {
00735     this->notify_subscription_lost(handles);
00736   }
00737 
00738   if (this->monitor_) {
00739     this->monitor_->report();
00740   }
00741 }

void OpenDDS::DCPS::DataReaderImpl::remove_or_reschedule ( const PublicationId pub_id  )  [protected]

Definition at line 581 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::push_back(), remove_associations(), remove_associations_i(), and writers_.

00582 {
00583   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00584   WriterMapType::iterator where = writers_.find(pub_id);
00585   if (writers_.end() != where) {
00586     WriterInfo& info = *where->second;
00587     WriterIdSeq writers;
00588     push_back(writers, pub_id);
00589     bool notify = info.notify_lost_;
00590     if (info.removal_deadline_ < ACE_OS::gettimeofday()) {
00591       write_guard.release();
00592       remove_associations_i(writers, notify);
00593     } else {
00594       write_guard.release();
00595       remove_associations(writers, notify);
00596     }
00597   }
00598 }

void OpenDDS::DCPS::DataReaderImpl::reschedule_deadline (  ) 

Definition at line 2930 of file DataReaderImpl.cpp.

References instances_.

Referenced by OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline().

02931 {
02932   if (this->watchdog_) {
02933     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
02934     for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
02935         iter != this->instances_.end();
02936         ++iter) {
02937       if (iter->second->deadline_timer_id_ != -1) {
02938         if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
02939           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"),
02940               ACE_TEXT("reset_timer_interval")));
02941         }
02942       }
02943     }
02944   }
02945 }

void OpenDDS::DCPS::DataReaderImpl::reset_coherent_info ( const PublicationId writer_id,
const RepoId publisher_id 
)

Definition at line 3114 of file DataReaderImpl.cpp.

References writers_.

Referenced by reject_coherent().

03116 {
03117   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
03118 
03119   WriterMapType::iterator itEnd = this->writers_.end();
03120   for (WriterMapType::iterator it = this->writers_.begin();
03121       it != itEnd; ++it) {
03122     if (it->second->writer_id_ == writer_id
03123         && it->second->publisher_id_ == publisher_id) {
03124       it->second->reset_coherent_info();
03125     }
03126   }
03127 }

void OpenDDS::DCPS::DataReaderImpl::reset_latency_stats (  )  [virtual]

Definition at line 2507 of file DataReaderImpl.cpp.

References statistics_.

02508 {
02509   for (StatsMapType::iterator current = this->statistics_.begin();
02510       current != this->statistics_.end();
02511       ++current) {
02512     current->second.reset_stats();
02513   }
02514 }

void OpenDDS::DCPS::DataReaderImpl::reset_ownership ( ::DDS::InstanceHandle_t  instance  ) 

Definition at line 3282 of file DataReaderImpl.cpp.

References writers_.

Referenced by OpenDDS::DCPS::InstanceState::reset_ownership().

03283 {
03284   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03285   for (WriterMapType::iterator iter = writers_.begin();
03286       iter != writers_.end();
03287       ++iter) {
03288     iter->second->set_owner_evaluated(instance, false);
03289   }
03290 }

void OpenDDS::DCPS::DataReaderImpl::resume_sample_processing ( const PublicationId pub_id  )  [private]

when done handling historic samples, resume

Definition at line 3293 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), deliver_historic(), end_historic_sweeper_, OPENDDS_MAP(), and writers_.

Referenced by add_link(), data_received(), and OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::handle_timeout().

03294 {
03295   OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
03296   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
03297   WriterMapType::iterator where = writers_.find(pub_id);
03298   if (writers_.end() != where) {
03299     WriterInfo& info = *where->second;
03300     // Stop filtering these
03301     if (info.waiting_for_end_historic_samples_) {
03302       end_historic_sweeper_->cancel_timer(where->second);
03303       if (!info.historic_samples_.empty()) {
03304         info.last_historic_seq_ = info.historic_samples_.rbegin()->first;
03305       }
03306       to_deliver.swap(info.historic_samples_);
03307       write_guard.release();
03308       deliver_historic(to_deliver);
03309     }
03310   }
03311 }

void OpenDDS::DCPS::DataReaderImpl::sample_info ( DDS::SampleInfo sample_info,
const ReceivedDataElement ptr 
) [protected]

Definition at line 1934 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, and OpenDDS::DCPS::ReceivedDataElement::sequence_.

01936 {
01937 
01938   sample_info.sample_rank = 0;
01939 
01940   // generation_rank =
01941   //    (MRSIC.disposed_generation_count +
01942   //     MRSIC.no_writers_generation_count)
01943   //  - (S.disposed_generation_count +
01944   //     S.no_writers_generation_count)
01945   //
01946   sample_info.generation_rank =
01947       (sample_info.disposed_generation_count +
01948           sample_info.no_writers_generation_count) -
01949           sample_info.generation_rank;
01950 
01951   // absolute_generation_rank =
01952   //     (MRS.disposed_generation_count +
01953   //      MRS.no_writers_generation_count)
01954   //   - (S.disposed_generation_count +
01955   //      S.no_writers_generation_count)
01956   //
01957   sample_info.absolute_generation_rank =
01958       (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
01959           static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
01960           sample_info.absolute_generation_rank;
01961 
01962   sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
01963 }

virtual void OpenDDS::DCPS::DataReaderImpl::set_instance_state ( DDS::InstanceHandle_t  instance,
DDS::InstanceStateKind  state 
) [pure virtual]

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_listener ( DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 1018 of file DataReaderImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

01021 {
01022   listener_mask_ = mask;
01023   //note: OK to duplicate  a nil object ref
01024   listener_ = DDS::DataReaderListener::_duplicate(a_listener);
01025   return DDS::RETCODE_OK;
01026 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_qos ( const DDS::DataReaderQos qos  )  [virtual]

Definition at line 941 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::Watchdog::cancel_all(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::ReactorInterceptor::destroy(), domain_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::SubscriberImpl::get_qos(), last_deadline_missed_total_count_, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, participant_servant_, qos_, requested_deadline_missed_status_, RequestedDeadlineWatchdog, OpenDDS::DCPS::Watchdog::reset_interval(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, subscriber_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and watchdog_.

00943 {
00944 
00945   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00946   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00947   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00948 
00949   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00950     if (qos_ == qos)
00951       return DDS::RETCODE_OK;
00952 
00953     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00954       return DDS::RETCODE_IMMUTABLE_POLICY;
00955 
00956     } else {
00957       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00958       DDS::SubscriberQos subscriberQos;
00959       this->subscriber_servant_->get_qos(subscriberQos);
00960       const bool status =
00961           disco->update_subscription_qos(
00962               this->participant_servant_->get_domain_id(),
00963               this->participant_servant_->get_id(),
00964               this->subscription_id_,
00965               qos,
00966               subscriberQos);
00967       if (!status) {
00968         ACE_ERROR_RETURN((LM_ERROR,
00969             ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
00970             ACE_TEXT("qos not updated. \n")),
00971             DDS::RETCODE_ERROR);
00972       }
00973     }
00974 
00975     // Reset the deadline timer if the period has changed.
00976     if (qos_.deadline.period.sec != qos.deadline.period.sec
00977         || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00978       if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00979           && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00980         this->watchdog_ =
00981             new RequestedDeadlineWatchdog(
00982                 this->sample_lock_,
00983                 qos.deadline,
00984                 this,
00985                 this->dr_local_objref_.in(),
00986                 this->requested_deadline_missed_status_,
00987                 this->last_deadline_missed_total_count_);
00988 
00989       } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00990           && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00991         this->watchdog_->cancel_all();
00992         this->watchdog_->destroy();
00993         this->watchdog_ = 0;
00994 
00995       } else {
00996         this->watchdog_->reset_interval(
00997             duration_to_time_value(qos.deadline.period));
00998       }
00999     }
01000 
01001     qos_ = qos;
01002 
01003     return DDS::RETCODE_OK;
01004 
01005   } else {
01006     return DDS::RETCODE_INCONSISTENT_POLICY;
01007   }
01008 }

void OpenDDS::DCPS::DataReaderImpl::set_sample_lost_status ( const DDS::SampleLostStatus status  )  [protected]

Definition at line 2378 of file DataReaderImpl.cpp.

References sample_lost_status_.

02380 {
02381   //!!!caller should have acquired sample_lock_
02382   sample_lost_status_ = status;
02383 }

void OpenDDS::DCPS::DataReaderImpl::set_sample_rejected_status ( const DDS::SampleRejectedStatus status  )  [protected]

Definition at line 2386 of file DataReaderImpl.cpp.

References sample_rejected_status_.

02388 {
02389   //!!!caller should have acquired sample_lock_
02390   sample_rejected_status_ = status;
02391 }

void OpenDDS::DCPS::DataReaderImpl::set_subscriber_qos ( const DDS::SubscriberQos qos  ) 

Definition at line 3246 of file DataReaderImpl.cpp.

References subqos_.

03248 {
03249   this->subqos_ = qos;
03250 }

void OpenDDS::DCPS::DataReaderImpl::signal_liveliness ( const RepoId remote_participant  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 835 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::GUID_t::guidPrefix, instances_, OPENDDS_VECTOR(), and writers_.

00836 {
00837   RepoId prefix = remote_participant;
00838   prefix.entityId = EntityId_t();
00839 
00840   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00841 
00842   typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair;
00843   typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
00844   WriterSet writers;
00845 
00846   {
00847     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00848     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00849            limit = writers_.end();
00850          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00851          ++pos) {
00852       writers.push_back(std::make_pair(pos->first, pos->second));
00853     }
00854   }
00855 
00856   ACE_Time_Value when = ACE_OS::gettimeofday();
00857   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00858        pos != limit;
00859        ++pos) {
00860     pos->second->received_activity(when);
00861   }
00862 
00863   if (!writers.empty()) {
00864     ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
00865     for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00866          pos != limit;
00867          ++pos) {
00868       for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
00869            iter != instances_.end();
00870            ++iter) {
00871         SubscriptionInstance *ptr = iter->second;
00872         ptr->instance_state_.lively(pos->first);
00873       }
00874     }
00875   }
00876 }

void OpenDDS::DCPS::DataReaderImpl::statistics_enabled ( CORBA::Boolean  statistics_enabled  )  [virtual]

Definition at line 2523 of file DataReaderImpl.cpp.

References statistics_enabled_.

02525 {
02526   this->statistics_enabled_ = statistics_enabled;
02527 }

CORBA::Boolean OpenDDS::DCPS::DataReaderImpl::statistics_enabled (  )  [virtual]

Definition at line 2517 of file DataReaderImpl.cpp.

References statistics_enabled_.

02518 {
02519   return this->statistics_enabled_;
02520 }

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::take ( AbstractSamples samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [pure virtual]

CORBA::Long OpenDDS::DCPS::DataReaderImpl::total_samples (  )  const [protected]

Definition at line 1965 of file DataReaderImpl.cpp.

References instances_.

01966 {
01967   //!!!caller should have acquired sample_lock_
01968   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
01969 
01970   CORBA::Long count(0);
01971 
01972   for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
01973       iter != instances_.end();
01974       ++iter) {
01975     SubscriptionInstance *ptr = iter->second;
01976 
01977     count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_);
01978   }
01979 
01980   return count;
01981 }

void OpenDDS::DCPS::DataReaderImpl::transport_assoc_done ( int  flags,
const RepoId remote_id 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 412 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, OpenDDS::DCPS::TransportClient::ASSOC_OK, OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dr_local_objref_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, liveliness_timer_, monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_handle_lock_, OpenDDS::DCPS::Monitor::report(), sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, and writers_.

00413 {
00414   if (!(flags & ASSOC_OK)) {
00415     if (DCPS_debug_level) {
00416       const GuidConverter conv(remote_id);
00417       ACE_DEBUG((LM_ERROR,
00418           ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00419           ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00420           OPENDDS_STRING(conv).c_str()));
00421     }
00422     return;
00423   }
00424 
00425   const bool active = flags & ASSOC_ACTIVE;
00426   {
00427 
00428     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00429 
00430     // LIVELINESS policy timers are managed here.
00431     if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00432       if (DCPS_debug_level >= 5) {
00433         GuidConverter converter(subscription_id_);
00434         ACE_DEBUG((LM_DEBUG,
00435             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00436             ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00437             OPENDDS_STRING(converter).c_str()));
00438       }
00439       // this call will start the timer if it is not already set
00440       liveliness_timer_->check_liveliness();
00441     }
00442   }
00443   // We no longer hold the publication_handle_lock_.
00444 
00445   if (!is_bit_) {
00446 
00447     DDS::InstanceHandle_t handle = participant_servant_->id_to_handle(remote_id);
00448 
00449     // We acquire the publication_handle_lock_ for the remainder of our
00450     // processing.
00451     {
00452       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
00453 
00454       // This insertion is idempotent.
00455       id_to_handle_map_.insert(
00456           RepoIdToHandleMap::value_type(remote_id, handle));
00457 
00458       if (DCPS_debug_level > 4) {
00459         GuidConverter converter(remote_id);
00460         ACE_DEBUG((LM_DEBUG,
00461             ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
00462             ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00463             OPENDDS_STRING(converter).c_str(),
00464             handle));
00465       }
00466 
00467       // We need to adjust these after the insertions have all completed
00468       // since insertions are not guaranteed to increase the number of
00469       // currently matched publications.
00470       const int matchedPublications = static_cast<int>(id_to_handle_map_.size());
00471       subscription_match_status_.current_count_change =
00472           matchedPublications - subscription_match_status_.current_count;
00473       subscription_match_status_.current_count = matchedPublications;
00474 
00475       ++subscription_match_status_.total_count;
00476       ++subscription_match_status_.total_count_change;
00477 
00478       subscription_match_status_.last_publication_handle = handle;
00479 
00480       set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00481 
00482       DDS::DataReaderListener_var listener =
00483           listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00484 
00485       if (!CORBA::is_nil(listener)) {
00486         listener->on_subscription_matched(dr_local_objref_,
00487             subscription_match_status_);
00488 
00489         // TBD - why does the spec say to change this but not change
00490         //       the ChangeFlagStatus after a listener call?
00491 
00492         // Client will look at it so next time it looks the change should be 0
00493         subscription_match_status_.total_count_change = 0;
00494         subscription_match_status_.current_count_change = 0;
00495       }
00496 
00497       notify_status_condition();
00498     }
00499 
00500     {
00501       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00502       ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00503 
00504       if(!writers_.count(remote_id)){
00505         return;
00506       }
00507       writers_[remote_id]->handle_ = handle;
00508     }
00509   }
00510 
00511   if (!active) {
00512     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00513 
00514     disco->association_complete(domain_id_, participant_servant_->get_id(),
00515         subscription_id_, remote_id);
00516   }
00517 
00518   if (monitor_) {
00519     monitor_->report();
00520   }
00521 }

void OpenDDS::DCPS::DataReaderImpl::unregister_for_writer ( const RepoId ,
const RepoId ,
const RepoId  
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3377 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::unregister_for_writer().

03380 {
03381   TransportClient::unregister_for_writer(participant, readerid, writerid);
03382 }

void OpenDDS::DCPS::DataReaderImpl::update_incompatible_qos ( const IncompatibleQosStatus status  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 787 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, dr_local_objref_, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::RequestedIncompatibleQosStatus::total_count, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.

00788 {
00789   DDS::DataReaderListener_var listener =
00790       listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
00791 
00792   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00793       guard,
00794       this->publication_handle_lock_);
00795 
00796 
00797   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00798     // This test should make the method idempotent.
00799     return;
00800   }
00801 
00802   set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00803       true);
00804 
00805   // copy status and increment change
00806   requested_incompatible_qos_status_.total_count = status.total_count;
00807   requested_incompatible_qos_status_.total_count_change +=
00808       status.count_since_last_send;
00809   requested_incompatible_qos_status_.last_policy_id =
00810       status.last_policy_id;
00811   requested_incompatible_qos_status_.policies = status.policies;
00812 
00813   if (!CORBA::is_nil(listener.in())) {
00814     listener->on_requested_incompatible_qos(dr_local_objref_.in(),
00815         requested_incompatible_qos_status_);
00816 
00817     // TBD - why does the spec say to change total_count_change but not
00818     // change the ChangeFlagStatus after a listener call?
00819 
00820     // client just looked at it so next time it looks the
00821     // change should be 0
00822     requested_incompatible_qos_status_.total_count_change = 0;
00823   }
00824 
00825   notify_status_condition();
00826 }

void OpenDDS::DCPS::DataReaderImpl::update_ownership_strength ( const PublicationId pub_id,
const CORBA::Long &  ownership_strength 
)

Definition at line 2993 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, and writers_.

02995 {
02996   ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02997       read_guard,
02998       this->writers_lock_);
02999   for (WriterMapType::iterator iter = writers_.begin();
03000       iter != writers_.end();
03001       ++iter) {
03002     if (iter->second->writer_id_ == pub_id) {
03003       if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) {
03004         if (DCPS_debug_level >= 1) {
03005           GuidConverter reader_converter(this->subscription_id_);
03006           GuidConverter writer_converter(pub_id);
03007           ACE_DEBUG((LM_DEBUG,
03008               ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
03009               ACE_TEXT("local %C update remote %C strength from %d to %d \n"),
03010               OPENDDS_STRING(reader_converter).c_str(),
03011               OPENDDS_STRING(writer_converter).c_str(),
03012               iter->second->writer_qos_.ownership_strength, ownership_strength));
03013         }
03014         iter->second->writer_qos_.ownership_strength.value = ownership_strength;
03015         iter->second->clear_owner_evaluated ();
03016       }
03017       break;
03018     }
03019   }
03020 }

void OpenDDS::DCPS::DataReaderImpl::update_subscription_params ( const DDS::StringSeq params  )  const

Definition at line 3271 of file DataReaderImpl.cpp.

References domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), participant_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and TheServiceParticipant.

03272 {
03273   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
03274   disco->update_subscription_params(participant_servant_->get_domain_id(),
03275       participant_servant_->get_id(),
03276       subscription_id_,
03277       params);
03278 }

bool OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion ( WriterInfo writer  )  [private]

Definition at line 3024 of file DataReaderImpl.cpp.

References accept_coherent(), OpenDDS::DCPS::SubscriberImpl::coherent_change_received(), OpenDDS::DCPS::WriterInfo::coherent_change_received(), coherent_changes_completed(), OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::WriterInfo::group_coherent_, DDS::INSTANCE_PRESENTATION_QOS, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::WriterInfo::publisher_id_, reject_coherent(), OpenDDS::DCPS::REJECTED, OpenDDS::DCPS::WriterInfo::reset_coherent_info(), state, subscriber_servant_, and OpenDDS::DCPS::WriterInfo::writer_id_.

Referenced by data_received().

03025 {
03026   if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS
03027       || ! this->subqos_.presentation.coherent_access) {
03028     this->accept_coherent (writer->writer_id_, writer->publisher_id_);
03029     this->coherent_changes_completed (this);
03030     return true;
03031   }
03032 
03033   // verify current coherent changes from single writer
03034   Coherent_State state = writer->coherent_change_received();
03035   if (writer->group_coherent_) { // GROUP coherent
03036     if (state != NOT_COMPLETED_YET) {
03037       // verify if all readers received complete coherent changes in a group.
03038       this->subscriber_servant_->coherent_change_received (
03039           writer->publisher_id_, this, state);
03040     }
03041   }
03042   else {  // TOPIC coherent
03043     if (state == COMPLETED) {
03044       this->accept_coherent (writer->writer_id_, writer->publisher_id_);
03045     }
03046     else if (state == REJECTED) {
03047       this->reject_coherent (writer->writer_id_, writer->publisher_id_);
03048     }
03049     else {// NOT_COMPLETED
03050       return false;
03051     }
03052 
03053     // decision made: either COMPLETED or REJECTED
03054     writer->reset_coherent_info ();
03055   }
03056 
03057   return state == COMPLETED;
03058 }

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::wait_for_historical_data ( const DDS::Duration_t max_wait  )  [virtual]

Definition at line 1147 of file DataReaderImpl.cpp.

01149 {
01150   // Add your implementation here
01151   return 0;
01152 }

void OpenDDS::DCPS::DataReaderImpl::writer_activity ( const DataSampleHeader header  ) 

update liveliness info for this writer.

Definition at line 1379 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, header, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::UNREGISTER_INSTANCE, and writers_.

Referenced by data_received().

01380 {
01381   // caller should have the sample_lock_ !!!
01382 
01383   RcHandle<WriterInfo> writer;
01384 
01385   // The received_activity() has to be called outside the writers_lock_
01386   // because it probably acquire writers_lock_ read lock recursively
01387   // (in handle_timeout). This could cause deadlock when there are writers
01388   // waiting.
01389   {
01390     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
01391     WriterMapType::iterator iter = writers_.find(header.publication_id_);
01392 
01393     if (iter != writers_.end()) {
01394       writer = iter->second;
01395 
01396     } else if (DCPS_debug_level > 4) {
01397       // This may not be an error since it could happen that the sample
01398       // is delivered to the datareader after the write is dis-associated
01399       // with this datareader.
01400       GuidConverter reader_converter(subscription_id_);
01401       GuidConverter writer_converter(header.publication_id_);
01402       ACE_DEBUG((LM_DEBUG,
01403           ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
01404           ACE_TEXT("reader %C is not associated with writer %C.\n"),
01405           OPENDDS_STRING(reader_converter).c_str(),
01406           OPENDDS_STRING(writer_converter).c_str()));
01407     }
01408   }
01409 
01410   if (!writer.is_nil()) {
01411     ACE_Time_Value when = ACE_OS::gettimeofday();
01412     writer->received_activity(when);
01413 
01414     if ((header.message_id_ == SAMPLE_DATA) ||
01415         (header.message_id_ == INSTANCE_REGISTRATION) ||
01416         (header.message_id_ == UNREGISTER_INSTANCE) ||
01417         (header.message_id_ == DISPOSE_INSTANCE) ||
01418         (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
01419 
01420       const SequenceNumber defaultSN;
01421       SequenceRange resetRange(defaultSN, header.sequence_);
01422 
01423       if (writer->seen_data_ && !header.sequence_repair_) {
01424         // Data samples should be acknowledged prior to any
01425         // reader-side filtering to ensure discontinuities
01426         // are not unintentionally introduced.
01427         writer->ack_sequence(header.sequence_);
01428 
01429       } else {
01430         // In order to properly track out of order delivery,
01431         // a baseline must be established based on the first
01432         // data sample received.
01433         writer->seen_data_ = true;
01434         writer->ack_sequence_.reset();
01435         writer->ack_sequence_.insert(resetRange);
01436       }
01437 
01438 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01439       if (header.coherent_change_) {
01440         if (writer->coherent_samples_ == 0) {
01441           writer->coherent_sample_sequence_.reset();
01442           writer->coherent_sample_sequence_.insert(resetRange);
01443         }
01444         else {
01445           writer->coherent_sample_sequence_.insert(header.sequence_);
01446         }
01447       }
01448 #endif
01449     }
01450   }
01451 }

void OpenDDS::DCPS::DataReaderImpl::writer_became_alive ( WriterInfo info,
const ACE_Time_Value &  when 
) [virtual]

tell instances when a DataWriter transitions to being alive The writer state is inout parameter, it has to be set ALIVE before handle_timeout is called since some subroutine use the state.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2216 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, liveliness_timer_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, OpenDDS::DCPS::Monitor::report(), reverse_sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.

02218 {
02219   if (DCPS_debug_level >= 5) {
02220     GuidConverter reader_converter(subscription_id_);
02221     GuidConverter writer_converter(info.writer_id_);
02222     ACE_DEBUG((LM_DEBUG,
02223         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
02224         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02225         OPENDDS_STRING(reader_converter).c_str(),
02226         OPENDDS_STRING(writer_converter).c_str(),
02227         info.get_state_str().c_str()));
02228   }
02229 
02230   // caller should already have the samples_lock_ !!!
02231 
02232   // NOTE: each instance will change to ALIVE_STATE when they receive a sample
02233 
02234   bool liveliness_changed = false;
02235 
02236   if (info.state_ != WriterInfo::ALIVE) {
02237     liveliness_changed_status_.alive_count++;
02238     liveliness_changed_status_.alive_count_change++;
02239     liveliness_changed = true;
02240   }
02241 
02242   if (info.state_ == WriterInfo::DEAD) {
02243     liveliness_changed_status_.not_alive_count--;
02244     liveliness_changed_status_.not_alive_count_change--;
02245     liveliness_changed = true;
02246   }
02247 
02248   liveliness_changed_status_.last_publication_handle = info.handle_;
02249 
02250   set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02251 
02252   if (liveliness_changed_status_.alive_count < 0) {
02253     ACE_ERROR((LM_ERROR,
02254         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02255         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02256         liveliness_changed_status_.alive_count));
02257     return;
02258   }
02259 
02260   if (liveliness_changed_status_.not_alive_count < 0) {
02261     ACE_ERROR((LM_ERROR,
02262         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
02263         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"),
02264         liveliness_changed_status_.not_alive_count));
02265     return;
02266   }
02267 
02268   // Change the state to ALIVE since handle_timeout may call writer_became_dead
02269   // which need the current state info.
02270   info.state_ = WriterInfo::ALIVE;
02271 
02272   if (this->monitor_) {
02273     this->monitor_->report();
02274   }
02275 
02276   // Call listener only when there are liveliness status changes.
02277   if (liveliness_changed) {
02278     // Avoid possible deadlock by releasing sample_lock_.
02279     // See comments in <Topic>DataDataReaderImpl::notify_status_condition_no_sample_lock()
02280     // for information about the locks involved.
02281     ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02282     this->notify_liveliness_change();
02283   }
02284 
02285   // this call will start the liveliness timer if it is not already set
02286   liveliness_timer_->check_liveliness();
02287 }

void OpenDDS::DCPS::DataReaderImpl::writer_became_dead ( WriterInfo info,
const ACE_Time_Value &  when 
) [virtual]

tell instances when a DataWriter transitions to DEAD The writer state is inout parameter, the state is set to DEAD when it returns.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2290 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::WriterInfo::NOT_SET, notify_liveliness_change(), OPENDDS_STRING, owner_manager_, OpenDDS::DCPS::OwnershipManager::remove_writer(), OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::WriterInfo::seen_data_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.

02292 {
02293   if (DCPS_debug_level >= 5) {
02294     GuidConverter reader_converter(subscription_id_);
02295     GuidConverter writer_converter(info.writer_id_);
02296     ACE_DEBUG((LM_DEBUG,
02297         ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
02298         ACE_TEXT("reader %C from writer %C previous state %C.\n"),
02299 
02300         OPENDDS_STRING(reader_converter).c_str(),
02301         OPENDDS_STRING(writer_converter).c_str(),
02302         info.get_state_str().c_str()));
02303   }
02304 
02305 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02306   if (this->is_exclusive_ownership_) {
02307     this->owner_manager_->remove_writer (info.writer_id_);
02308     info.clear_owner_evaluated ();
02309   }
02310 #endif
02311 
02312   // caller should already have the samples_lock_ !!!
02313   bool liveliness_changed = false;
02314 
02315   if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) {
02316     liveliness_changed_status_.not_alive_count++;
02317     liveliness_changed_status_.not_alive_count_change++;
02318     liveliness_changed = true;
02319   }
02320 
02321   if (info.state_ == WriterInfo::ALIVE) {
02322     liveliness_changed_status_.alive_count--;
02323     liveliness_changed_status_.alive_count_change--;
02324     liveliness_changed_status_.not_alive_count++;
02325     liveliness_changed_status_.not_alive_count_change++;
02326     liveliness_changed = true;
02327   }
02328 
02329   liveliness_changed_status_.last_publication_handle = info.handle_;
02330 
02331   //update the state to DEAD.
02332   info.state_ = WriterInfo::DEAD;
02333   info.seen_data_ = false;
02334 
02335   if (this->monitor_) {
02336     this->monitor_->report();
02337   }
02338 
02339   if (liveliness_changed_status_.alive_count < 0) {
02340     ACE_ERROR((LM_ERROR,
02341         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02342         ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"),
02343         liveliness_changed_status_.alive_count));
02344     return;
02345   }
02346 
02347   if (liveliness_changed_status_.not_alive_count < 0) {
02348     ACE_ERROR((LM_ERROR,
02349         ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
02350         ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"),
02351         liveliness_changed_status_.not_alive_count));
02352     return;
02353   }
02354 
02355   instances_liveliness_update(info, when);
02356 
02357   // Call listener only when there are liveliness status changes.
02358   if (liveliness_changed) {
02359     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02360     this->notify_liveliness_change();
02361   }
02362 }

void OpenDDS::DCPS::DataReaderImpl::writer_removed ( WriterInfo info  )  [virtual]

tell instance when a DataWriter is removed. The liveliness status need update.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2173 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, owner_manager_, OpenDDS::DCPS::OwnershipManager::remove_writer(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.

02174 {
02175   if (DCPS_debug_level >= 5) {
02176     GuidConverter reader_converter(subscription_id_);
02177     GuidConverter writer_converter(info.writer_id_);
02178     ACE_DEBUG((LM_DEBUG,
02179         ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
02180         ACE_TEXT("reader %C from writer %C.\n"),
02181         OPENDDS_STRING(reader_converter).c_str(),
02182         OPENDDS_STRING(writer_converter).c_str()));
02183   }
02184 
02185 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02186   if (this->is_exclusive_ownership_) {
02187     this->owner_manager_->remove_writer (info.writer_id_);
02188     info.clear_owner_evaluated ();
02189   }
02190 #endif
02191 
02192   bool liveliness_changed = false;
02193 
02194   if (info.state_ == WriterInfo::ALIVE) {
02195     -- liveliness_changed_status_.alive_count;
02196     -- liveliness_changed_status_.alive_count_change;
02197     liveliness_changed = true;
02198   }
02199 
02200   if (info.state_ == WriterInfo::DEAD) {
02201     -- liveliness_changed_status_.not_alive_count;
02202     -- liveliness_changed_status_.not_alive_count_change;
02203     liveliness_changed = true;
02204   }
02205 
02206   liveliness_changed_status_.last_publication_handle = info.handle_;
02207   instances_liveliness_update(info, ACE_OS::gettimeofday());
02208 
02209   if (liveliness_changed) {
02210     set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
02211     this->notify_liveliness_change();
02212   }
02213 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 687 of file DataReaderImpl.h.

friend class EndHistoricSamplesMissedSweeper [friend]

Definition at line 684 of file DataReaderImpl.h.

friend class InstanceState [friend]

Definition at line 683 of file DataReaderImpl.h.

friend class QueryConditionImpl [friend]

Definition at line 192 of file DataReaderImpl.h.

Referenced by create_querycondition().

friend class RemoveAssociationSweeper< DataReaderImpl > [friend]

Definition at line 685 of file DataReaderImpl.h.

friend class RequestedDeadlineWatchdog [friend]

Definition at line 191 of file DataReaderImpl.h.

Referenced by enable(), and set_qos().

friend class SubscriberImpl [friend]

Definition at line 193 of file DataReaderImpl.h.


Member Data Documentation

bool OpenDDS::DCPS::DataReaderImpl::always_get_history_ [private]

Definition at line 818 of file DataReaderImpl.h.

Referenced by filter_sample().

BudgetExceededStatus OpenDDS::DCPS::DataReaderImpl::budget_exceeded_status_ [private]

Definition at line 715 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and notify_latency().

bool OpenDDS::DCPS::DataReaderImpl::coherent_ [protected]

Is accessing to Group coherent changes ?

Definition at line 637 of file DataReaderImpl.h.

Referenced by begin_access(), and end_access().

DDS::ContentFilteredTopic_var OpenDDS::DCPS::DataReaderImpl::content_filtered_topic_ [protected]

Definition at line 632 of file DataReaderImpl.h.

Referenced by cleanup(), enable_filtering(), and get_cf_topic().

CORBA::Long OpenDDS::DCPS::DataReaderImpl::depth_ [private]

Definition at line 698 of file DataReaderImpl.h.

Referenced by enable().

DDS::DomainId_t OpenDDS::DCPS::DataReaderImpl::domain_id_ [private]

Definition at line 692 of file DataReaderImpl.h.

Referenced by enable(), get_next_handle(), init(), set_qos(), transport_assoc_done(), and update_subscription_params().

DDS::DataReader_var OpenDDS::DCPS::DataReaderImpl::dr_local_objref_ [private]

Definition at line 694 of file DataReaderImpl.h.

Referenced by cleanup(), coherent_changes_completed(), get_dr_obj_ref(), init(), notify_liveliness_change(), remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().

EndHistoricSamplesMissedSweeper* OpenDDS::DCPS::DataReaderImpl::end_historic_sweeper_ [private]

Definition at line 695 of file DataReaderImpl.h.

Referenced by add_link(), cleanup(), remove_associations_i(), resume_sample_processing(), and ~DataReaderImpl().

GroupRakeData OpenDDS::DCPS::DataReaderImpl::group_coherent_ordered_data_ [protected]

Ordered group samples.

Definition at line 640 of file DataReaderImpl.h.

Referenced by end_access(), and get_ordered_data().

RepoIdToHandleMap OpenDDS::DCPS::DataReaderImpl::id_to_handle_map_ [private]

Definition at line 706 of file DataReaderImpl.h.

Referenced by get_matched_publications(), remove_associations_i(), and transport_assoc_done().

bool OpenDDS::DCPS::DataReaderImpl::initialized_ [private]

Flag indicates that the init() is called.

Definition at line 817 of file DataReaderImpl.h.

Referenced by init(), and ~DataReaderImpl().

SubscriptionInstanceMapType OpenDDS::DCPS::DataReaderImpl::instances_ [mutable, protected]

: document why the instances_ container is mutable.

Definition at line 588 of file DataReaderImpl.h.

Referenced by accept_coherent(), cleanup(), contains_sample(), data_received(), get_handle_instance(), get_instance_handles(), get_ordered_data(), have_instance_states(), have_sample_states(), have_view_states(), instances_liveliness_update(), num_zero_copies(), reject_coherent(), release_instance(), reschedule_deadline(), signal_liveliness(), and total_samples().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::instances_lock_ [mutable, protected]

Assume since the container is mutable(?!!?) it may need to use the lock while const. : remove the recursive nature of the instances_lock if not needed.

Definition at line 593 of file DataReaderImpl.h.

bool OpenDDS::DCPS::DataReaderImpl::is_bit_ [private]

Flag indicates that this datareader is a builtin topic datareader.

Definition at line 814 of file DataReaderImpl.h.

Referenced by add_association(), init(), is_bit(), remove_associations(), remove_associations_i(), and transport_assoc_done().

bool OpenDDS::DCPS::DataReaderImpl::is_exclusive_ownership_ [protected]

Definition at line 626 of file DataReaderImpl.h.

Referenced by data_received(), and init().

CORBA::Long OpenDDS::DCPS::DataReaderImpl::last_deadline_missed_total_count_ [private]

Definition at line 807 of file DataReaderImpl.h.

Referenced by enable(), get_requested_deadline_missed_status(), and set_qos().

DDS::DataReaderListener_var OpenDDS::DCPS::DataReaderImpl::listener_ [private]

Definition at line 691 of file DataReaderImpl.h.

Referenced by get_listener(), init(), listener_for(), and set_listener().

DDS::StatusMask OpenDDS::DCPS::DataReaderImpl::listener_mask_ [private]

Definition at line 690 of file DataReaderImpl.h.

Referenced by init(), listener_for(), notify_liveliness_change(), and set_listener().

DDS::LivelinessChangedStatus OpenDDS::DCPS::DataReaderImpl::liveliness_changed_status_ [private]

Definition at line 709 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), get_liveliness_changed_status(), instances_liveliness_update(), notify_liveliness_change(), writer_became_alive(), writer_became_dead(), and writer_removed().

LivelinessTimer* OpenDDS::DCPS::DataReaderImpl::liveliness_timer_ [private]

Definition at line 805 of file DataReaderImpl.h.

Referenced by cleanup(), transport_assoc_done(), writer_became_alive(), and ~DataReaderImpl().

Monitor* OpenDDS::DCPS::DataReaderImpl::monitor_ [private]

Monitor object for this entity.

Definition at line 846 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), enable(), release_instance(), remove_associations_i(), transport_assoc_done(), writer_became_alive(), and writer_became_dead().

size_t OpenDDS::DCPS::DataReaderImpl::n_chunks_ [private]

Definition at line 699 of file DataReaderImpl.h.

Referenced by enable().

OwnershipManager* OpenDDS::DCPS::DataReaderImpl::owner_manager_ [protected]

Definition at line 628 of file DataReaderImpl.h.

Referenced by cleanup(), OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(), init(), release_instance(), writer_became_dead(), and writer_removed().

DomainParticipantImpl* OpenDDS::DCPS::DataReaderImpl::participant_servant_ [protected]

Definition at line 622 of file DataReaderImpl.h.

Referenced by get_dp_id(), get_instance_handle(), get_matched_publication_data(), get_next_handle(), init(), OpenDDS::DCPS::InstanceState::sample_info(), set_qos(), transport_assoc_done(), and update_subscription_params().

Monitor* OpenDDS::DCPS::DataReaderImpl::periodic_monitor_ [private]

Periodic Monitor object for this entity.

Definition at line 849 of file DataReaderImpl.h.

Referenced by DataReaderImpl().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::publication_handle_lock_ [private]

Definition at line 702 of file DataReaderImpl.h.

Referenced by add_association(), remove_all_associations(), remove_associations_i(), and transport_assoc_done().

DDS::DataReaderQos OpenDDS::DCPS::DataReaderImpl::qos_ [protected]

Definition at line 610 of file DataReaderImpl.h.

Referenced by enable(), filter_instance(), filter_sample(), get_qos(), init(), and set_qos().

unsigned int OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size_ [private]

Bound (or initial reservation) of raw latency buffer.

Definition at line 836 of file DataReaderImpl.h.

Referenced by add_association(), and raw_latency_buffer_size().

DataCollector<double>::OnFull OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type_ [private]

Type of raw latency data buffer.

Definition at line 839 of file DataReaderImpl.h.

Referenced by add_association(), and raw_latency_buffer_type().

ReceivedDataAllocator* OpenDDS::DCPS::DataReaderImpl::rd_allocator_ [protected]

Definition at line 609 of file DataReaderImpl.h.

Referenced by enable(), and ~DataReaderImpl().

ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataReaderImpl::reactor_ [private]

The orb's reactor to be used to register the liveliness timer.

Definition at line 731 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and get_reactor().

ReadConditionSet OpenDDS::DCPS::DataReaderImpl::read_conditions_ [private]

Definition at line 843 of file DataReaderImpl.h.

Referenced by create_querycondition(), create_readcondition(), delete_contained_entities(), delete_readcondition(), has_readcondition(), and notify_read_conditions().

RemoveAssociationSweeper<DataReaderImpl>* OpenDDS::DCPS::DataReaderImpl::remove_association_sweeper_ [private]

Definition at line 696 of file DataReaderImpl.h.

Referenced by cleanup(), remove_associations(), remove_associations_i(), and ~DataReaderImpl().

DDS::RequestedDeadlineMissedStatus OpenDDS::DCPS::DataReaderImpl::requested_deadline_missed_status_ [private]

Definition at line 710 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), enable(), get_requested_deadline_missed_status(), and set_qos().

DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::DataReaderImpl::requested_incompatible_qos_status_ [private]

Definition at line 711 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), get_requested_incompatible_qos_status(), and update_incompatible_qos().

Reverse_Lock_t OpenDDS::DCPS::DataReaderImpl::reverse_pub_handle_lock_ [private]

Definition at line 703 of file DataReaderImpl.h.

Reverse_Lock_t OpenDDS::DCPS::DataReaderImpl::reverse_sample_lock_ [protected]

Definition at line 620 of file DataReaderImpl.h.

Referenced by coherent_changes_completed(), data_received(), notify_read_conditions(), and writer_became_alive().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::sample_lock_ [protected]

lock protecting sample container as well as statuses.

Definition at line 617 of file DataReaderImpl.h.

Referenced by accept_coherent(), begin_access(), contains_sample(), end_access(), get_instance_handles(), get_ordered_data(), OpenDDS::DCPS::QueryConditionImpl::get_trigger_value(), reject_coherent(), and transport_assoc_done().

DDS::SampleLostStatus OpenDDS::DCPS::DataReaderImpl::sample_lost_status_ [protected]

Definition at line 614 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), get_sample_lost_status(), and set_sample_lost_status().

DDS::SampleRejectedStatus OpenDDS::DCPS::DataReaderImpl::sample_rejected_status_ [protected]

Definition at line 613 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), get_sample_rejected_status(), and set_sample_rejected_status().

StatsMapType OpenDDS::DCPS::DataReaderImpl::statistics_ [private]

Statistics for this reader, collected for each writer.

Definition at line 833 of file DataReaderImpl.h.

Referenced by add_association(), get_latency_stats(), process_latency(), raw_latency_statistics(), and reset_latency_stats().

bool OpenDDS::DCPS::DataReaderImpl::statistics_enabled_ [private]

Flag indicating status of statistics gathering.

Definition at line 821 of file DataReaderImpl.h.

Referenced by statistics_enabled().

DDS::SubscriberQos OpenDDS::DCPS::DataReaderImpl::subqos_ [protected]

Definition at line 642 of file DataReaderImpl.h.

Referenced by set_subscriber_qos().

SubscriberImpl* OpenDDS::DCPS::DataReaderImpl::subscriber_servant_ [private]

Definition at line 693 of file DataReaderImpl.h.

Referenced by coherent_changes_completed(), data_received(), enable(), get_subscriber(), get_subscriber_servant(), init(), listener_for(), parent(), set_qos(), and verify_coherent_changes_completion().

SubscriptionLostStatus OpenDDS::DCPS::DataReaderImpl::subscription_lost_status_ [private]

Todo:
The subscription_lost_status_ and subscription_reconnecting_status_ are left here for future use when we add get_subscription_lost_status() and get_subscription_reconnecting_status() methods.

Definition at line 724 of file DataReaderImpl.h.

DDS::SubscriptionMatchedStatus OpenDDS::DCPS::DataReaderImpl::subscription_match_status_ [private]

Definition at line 712 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), get_subscription_matched_status(), remove_associations_i(), and transport_assoc_done().

DDS::TopicDescription_var OpenDDS::DCPS::DataReaderImpl::topic_desc_ [private]

Definition at line 689 of file DataReaderImpl.h.

Referenced by get_topicdescription(), and init().

TopicImpl* OpenDDS::DCPS::DataReaderImpl::topic_servant_ [protected]

Definition at line 623 of file DataReaderImpl.h.

Referenced by cleanup(), enable(), get_topic_id(), get_topic_name(), inconsistent_topic(), and init().

bool OpenDDS::DCPS::DataReaderImpl::transport_disabled_ [private]

Definition at line 851 of file DataReaderImpl.h.

Referenced by disable_transport(), and enable().

RequestedDeadlineWatchdog* OpenDDS::DCPS::DataReaderImpl::watchdog_ [private]

Watchdog responsible for reporting missed offered deadlines.

Definition at line 810 of file DataReaderImpl.h.

Referenced by cleanup(), data_received(), enable(), and set_qos().

WriterMapType OpenDDS::DCPS::DataReaderImpl::writers_ [private]

Definition at line 827 of file DataReaderImpl.h.

Referenced by add_association(), add_link(), check_historic(), OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i(), cleanup(), coherent_change_received(), data_received(), filter_instance(), get_writer_states(), notify_liveliness_change(), remove_all_associations(), remove_associations(), remove_associations_i(), remove_or_reschedule(), reset_coherent_info(), reset_ownership(), resume_sample_processing(), signal_liveliness(), transport_assoc_done(), update_ownership_strength(), writer_activity(), and ~DataReaderImpl().

ACE_RW_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::writers_lock_ [private]

RW lock for reading/writing publications.

Definition at line 830 of file DataReaderImpl.h.

Referenced by add_association(), add_link(), check_historic(), and OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:12 2016 for OpenDDS by  doxygen 1.4.7