Implements the DDS::DataReader interface.
#include <DataReaderImpl.h>
Implements the DDS::DataReader interface.
See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.
This class must be inherited by a type-specific data reader, i.e. one specialized for the data type associated with the topic.
Definition at line 190 of file DataReaderImpl.h.
typedef VarLess<DDS::ReadCondition> OpenDDS::DCPS::DataReaderImpl::RCCompLess [private]
Definition at line 852 of file DataReaderImpl.h.
typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::DataReaderImpl::Reverse_Lock_t [protected]
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 629 of file DataReaderImpl.h.
typedef std::pair<PublicationId, WriterInfo::WriterState> OpenDDS::DCPS::DataReaderImpl::WriterStatePair
Definition at line 434 of file DataReaderImpl.h.
OpenDDS::DCPS::DataReaderImpl::DataReaderImpl()
Definition at line 53 of file DataReaderImpl.cpp.
References DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, budget_exceeded_status_, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, DDS::SampleRejectedStatus::last_instance_handle, DDS::RequestedDeadlineMissedStatus::last_instance_handle, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::LivelinessChangedStatus::last_publication_handle, DDS::SampleRejectedStatus::last_reason, liveliness_changed_status_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::NOT_REJECTED, periodic_monitor_, DDS::RequestedIncompatibleQosStatus::policies, reactor_, requested_deadline_missed_status_, requested_incompatible_qos_status_, sample_lost_status_, sample_rejected_status_, subscription_match_status_, TheServiceParticipant, OpenDDS::DCPS::BudgetExceededStatus::total_count, DDS::SampleRejectedStatus::total_count, DDS::SampleLostStatus::total_count, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::RequestedDeadlineMissedStatus::total_count, OpenDDS::DCPS::BudgetExceededStatus::total_count_change, DDS::SampleRejectedStatus::total_count_change, DDS::SampleLostStatus::total_count_change, DDS::SubscriptionMatchedStatus::total_count_change, DDS::RequestedIncompatibleQosStatus::total_count_change, and DDS::RequestedDeadlineMissedStatus::total_count_change.
00054 : qos_(TheServiceParticipant->initial_DataReaderQos()), 00055 reverse_sample_lock_(sample_lock_), 00056 topic_servant_(0), 00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00058 is_exclusive_ownership_ (false), 00059 #endif 00060 coherent_(false), 00061 subqos_ (TheServiceParticipant->initial_SubscriberQos()), 00062 topic_desc_(0), 00063 listener_mask_(DEFAULT_STATUS_MASK), 00064 domain_id_(0), 00065 end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)), 00066 remove_association_sweeper_(make_rch<RemoveAssociationSweeper<DataReaderImpl> >(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)), 00067 n_chunks_(TheServiceParticipant->n_chunks()), 00068 reverse_pub_handle_lock_(publication_handle_lock_), 00069 reactor_(0), 00070 liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)), 00071 last_deadline_missed_total_count_(0), 00072 is_bit_(false), 00073 always_get_history_(false), 00074 statistics_enabled_(false), 00075 raw_latency_buffer_size_(0), 00076 raw_latency_buffer_type_(DataCollector<double>::KeepOldest), 00077 monitor_(0), 00078 periodic_monitor_(0), 00079 transport_disabled_(false) 00080 { 00081 reactor_ = TheServiceParticipant->timer(); 00082 00083 liveliness_changed_status_.alive_count = 0; 00084 liveliness_changed_status_.not_alive_count = 0; 00085 liveliness_changed_status_.alive_count_change = 0; 00086 liveliness_changed_status_.not_alive_count_change = 0; 00087 liveliness_changed_status_.last_publication_handle = 00088 DDS::HANDLE_NIL; 00089 00090 requested_deadline_missed_status_.total_count = 0; 00091 requested_deadline_missed_status_.total_count_change = 0; 00092 requested_deadline_missed_status_.last_instance_handle = 00093 DDS::HANDLE_NIL; 00094 00095 requested_incompatible_qos_status_.total_count = 0; 00096 
requested_incompatible_qos_status_.total_count_change = 0; 00097 requested_incompatible_qos_status_.last_policy_id = 0; 00098 requested_incompatible_qos_status_.policies.length(0); 00099 00100 subscription_match_status_.total_count = 0; 00101 subscription_match_status_.total_count_change = 0; 00102 subscription_match_status_.current_count = 0; 00103 subscription_match_status_.current_count_change = 0; 00104 subscription_match_status_.last_publication_handle = 00105 DDS::HANDLE_NIL; 00106 00107 sample_lost_status_.total_count = 0; 00108 sample_lost_status_.total_count_change = 0; 00109 00110 sample_rejected_status_.total_count = 0; 00111 sample_rejected_status_.total_count_change = 0; 00112 sample_rejected_status_.last_reason = DDS::NOT_REJECTED; 00113 sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL; 00114 00115 this->budget_exceeded_status_.total_count = 0; 00116 this->budget_exceeded_status_.total_count_change = 0; 00117 this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL; 00118 00119 monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this); 00120 periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this); 00121 }
OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl() [virtual]
Definition at line 125 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, ownership_manager(), and topic_servant_.
00126 { 00127 DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6); 00128 00129 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00130 OwnershipManagerPtr owner_manager = this->ownership_manager(); 00131 if (owner_manager) { 00132 owner_manager->unregister_reader(topic_servant_->type_name(), this); 00133 } 00134 #endif 00135 00136 }
void OpenDDS::DCPS::DataReaderImpl::accept_coherent(PublicationId& writer_id, RepoId& publisher_id)
Definition at line 2934 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, instances_, instances_lock_, LM_DEBUG, OPENDDS_STRING, sample_lock_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.
Referenced by verify_coherent_changes_completion().
02936 { 02937 if (::OpenDDS::DCPS::DCPS_debug_level > 0) { 02938 GuidConverter reader (this->subscription_id_); 02939 GuidConverter writer (writer_id); 02940 GuidConverter publisher (publisher_id); 02941 ACE_DEBUG((LM_DEBUG, 02942 ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()") 02943 ACE_TEXT(" reader %C writer %C publisher %C \n"), 02944 OPENDDS_STRING(reader).c_str(), 02945 OPENDDS_STRING(writer).c_str(), 02946 OPENDDS_STRING(publisher).c_str())); 02947 } 02948 02949 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 02950 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02951 02952 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin(); 02953 iter != this->instances_.end(); ++iter) { 02954 iter->second->rcvd_strategy_->accept_coherent( 02955 writer_id, publisher_id); 02956 } 02957 }
void OpenDDS::DCPS::DataReaderImpl::accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance) [protected]
Definition at line 3259 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER, ACE_OS::gettimeofday(), OpenDDS::DCPS::DataSampleHeader::group_coherent_, OpenDDS::DCPS::RcHandle< T >::in(), LM_WARNING, notify_read_conditions(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, reverse_sample_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, verify_coherent_changes_completion(), watchdog_, writers_, and writers_lock_.
Referenced by data_received().
03260 { 03261 bool accepted = true; 03262 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 03263 bool verify_coherent = false; 03264 #endif 03265 RcHandle<WriterInfo> writer; 03266 03267 if (header.publication_id_.entityId.entityKind != ENTITYKIND_OPENDDS_NIL_WRITER) { 03268 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_); 03269 03270 WriterMapType::iterator where = writers_.find(header.publication_id_); 03271 03272 if (where != writers_.end()) { 03273 if (header.coherent_change_) { 03274 03275 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 03276 // Received coherent change 03277 where->second->group_coherent_ = header.group_coherent_; 03278 where->second->publisher_id_ = header.publisher_id_; 03279 ++where->second->coherent_samples_; 03280 verify_coherent = true; 03281 #endif 03282 writer = where->second; 03283 } 03284 } 03285 else { 03286 GuidConverter subscriptionBuffer(subscription_id_); 03287 GuidConverter publicationBuffer(header.publication_id_); 03288 ACE_DEBUG((LM_WARNING, 03289 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ") 03290 ACE_TEXT("subscription %C failed to find ") 03291 ACE_TEXT("publication data for %C.\n"), 03292 OPENDDS_STRING(subscriptionBuffer).c_str(), 03293 OPENDDS_STRING(publicationBuffer).c_str())); 03294 } 03295 } 03296 03297 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 03298 if (verify_coherent) { 03299 accepted = verify_coherent_changes_completion(writer.in()); 03300 } 03301 #endif 03302 03303 if (instance && watchdog_.in()) { 03304 instance->last_sample_tv_ = instance->cur_sample_tv_; 03305 instance->cur_sample_tv_ = ACE_OS::gettimeofday(); 03306 03307 // Watchdog can't be called with sample_lock_ due to reactor deadlock 03308 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 03309 if (is_new_instance) { 03310 watchdog_->schedule_timer(instance); 03311 } else { 03312 watchdog_->execute(instance, false); 03313 } 03314 } 03315 03316 if (accepted) { 03317 notify_read_conditions(); 03318 } 03319 }
void OpenDDS::DCPS::DataReaderImpl::add_association(const RepoId& yourId, const WriterAssociation& writer, bool active) [virtual]
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 207 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, LM_DEBUG, LM_ERROR, OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, qos_, raw_latency_buffer_size_, raw_latency_buffer_type_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, statistics_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, DDS::DataWriterQos::transport_priority, ACE_Atomic_Op< ACE_LOCK, TYPE >::value(), DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.
00210 { 00211 if (DCPS_debug_level) { 00212 GuidConverter reader_converter(yourId); 00213 GuidConverter writer_converter(writer.writerId); 00214 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ") 00215 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_, 00216 OPENDDS_STRING(reader_converter).c_str(), 00217 OPENDDS_STRING(writer_converter).c_str())); 00218 } 00219 00220 if (entity_deleted_.value()) { 00221 if (DCPS_debug_level) { 00222 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association") 00223 ACE_TEXT(" This is a deleted datareader, ignoring add.\n"))); 00224 } 00225 return; 00226 } 00227 00228 // We are being called back from the repository before we are done 00229 // processing after our call to the repository that caused this call 00230 // (from the repository) to be made. 00231 if (GUID_UNKNOWN == subscription_id_) { 00232 subscription_id_ = yourId; 00233 } 00234 00235 //Why do we need the publication_handle_lock_ here? No access to id_to_handle_map_... 00236 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00237 00238 00239 // For each writer in the list of writers to associate with, we 00240 // create a WriterInfo and a WriterStats object and store them in 00241 // our internal maps. 00242 // 00243 { 00244 00245 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_); 00246 00247 const PublicationId& writer_id = writer.writerId; 00248 RcHandle<WriterInfo> info = make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos); 00249 std::pair<WriterMapType::iterator, bool> bpair = writers_.insert( 00250 // This insertion is idempotent. 
00251 WriterMapType::value_type( 00252 writer_id, 00253 info)); 00254 00255 // Schedule timer if necessary 00256 // - only need to check reader qos - we know the writer must be >= reader 00257 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) { 00258 info->waiting_for_end_historic_samples_ = true; 00259 } 00260 00261 this->statistics_.insert( 00262 StatsMapType::value_type( 00263 writer_id, 00264 WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_))); 00265 00266 // If this is a durable reader 00267 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) { 00268 // TODO schedule timer for removing flag from writers 00269 } 00270 00271 if (DCPS_debug_level > 4) { 00272 GuidConverter converter(writer_id); 00273 ACE_DEBUG((LM_DEBUG, 00274 "(%P|%t) DataReaderImpl::add_association: " 00275 "inserted writer %C.return %d \n", 00276 OPENDDS_STRING(converter).c_str(), bpair.second)); 00277 00278 WriterMapType::iterator iter = writers_.find(writer_id); 00279 if (iter != writers_.end()) { 00280 // This may not be an error since it could happen that the sample 00281 // is delivered to the datareader after the write is dis-associated 00282 // with this datareader. 00283 GuidConverter reader_converter(subscription_id_); 00284 GuidConverter writer_converter(writer_id); 00285 ACE_DEBUG((LM_DEBUG, 00286 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ") 00287 ACE_TEXT("reader %C is associated with writer %C.\n"), 00288 OPENDDS_STRING(reader_converter).c_str(), 00289 OPENDDS_STRING(writer_converter).c_str())); 00290 } 00291 } 00292 } 00293 00294 // Propagate the add_associations processing down into the Transport 00295 // layer here. This will establish the transport support and reserve 00296 // usage of an existing connection or initiate creation of a new 00297 // connection if no suitable connection is available. 
00298 AssociationData data; 00299 data.remote_id_ = writer.writerId; 00300 data.remote_data_ = writer.writerTransInfo; 00301 data.publication_transport_priority_ = 00302 writer.writerQos.transport_priority.value; 00303 data.remote_reliable_ = 00304 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00305 data.remote_durable_ = 00306 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00307 00308 //Do not hold publication_handle_lock_ when calling associate due to possible reactor 00309 //deadlock on passive side completion 00310 //associate does not access id_to_handle_map_, thus not clear why publication_handle_lock_ 00311 //is held here anyway 00312 guard.release(); 00313 00314 if (!associate(data, active)) { 00315 if (DCPS_debug_level) { 00316 ACE_ERROR((LM_ERROR, 00317 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ") 00318 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00319 } 00320 } 00321 }
void OpenDDS::DCPS::DataReaderImpl::add_link(const DataLink_rch& link, const RepoId& peer) [protected, virtual]
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 3219 of file DataReaderImpl.cpp.
References DDS::DataReaderQos::durability, end_historic_sweeper_, OPENDDS_STRING, qos_, resume_sample_processing(), OpenDDS::DCPS::TransportImpl::transport_type(), DDS::VOLATILE_DURABILITY_QOS, writers_, and writers_lock_.
03220 { 03221 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) { 03222 03223 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_); 03224 03225 WriterMapType::iterator it = writers_.find(peer); 03226 if (it != writers_.end()) { 03227 // Schedule timer if necessary 03228 // - only need to check reader qos - we know the writer must be >= reader 03229 end_historic_sweeper_->schedule_timer(it->second); 03230 } 03231 } 03232 TransportClient::add_link(link, peer); 03233 TransportImpl& impl = link->impl(); 03234 OPENDDS_STRING type = impl.transport_type(); 03235 03236 if (type == "rtps_udp" || type == "multicast") { 03237 resume_sample_processing(peer); 03238 } 03239 }
void OpenDDS::DCPS::DataReaderImpl::association_complete(const RepoId& remote_id) [virtual]
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 440 of file DataReaderImpl.cpp.
void OpenDDS::DCPS::DataReaderImpl::begin_access()
Definition at line 3078 of file DataReaderImpl.cpp.
References coherent_, and sample_lock_.
03079 { 03080 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03081 this->coherent_ = true; 03082 }
bool OpenDDS::DCPS::DataReaderImpl::check_historic(const ReceivedDataSample& sample) [private]
Collects samples received before END_HISTORIC_SAMPLES. Returns false if normal processing of this sample should be skipped.
Definition at line 3188 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), writers_, and writers_lock_.
Referenced by data_received().
03189 { 03190 ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true); 03191 WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_); 03192 if (iter != writers_.end()) { 03193 const SequenceNumber& seq = sample.header_.sequence_; 03194 if (iter->second->waiting_for_end_historic_samples_) { 03195 iter->second->historic_samples_.insert(std::make_pair(seq, sample)); 03196 return false; 03197 } 03198 if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN() 03199 && !sample.header_.historic_sample_ 03200 && seq <= iter->second->last_historic_seq_) { 03201 // this sample must have been seen before the END_HISTORIC_SAMPLES control msg 03202 return false; 03203 } 03204 } 03205 return true; 03206 }
bool OpenDDS::DCPS::DataReaderImpl::check_transport_qos(const TransportInst& inst) [virtual]
Implements OpenDDS::DCPS::TransportClient.
Definition at line 1647 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportInst::is_reliable(), qos_, DDS::DataReaderQos::reliability, and DDS::RELIABLE_RELIABILITY_QOS.
01648 { 01649 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) { 01650 return ti.is_reliable(); 01651 } 01652 return true; 01653 }
void OpenDDS::DCPS::DataReaderImpl::cleanup(void) [virtual]
Definition at line 140 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::NO_STATUS_MASK, and set_listener().
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().
00141 { 00142 // As first step set our listener to nill which will prevent us from calling 00143 // back onto the listener at the moment the related DDS entity has been 00144 // deleted 00145 set_listener(0, NO_STATUS_MASK); 00146 00147 00148 }
bool OpenDDS::DCPS::DataReaderImpl::coherent_change_received(WriterInfo* writer) [private]
void OpenDDS::DCPS::DataReaderImpl::coherent_change_received(RepoId publisher_id, Coherent_State& result)
Definition at line 3004 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, state, writers_, and writers_lock_.
03005 { 03006 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 03007 03008 result = COMPLETED; 03009 for (WriterMapType::iterator iter = writers_.begin(); 03010 iter != writers_.end(); 03011 ++iter) { 03012 03013 if (iter->second->publisher_id_ == publisher_id) { 03014 const Coherent_State state = iter->second->coherent_change_received(); 03015 if (state == NOT_COMPLETED_YET) { 03016 result = NOT_COMPLETED_YET; 03017 break; 03018 } 03019 else if (state == REJECTED) { 03020 result = REJECTED; 03021 } 03022 } 03023 } 03024 }
void OpenDDS::DCPS::DataReaderImpl::coherent_changes_completed(DataReaderImpl* reader)
Definition at line 3028 of file DataReaderImpl.cpp.
References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, get_subscriber_servant(), OpenDDS::DCPS::RcHandle< T >::in(), CORBA::is_nil(), listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), reverse_sample_lock_, and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().
Referenced by verify_coherent_changes_completion().
03029 { 03030 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant(); 03031 if (!subscriber) 03032 return; 03033 03034 subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true); 03035 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true); 03036 03037 ::DDS::SubscriberListener_var sub_listener = 03038 subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS); 03039 if (!CORBA::is_nil(sub_listener.in())) 03040 { 03041 if (reader == this) { 03042 // Release the sample_lock before listener callback. 03043 ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 03044 sub_listener->on_data_on_readers(subscriber.in()); 03045 } 03046 03047 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false); 03048 subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false); 03049 } 03050 else 03051 { 03052 subscriber->notify_status_condition(); 03053 03054 ::DDS::DataReaderListener_var listener = 03055 this->listener_for (::DDS::DATA_AVAILABLE_STATUS); 03056 03057 if (!CORBA::is_nil(listener.in())) 03058 { 03059 if (reader == this) { 03060 // Release the sample_lock before listener callback. 03061 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 03062 listener->on_data_available(this); 03063 } else { 03064 listener->on_data_available(this); 03065 } 03066 03067 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false); 03068 subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false); 03069 } 03070 else 03071 { 03072 this->notify_status_condition(); 03073 } 03074 } 03075 }
bool OpenDDS::DCPS::DataReaderImpl::contains_sample(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
Folds the three separate loops of have_sample_states(), have_view_states(), and have_instance_states() into a single pass. Takes the sample_lock_ and then the instances_lock_.
Definition at line 1750 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, instances_, instances_lock_, item(), OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, sample_lock_, and OpenDDS::DCPS::InstanceState::view_state().
Referenced by OpenDDS::DCPS::ReadConditionImpl::get_trigger_value().
01752 { 01753 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false); 01754 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false); 01755 01756 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 01757 end = instances_.end(); iter != end; ++iter) { 01758 SubscriptionInstance& inst = *iter->second; 01759 01760 if ((inst.instance_state_.view_state() & view_states) && 01761 (inst.instance_state_.instance_state() & instance_states)) { 01762 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0; 01763 item = item->next_data_sample_) { 01764 if (item->sample_state_ & sample_states 01765 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01766 && !item->coherent_change_ 01767 #endif 01768 ) { 01769 return true; 01770 } 01771 } 01772 } 01773 } 01774 01775 return false; 01776 }
virtual bool OpenDDS::DCPS::DataReaderImpl::contains_sample_filtered(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator& evaluator, const DDS::StringSeq& params) [pure virtual]
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::QueryConditionImpl::get_trigger_value().
DDS::QueryCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_querycondition(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char* query_expression, const DDS::StringSeq& query_parameters) [virtual]
Definition at line 793 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, QueryConditionImpl, read_conditions_, DDS::RETCODE_OK, and sample_lock_.
00799 { 00800 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0); 00801 try { 00802 DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states, 00803 view_states, instance_states, query_expression); 00804 if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) { 00805 return 0; 00806 } 00807 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc); 00808 read_conditions_.insert(rc); 00809 return qc._retn(); 00810 } catch (const std::exception& e) { 00811 if (DCPS_debug_level) { 00812 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ") 00813 ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"), 00814 e.what())); 00815 } 00816 } 00817 return 0; 00818 }
DDS::ReadCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_readcondition(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states) [virtual]
Definition at line 780 of file DataReaderImpl.cpp.
References read_conditions_, and sample_lock_.
00784 { 00785 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0); 00786 DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states, 00787 view_states, instance_states); 00788 read_conditions_.insert(rc); 00789 return rc._retn(); 00790 }
void OpenDDS::DCPS::DataReaderImpl::data_received(const ReceivedDataSample& sample) [virtual]
Processes a received message, which may be either a control message or a data sample.
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 1354 of file DataReaderImpl.cpp.
References accept_sample_processing(), ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, check_historic(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_demarshal(), OpenDDS::DCPS::DISPOSE_INSTANCE, dispose_unregister(), OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, dup(), OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::END_HISTORIC_SAMPLES, filter_sample(), OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::EntityImpl::get_deleted(), get_repo_id(), get_subscriber_servant(), OpenDDS::DCPS::GUID_UNKNOWN, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::INSTANCE_REGISTRATION, instances_, instances_lock_, is_exclusive_ownership_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, lookup_instance(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, notify_read_conditions(), OPENDDS_STRING, ownership_manager(), process_latency(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::RcHandle< T >::reset(), resume_sample_processing(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, sample_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_string(), OpenDDS::DCPS::UNREGISTER_INSTANCE, verify_coherent_changes_completion(), watchdog_, writer_activity(), writers_, and writers_lock_.
Referenced by deliver_historic().
01355 { 01356 DBG_ENTRY_LVL("DataReaderImpl","data_received",6); 01357 01358 // ensure some other thread is not changing the sample container 01359 // or statuses related to samples. 01360 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 01361 01362 if (get_deleted()) return; 01363 01364 if (DCPS_debug_level > 9) { 01365 GuidConverter converter(subscription_id_); 01366 ACE_DEBUG((LM_DEBUG, 01367 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ") 01368 ACE_TEXT("%C received sample: %C.\n"), 01369 OPENDDS_STRING(converter).c_str(), 01370 to_string(sample.header_).c_str())); 01371 } 01372 01373 switch (sample.header_.message_id_) { 01374 case SAMPLE_DATA: 01375 case INSTANCE_REGISTRATION: { 01376 if (!check_historic(sample)) break; 01377 01378 DataSampleHeader const & header = sample.header_; 01379 01380 this->writer_activity(header); 01381 01382 // Verify data has not exceeded its lifespan. 01383 if (this->filter_sample(header)) break; 01384 01385 // This adds the reader to the set/list of readers with data. 01386 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant(); 01387 if (subscriber) 01388 subscriber->data_received(this); 01389 01390 // Only gather statistics about real samples, not registration data, etc. 01391 if (header.message_id_ == SAMPLE_DATA) { 01392 this->process_latency(sample); 01393 } 01394 01395 // This also adds to the sample container and makes any callbacks 01396 // and condition modifications. 
01397 01398 SubscriptionInstance_rch instance; 01399 bool is_new_instance = false; 01400 bool filtered = false; 01401 if (sample.header_.key_fields_only_) { 01402 dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING); 01403 } else { 01404 dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING); 01405 } 01406 01407 // Per sample logging 01408 if (DCPS_debug_level >= 8) { 01409 GuidConverter reader_converter(subscription_id_); 01410 GuidConverter writer_converter(header.publication_id_); 01411 01412 ACE_DEBUG ((LM_DEBUG, 01413 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ") 01414 ACE_TEXT("instance %d is_new_instance %d filtered %d \n"), 01415 OPENDDS_STRING(reader_converter).c_str(), 01416 OPENDDS_STRING(writer_converter).c_str(), 01417 instance ? instance->instance_handle_ : 0, 01418 is_new_instance, filtered)); 01419 } 01420 01421 if (filtered) break; // sample filtered from instance 01422 01423 if (instance) accept_sample_processing(instance, header, is_new_instance); 01424 } 01425 break; 01426 01427 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01428 case END_COHERENT_CHANGES: { 01429 CoherentChangeControl control; 01430 01431 this->writer_activity(sample.header_); 01432 01433 Serializer serializer( 01434 sample.sample_.get(), sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER); 01435 if (!(serializer >> control)) { 01436 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ") 01437 ACE_TEXT("deserialization coherent change control failed.\n"))); 01438 return; 01439 } 01440 01441 if (DCPS_debug_level > 0) { 01442 std::stringstream buffer; 01443 buffer << control << std::endl; 01444 01445 ACE_DEBUG((LM_DEBUG, 01446 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ") 01447 ACE_TEXT("END_COHERENT_CHANGES %C\n"), 01448 buffer.str().c_str())); 01449 } 01450 01451 RcHandle<WriterInfo> writer; 01452 { 01453 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 01454 01455 
WriterMapType::iterator it = 01456 this->writers_.find(sample.header_.publication_id_); 01457 01458 if (it == this->writers_.end()) { 01459 GuidConverter sub_id(this->subscription_id_); 01460 GuidConverter pub_id(sample.header_.publication_id_); 01461 ACE_DEBUG((LM_WARNING, 01462 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ") 01463 ACE_TEXT(" subscription %C failed to find ") 01464 ACE_TEXT(" publication data for %C!\n"), 01465 OPENDDS_STRING(sub_id).c_str(), 01466 OPENDDS_STRING(pub_id).c_str())); 01467 return; 01468 } 01469 else { 01470 writer = it->second; 01471 } 01472 it->second->set_group_info (control); 01473 } 01474 01475 if (this->verify_coherent_changes_completion(writer.in())) { 01476 this->notify_read_conditions(); 01477 } 01478 } 01479 break; 01480 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 01481 01482 case DATAWRITER_LIVELINESS: { 01483 if (DCPS_debug_level >= 4) { 01484 GuidConverter reader_converter(subscription_id_); 01485 GuidConverter writer_converter(sample.header_.publication_id_); 01486 ACE_DEBUG((LM_DEBUG, 01487 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ") 01488 ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"), 01489 OPENDDS_STRING(reader_converter).c_str(), 01490 OPENDDS_STRING(writer_converter).c_str())); 01491 } 01492 this->writer_activity(sample.header_); 01493 01494 // tell all instances they got a liveliness message 01495 { 01496 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_); 01497 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01498 iter != instances_.end(); 01499 ++iter) { 01500 iter->second->instance_state_.lively(sample.header_.publication_id_); 01501 } 01502 } 01503 01504 } 01505 break; 01506 01507 case DISPOSE_INSTANCE: { 01508 if (!check_historic(sample)) break; 01509 this->writer_activity(sample.header_); 01510 SubscriptionInstance_rch instance; 01511 01512 if (this->watchdog_.in()) { 01513 // Find the instance first for timer cancellation 
since 01514 // the instance may be deleted during dispose and can 01515 // not be accessed. 01516 ReceivedDataSample dup(sample); 01517 this->lookup_instance(dup, instance); 01518 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01519 OwnershipManagerPtr owner_manager = this->ownership_manager(); 01520 01521 if (! this->is_exclusive_ownership_ 01522 || (owner_manager 01523 && (instance) 01524 && (owner_manager->is_owner (instance->instance_handle_, 01525 sample.header_.publication_id_)))) { 01526 #endif 01527 this->watchdog_->cancel_timer(instance); 01528 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01529 } 01530 #endif 01531 } 01532 instance.reset(); 01533 this->dispose_unregister(sample, instance); 01534 } 01535 this->notify_read_conditions(); 01536 break; 01537 01538 case UNREGISTER_INSTANCE: { 01539 if (!check_historic(sample)) break; 01540 this->writer_activity(sample.header_); 01541 SubscriptionInstance_rch instance; 01542 01543 if (this->watchdog_.in()) { 01544 // Find the instance first for timer cancellation since 01545 // the instance may be deleted during dispose and can 01546 // not be accessed. 01547 ReceivedDataSample dup(sample); 01548 this->lookup_instance(dup, instance); 01549 if (instance) { 01550 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01551 if (! 
this->is_exclusive_ownership_ 01552 || (this->is_exclusive_ownership_ 01553 && instance->instance_state_.is_last (sample.header_.publication_id_))) { 01554 #endif 01555 this->watchdog_->cancel_timer(instance); 01556 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01557 } 01558 #endif 01559 } 01560 } 01561 instance.reset(); 01562 this->dispose_unregister(sample, instance); 01563 } 01564 this->notify_read_conditions(); 01565 break; 01566 01567 case DISPOSE_UNREGISTER_INSTANCE: { 01568 if (!check_historic(sample)) break; 01569 this->writer_activity(sample.header_); 01570 SubscriptionInstance_rch instance; 01571 01572 if (this->watchdog_.in()) { 01573 // Find the instance first for timer cancellation since 01574 // the instance may be deleted during dispose and can 01575 // not be accessed. 01576 ReceivedDataSample dup(sample); 01577 this->lookup_instance(dup, instance); 01578 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01579 OwnershipManagerPtr owner_manager = this->ownership_manager(); 01580 if (! 
this->is_exclusive_ownership_ 01581 || (owner_manager 01582 && (instance) 01583 && (owner_manager->is_owner (instance->instance_handle_, 01584 sample.header_.publication_id_))) 01585 || (this->is_exclusive_ownership_ 01586 && (instance) 01587 && instance->instance_state_.is_last (sample.header_.publication_id_))) { 01588 #endif 01589 if (instance) { 01590 this->watchdog_->cancel_timer(instance); 01591 } 01592 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01593 } 01594 #endif 01595 } 01596 instance.reset(); 01597 this->dispose_unregister(sample, instance); 01598 } 01599 this->notify_read_conditions(); 01600 break; 01601 01602 case END_HISTORIC_SAMPLES: { 01603 if (sample.header_.message_length_ >= sizeof(RepoId)) { 01604 Serializer ser(sample.sample_.get()); 01605 RepoId readerId = GUID_UNKNOWN; 01606 if (!(ser >> readerId)) { 01607 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ") 01608 ACE_TEXT("deserialization reader failed.\n"))); 01609 return; 01610 } 01611 if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) { 01612 break; // not our message 01613 } 01614 } 01615 if (DCPS_debug_level > 4) { 01616 ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n")); 01617 } 01618 // Going to acquire writers lock, release samples lock 01619 guard.release(); 01620 this->resume_sample_processing(sample.header_.publication_id_); 01621 if (DCPS_debug_level > 4) { 01622 GuidConverter pub_id(sample.header_.publication_id_); 01623 ACE_DEBUG(( 01624 LM_INFO, 01625 "(%P|%t) Resumed sample processing for durable writer %C\n", 01626 OPENDDS_STRING(pub_id).c_str())); 01627 } 01628 break; 01629 } 01630 01631 default: 01632 ACE_ERROR((LM_ERROR, 01633 "(%P|%t) ERROR: DataReaderImpl::data_received" 01634 "unexpected message_id = %d\n", 01635 sample.header_.message_id_)); 01636 break; 01637 } 01638 }
virtual void OpenDDS::DCPS::DataReaderImpl::dds_demarshal(const ReceivedDataSample& sample, SubscriptionInstance_rch& instance, bool& is_new_instance, bool& filtered, MarshalingType marshaling_type) [pure virtual]
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by data_received().
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_contained_entities | ( | ) | [virtual] |
Implements DDS::DataReader.
Definition at line 838 of file DataReaderImpl.cpp.
References read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and sample_lock_.
00839 { 00840 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00841 DDS::RETCODE_OUT_OF_RESOURCES); 00842 read_conditions_.clear(); 00843 return DDS::RETCODE_OK; 00844 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_readcondition | ( | DDS::ReadCondition_ptr | a_condition | ) | [virtual] |
Definition at line 828 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, and sample_lock_.
00830 { 00831 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00832 DDS::RETCODE_OUT_OF_RESOURCES); 00833 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition); 00834 return read_conditions_.erase(rc) 00835 ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET; 00836 }
void OpenDDS::DCPS::DataReaderImpl::deliver_historic | ( | OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& | samples | ) | [private] |
Deliver the samples that were held back by check_historic().
Definition at line 3208 of file DataReaderImpl.cpp.
References data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, and OPENDDS_MAP().
Referenced by resume_sample_processing().
03209 { 03210 typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t; 03211 const iter_t end = samples.end(); 03212 for (iter_t iter = samples.begin(); iter != end; ++iter) { 03213 iter->second.header_.historic_sample_ = true; 03214 data_received(iter->second); 03215 } 03216 }
ACE_INLINE void OpenDDS::DCPS::DataReaderImpl::disable_transport | ( | ) |
Definition at line 33 of file DataReaderImpl.inl.
References transport_disabled_.
Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr().
00034 { 00035 this->transport_disabled_ = true; 00036 }
void OpenDDS::DCPS::DataReaderImpl::dispose_unregister(const ReceivedDataSample& sample, SubscriptionInstance_rch& instance) [virtual]
Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Definition at line 2263 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.
Referenced by data_received().
02265 { 02266 if (DCPS_debug_level > 0) { 02267 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n")); 02268 } 02269 }
DDS::DomainId_t OpenDDS::DCPS::DataReaderImpl::domain_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 673 of file DataReaderImpl.h.
00673 { return this->domain_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 1136 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportClient::connection_info(), content_filtered_topic_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::deadline, depth_, domain_id_, dp_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), enable_specific(), OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_subscriber_servant(), OpenDDS::DCPS::GUID_UNKNOWN, DDS::DataReaderQos::history, OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::KEEP_ALL_HISTORY_QOS, last_deadline_missed_total_count_, DDS::LENGTH_UNLIMITED, DDS::DataReaderQos::liveliness, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, n_chunks_, DDS::Duration_t::nanosec, participant_servant_, qos_, rd_allocator_, OpenDDS::DCPS::ref(), DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), requested_deadline_missed_status_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::DataReaderQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, sample_lock_, DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, transport_disabled_, DDS::VOLATILE_DURABILITY_QOS, and watchdog_.
Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr(), and OpenDDS::DCPS::SubscriberImpl::create_datareader().
01137 { 01138 //According spec: 01139 // - Calling enable on an already enabled Entity returns OK and has no 01140 // effect. 01141 // - Calling enable on an Entity whose factory is not enabled will fail 01142 // and return PRECONDITION_NOT_MET. 01143 01144 if (this->is_enabled()) { 01145 return DDS::RETCODE_OK; 01146 } 01147 01148 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant(); 01149 if (!subscriber) { 01150 return DDS::RETCODE_ERROR; 01151 } 01152 01153 if (!subscriber->is_enabled()) { 01154 return DDS::RETCODE_PRECONDITION_NOT_MET; 01155 } 01156 01157 RcHandle<DomainParticipantImpl> participant = participant_servant_.lock(); 01158 if (participant) { 01159 dp_id_ = participant->get_id(); 01160 } 01161 01162 if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) { 01163 // The spec says qos_.history.depth is "has no effect" 01164 // when history.kind = KEEP_ALL so use max_samples_per_instance 01165 depth_ = qos_.resource_limits.max_samples_per_instance; 01166 01167 } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS 01168 depth_ = qos_.history.depth; 01169 } 01170 01171 if (depth_ == DDS::LENGTH_UNLIMITED) { 01172 // DDS::LENGTH_UNLIMITED is negative so make it a positive 01173 // value that is for all intents and purposes unlimited 01174 // and we can use it for comparisons. 01175 // use 2147483647L because that is the greatest value a signed 01176 // CORBA::Long can have. 01177 // WARNING: The client risks running out of memory in this case. 01178 depth_ = 2147483647L; 01179 } 01180 01181 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) { 01182 n_chunks_ = qos_.resource_limits.max_samples; 01183 } 01184 01185 //else using value from Service_Participant 01186 01187 // enable the type specific part of this DataReader 01188 this->enable_specific(); 01189 01190 //Note: the QoS used to set n_chunks_ is Changable=No so 01191 // it is OK that we cannot change the size of our allocators. 
01192 rd_allocator_.reset(new ReceivedDataAllocator(n_chunks_)); 01193 01194 if (DCPS_debug_level >= 2) 01195 ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable" 01196 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01197 rd_allocator_.get(), n_chunks_)); 01198 01199 if ((qos_.liveliness.lease_duration.sec != 01200 DDS::DURATION_INFINITE_SEC) && 01201 (qos_.liveliness.lease_duration.nanosec != 01202 DDS::DURATION_INFINITE_NSEC)) { 01203 liveliness_lease_duration_ = 01204 duration_to_time_value(qos_.liveliness.lease_duration); 01205 } 01206 01207 // Setup the requested deadline watchdog if the configured deadline 01208 // period is not the default (infinite). 01209 DDS::Duration_t const deadline_period = this->qos_.deadline.period; 01210 01211 if (!this->watchdog_ 01212 && (deadline_period.sec != DDS::DURATION_INFINITE_SEC 01213 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) { 01214 this->watchdog_ = make_rch<RequestedDeadlineWatchdog>( 01215 ref(this->sample_lock_), 01216 this->qos_.deadline, 01217 ref(*this), 01218 ref(this->requested_deadline_missed_status_), 01219 ref(this->last_deadline_missed_total_count_)); 01220 } 01221 01222 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 01223 disco->pre_reader(this); 01224 01225 this->set_enabled(); 01226 01227 if (topic_servant_ && !transport_disabled_) { 01228 01229 try { 01230 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS, 01231 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 01232 } catch (const Transport::Exception&) { 01233 ACE_ERROR((LM_ERROR, 01234 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ") 01235 ACE_TEXT("Transport Exception.\n"))); 01236 return DDS::RETCODE_ERROR; 01237 01238 } 01239 01240 const TransportLocatorSeq& trans_conf_info = this->connection_info(); 01241 01242 CORBA::String_var filterClassName = ""; 01243 CORBA::String_var filterExpression = ""; 01244 DDS::StringSeq exprParams; 01245 #ifndef 
OPENDDS_NO_CONTENT_FILTERED_TOPIC 01246 01247 if (content_filtered_topic_) { 01248 filterClassName = content_filtered_topic_->get_filter_class_name(); 01249 filterExpression = content_filtered_topic_->get_filter_expression(); 01250 content_filtered_topic_->get_expression_parameters(exprParams); 01251 } 01252 01253 #endif 01254 01255 DDS::SubscriberQos sub_qos; 01256 subscriber->get_qos(sub_qos); 01257 01258 this->subscription_id_ = 01259 disco->add_subscription(this->domain_id_, 01260 this->dp_id_, 01261 this->topic_servant_->get_id(), 01262 this, 01263 this->qos_, 01264 trans_conf_info, 01265 sub_qos, 01266 filterClassName, 01267 filterExpression, 01268 exprParams); 01269 01270 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) { 01271 ACE_ERROR((LM_WARNING, 01272 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::enable, ") 01273 ACE_TEXT("add_subscription returned invalid id.\n"))); 01274 return DDS::RETCODE_ERROR; 01275 } 01276 } 01277 01278 if (topic_servant_) { 01279 const CORBA::String_var name = topic_servant_->get_name(); 01280 DDS::ReturnCode_t return_value = 01281 subscriber->reader_enabled(name.in(), this); 01282 01283 if (this->monitor_) { 01284 this->monitor_->report(); 01285 } 01286 01287 return return_value; 01288 } else { 01289 return DDS::RETCODE_OK; 01290 } 01291 }
void OpenDDS::DCPS::DataReaderImpl::enable_filtering | ( | ContentFilteredTopicImpl * | cft | ) |
Definition at line 3130 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::ContentFilteredTopicImpl::add_reader(), and content_filtered_topic_.
Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().
03131 { 03132 cft->add_reader(*this); 03133 content_filtered_topic_ = cft; 03134 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable_specific | ( | ) | [protected, pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by enable().
void OpenDDS::DCPS::DataReaderImpl::end_access | ( | ) |
Definition at line 3085 of file DataReaderImpl.cpp.
References coherent_, group_coherent_ordered_data_, post_read_or_take(), OpenDDS::DCPS::GroupRakeData::reset(), and sample_lock_.
03086 { 03087 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03088 this->coherent_ = false; 03089 this->group_coherent_ordered_data_.reset(); 03090 this->post_read_or_take(); 03091 }
bool OpenDDS::DCPS::DataReaderImpl::filter_sample | ( | const DataSampleHeader & | header | ) | [protected] |
Check if the received data sample or instance should be filtered.
Definition at line 2571 of file DataReaderImpl.cpp.
References ACE_TEXT(), always_get_history_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, ACE_OS::gettimeofday(), OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, LM_DEBUG, qos_, ACE_Time_Value::sec(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::time_to_time_value(), ACE_Time_Value::usec(), and DDS::VOLATILE_DURABILITY_QOS.
Referenced by data_received().
02572 { 02573 ACE_Time_Value now(ACE_OS::gettimeofday()); 02574 02575 // Expire historic data if QoS indicates VOLATILE. 02576 if (!always_get_history_ && header.historic_sample_ 02577 && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) { 02578 if (DCPS_debug_level >= 8) { 02579 ACE_DEBUG((LM_DEBUG, 02580 ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ") 02581 ACE_TEXT("Discarded historic data.\n"))); 02582 } 02583 02584 return true; // Data filtered. 02585 } 02586 02587 // The LIFESPAN_DURATION_FLAG is set when sample data is sent 02588 // with a non-default LIFESPAN duration value. 02589 if (header.lifespan_duration_) { 02590 // Finite lifespan. Check if data has expired. 02591 02592 DDS::Time_t const tmp = { 02593 header.source_timestamp_sec_ + header.lifespan_duration_sec_, 02594 header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_ 02595 }; 02596 02597 // We assume that the publisher host's clock and subcriber host's 02598 // clock are synchronized (allowed by the spec). 02599 ACE_Time_Value const expiration_time( 02600 OpenDDS::DCPS::time_to_time_value(tmp)); 02601 02602 if (now >= expiration_time) { 02603 if (DCPS_debug_level >= 8) { 02604 ACE_Time_Value const diff(now - expiration_time); 02605 ACE_DEBUG((LM_DEBUG, 02606 ACE_TEXT("OpenDDS (%P|%t) Received data ") 02607 ACE_TEXT("expired by %d seconds, %d microseconds.\n"), 02608 diff.sec(), 02609 diff.usec())); 02610 } 02611 02612 return true; // Data filtered. 02613 } 02614 } 02615 02616 return false; 02617 }
DDS::ContentFilteredTopic_ptr OpenDDS::DCPS::DataReaderImpl::get_cf_topic | ( | ) | const |
Definition at line 3137 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), content_filtered_topic_, and OpenDDS::DCPS::TopicDescriptionPtr< Topic >::get().
03138 { 03139 return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get()); 03140 }
CORBA::Long OpenDDS::DCPS::DataReaderImpl::get_depth | ( | ) | const [inline] |
Definition at line 394 of file DataReaderImpl.h.
00394 { 00395 return depth_; 00396 }
OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_dp_id | ( | ) |
Definition at line 2832 of file DataReaderImpl.cpp.
References dp_id_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02833 { 02834 return dp_id_; 02835 }
SubscriptionInstance_rch OpenDDS::DCPS::DataReaderImpl::get_handle_instance | ( | DDS::InstanceHandle_t | handle | ) | [protected] |
Definition at line 2413 of file DataReaderImpl.cpp.
References ACE_TEXT(), instances_, instances_lock_, and LM_WARNING.
Referenced by release_instance().
02414 { 02415 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, SubscriptionInstance_rch()); 02416 02417 SubscriptionInstanceMapType::iterator iter = instances_.find(handle); 02418 if (iter == instances_.end()) { 02419 ACE_DEBUG((LM_WARNING, 02420 ACE_TEXT("(%P|%t) WARNING: ") 02421 ACE_TEXT("DataReaderImpl::get_handle_instance: ") 02422 ACE_TEXT("lookup for 0x%x failed\n"), 02423 handle)); 02424 return SubscriptionInstance_rch(); 02425 } // if (0 != instances_.find(handle, instance)) 02426 02427 return iter->second; 02428 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 197 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.
00198 { 00199 using namespace OpenDDS::DCPS; 00200 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 00201 if (participant) 00202 return participant->id_to_handle(subscription_id_); 00203 return 0; 00204 }
void OpenDDS::DCPS::DataReaderImpl::get_instance_handles | ( | InstanceHandleVec & | instance_handles | ) |
Definition at line 2838 of file DataReaderImpl.cpp.
References instances_, instances_lock_, and sample_lock_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02839 { 02840 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 02841 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02842 02843 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 02844 end = instances_.end(); iter != end; ++iter) { 02845 instance_handles.push_back(iter->first); 02846 } 02847 }
void OpenDDS::DCPS::DataReaderImpl::get_latency_stats | ( | OpenDDS::DCPS::LatencyStatisticsSeq & | stats | ) | [virtual] |
Definition at line 2366 of file DataReaderImpl.cpp.
References statistics_.
02368 { 02369 stats.length(static_cast<CORBA::ULong>(this->statistics_.size())); 02370 int index = 0; 02371 02372 for (StatsMapType::const_iterator current = this->statistics_.begin(); 02373 current != this->statistics_.end(); 02374 ++current, ++index) { 02375 stats[ index] = current->second.get_stats(); 02376 stats[ index].publication = current->first; 02377 } 02378 }
DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::get_listener | ( | ) | [virtual] |
Implements DDS::DataReader.
Definition at line 940 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), and listener_.
00941 { 00942 return DDS::DataReaderListener::_duplicate(listener_.in()); 00943 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_liveliness_changed_status | ( | DDS::LivelinessChangedStatus & | status | ) | [virtual] |
Definition at line 973 of file DataReaderImpl.cpp.
References DDS::LivelinessChangedStatus::alive_count_change, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::RETCODE_OK, sample_lock_, and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().
00975 { 00976 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 00977 00978 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, 00979 false); 00980 status = liveliness_changed_status_; 00981 00982 liveliness_changed_status_.alive_count_change = 0; 00983 liveliness_changed_status_.not_alive_count_change = 0; 00984 00985 return DDS::RETCODE_OK; 00986 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publication_data(DDS::PublicationBuiltinTopicData& publication_data, DDS::InstanceHandle_t publication_handle) [virtual]
Definition at line 1097 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, publication_handle_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01100 { 01101 if (enabled_ == false) { 01102 ACE_ERROR_RETURN((LM_ERROR, 01103 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::") 01104 ACE_TEXT("get_matched_publication_data: ") 01105 ACE_TEXT("Entity is not enabled. \n")), 01106 DDS::RETCODE_NOT_ENABLED); 01107 } 01108 01109 01110 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01111 guard, 01112 this->publication_handle_lock_, 01113 DDS::RETCODE_ERROR); 01114 01115 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 01116 01117 if (!participant) 01118 return DDS::RETCODE_ERROR; 01119 01120 DDS::PublicationBuiltinTopicDataSeq data; 01121 const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>( 01122 participant.in(), 01123 BUILT_IN_PUBLICATION_TOPIC, 01124 publication_handle, 01125 data); 01126 01127 if (ret == DDS::RETCODE_OK) { 01128 publication_data = data[0]; 01129 } 01130 01131 return ret; 01132 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publications | ( | DDS::InstanceHandleSeq & | publication_handles | ) | [virtual] |
Definition at line 1066 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, LM_ERROR, publication_handle_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01068 { 01069 if (enabled_ == false) { 01070 ACE_ERROR_RETURN((LM_ERROR, 01071 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ") 01072 ACE_TEXT(" Entity is not enabled. \n")), 01073 DDS::RETCODE_NOT_ENABLED); 01074 } 01075 01076 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01077 guard, 01078 this->publication_handle_lock_, 01079 DDS::RETCODE_ERROR); 01080 01081 // Copy out the handles for the current set of publications. 01082 int index = 0; 01083 publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size())); 01084 01085 for (RepoIdToHandleMap::iterator 01086 current = this->id_to_handle_map_.begin(); 01087 current != this->id_to_handle_map_.end(); 01088 ++current, ++index) { 01089 publication_handles[ index] = current->second; 01090 } 01091 01092 return DDS::RETCODE_OK; 01093 }
size_t OpenDDS::DCPS::DataReaderImpl::get_n_chunks | ( | ) | const [inline] |
Definition at line 397 of file DataReaderImpl.h.
00397 { 00398 return n_chunks_; 00399 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_next_handle | ( | const DDS::BuiltinTopicKey_t & | key | ) | [protected] |
Get an instance handle for a new instance.
Definition at line 2431 of file DataReaderImpl.cpp.
References domain_id_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::in(), is_bit(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, TheServiceParticipant, and topic_servant_.
02432 { 02433 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 02434 if (!participant) 02435 return 0; 02436 02437 if (is_bit()) { 02438 Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_); 02439 CORBA::String_var topic = topic_servant_->get_name(); 02440 02441 RepoId id = disc->bit_key_to_repo_id(participant.in(), topic, key); 02442 return participant->id_to_handle(id); 02443 02444 } else { 02445 return participant->id_to_handle(GUID_UNKNOWN); 02446 } 02447 }
void OpenDDS::DCPS::DataReaderImpl::get_ordered_data(GroupRakeData& data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
Definition at line 3094 of file DataReaderImpl.cpp.
References group_coherent_ordered_data_, OpenDDS::DCPS::GroupRakeData::insert_sample(), instances_, instances_lock_, item(), and sample_lock_.
03098 { 03099 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03100 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 03101 03102 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 03103 iter != instances_.end(); ++iter) { 03104 SubscriptionInstance_rch ptr = iter->second; 03105 if ((ptr->instance_state_.view_state() & view_states) && 03106 (ptr->instance_state_.instance_state() & instance_states)) { 03107 size_t i(0); 03108 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_; 03109 item != 0; item = item->next_data_sample_) { 03110 if ((item->sample_state_ & sample_states) && !item->coherent_change_) { 03111 data.insert_sample(item, ptr, ++i); 03112 this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i); 03113 } 03114 } 03115 } 03116 } 03117 }
Priority OpenDDS::DCPS::DataReaderImpl::get_priority_value | ( | const AssociationData & | data | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 675 of file DataReaderImpl.h.
References OpenDDS::DCPS::AssociationData::publication_transport_priority_.
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_qos | ( | DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 923 of file DataReaderImpl.cpp.
References qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::InstanceState::schedule_release().
00925 { 00926 qos = qos_; 00927 return DDS::RETCODE_OK; 00928 }
ACE_Reactor_Timer_Interface * OpenDDS::DCPS::DataReaderImpl::get_reactor | ( | void | ) |
Definition at line 2820 of file DataReaderImpl.cpp.
References reactor_.
Referenced by OpenDDS::DCPS::InstanceState::cancel_release(), and OpenDDS::DCPS::InstanceState::schedule_release().
02821 { 02822 return this->reactor_; 02823 }
const RepoId& OpenDDS::DCPS::DataReaderImpl::get_repo_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 672 of file DataReaderImpl.h.
Referenced by data_received().
00672 { return this->subscription_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_deadline_missed_status | ( | DDS::RequestedDeadlineMissedStatus & | status | ) | [virtual] |
Definition at line 989 of file DataReaderImpl.cpp.
References last_deadline_missed_total_count_, DDS::REQUESTED_DEADLINE_MISSED_STATUS, requested_deadline_missed_status_, DDS::RETCODE_OK, sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::RequestedDeadlineMissedStatus::total_count, and DDS::RequestedDeadlineMissedStatus::total_count_change.
00991 { 00992 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 00993 00994 set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS, 00995 false); 00996 00997 this->requested_deadline_missed_status_.total_count_change = 00998 this->requested_deadline_missed_status_.total_count 00999 - this->last_deadline_missed_total_count_; 01000 01001 // DDS::RequestedDeadlineMissedStatus::last_instance_handle field 01002 // is updated by the RequestedDeadlineWatchdog. 01003 01004 // Update for next status check. 01005 this->last_deadline_missed_total_count_ = 01006 this->requested_deadline_missed_status_.total_count; 01007 01008 status = requested_deadline_missed_status_; 01009 01010 return DDS::RETCODE_OK; 01011 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_incompatible_qos_status | ( | DDS::RequestedIncompatibleQosStatus & | status | ) | [virtual] |
Definition at line 1014 of file DataReaderImpl.cpp.
References publication_handle_lock_, DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::RequestedIncompatibleQosStatus::total_count_change.
01016 { 01017 01018 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe( 01019 this->publication_handle_lock_); 01020 01021 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, 01022 false); 01023 status = requested_incompatible_qos_status_; 01024 requested_incompatible_qos_status_.total_count_change = 0; 01025 01026 return DDS::RETCODE_OK; 01027 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_lost_status | ( | DDS::SampleLostStatus & | status | ) | [virtual] |
Definition at line 1046 of file DataReaderImpl.cpp.
References DDS::RETCODE_OK, sample_lock_, DDS::SAMPLE_LOST_STATUS, sample_lost_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleLostStatus::total_count_change.
01048 { 01049 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 01050 01051 set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false); 01052 status = sample_lost_status_; 01053 sample_lost_status_.total_count_change = 0; 01054 return DDS::RETCODE_OK; 01055 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_rejected_status | ( | DDS::SampleRejectedStatus & | status | ) | [virtual] |
Definition at line 961 of file DataReaderImpl.cpp.
References DDS::RETCODE_OK, sample_lock_, DDS::SAMPLE_REJECTED_STATUS, sample_rejected_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleRejectedStatus::total_count_change.
00963 { 00964 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 00965 00966 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false); 00967 status = sample_rejected_status_; 00968 sample_rejected_status_.total_count_change = 0; 00969 return DDS::RETCODE_OK; 00970 }
DDS::Subscriber_ptr OpenDDS::DCPS::DataReaderImpl::get_subscriber | ( | ) | [virtual] |
Implements DDS::DataReader.
Definition at line 955 of file DataReaderImpl.cpp.
References get_subscriber_servant().
Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::DRMonitorImpl::report().
00956 { 00957 return get_subscriber_servant()._retn(); 00958 }
RcHandle< SubscriberImpl > OpenDDS::DCPS::DataReaderImpl::get_subscriber_servant | ( | ) | [protected] |
Definition at line 1675 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and subscriber_servant_.
Referenced by coherent_changes_completed(), data_received(), enable(), get_subscriber(), listener_for(), post_read_or_take(), set_qos(), and verify_coherent_changes_completion().
01676 { 01677 return subscriber_servant_.lock(); 01678 }
RepoId OpenDDS::DCPS::DataReaderImpl::get_subscription_id | ( | ) | const |
Definition at line 1680 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WriterInfoListener::subscription_id_.
Referenced by OpenDDS::Federator::ManagerImpl::initialize(), OpenDDS::DCPS::DRPeriodicMonitorImpl::report(), and OpenDDS::DCPS::DRMonitorImpl::report().
01681 { 01682 return subscription_id_; 01683 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_subscription_matched_status | ( | DDS::SubscriptionMatchedStatus & | status | ) | [virtual] |
Definition at line 1030 of file DataReaderImpl.cpp.
References DDS::SubscriptionMatchedStatus::current_count_change, publication_handle_lock_, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, and DDS::SubscriptionMatchedStatus::total_count_change.
01032 { 01033 01034 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe( 01035 this->publication_handle_lock_); 01036 01037 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false); 01038 status = subscription_match_status_; 01039 subscription_match_status_.total_count_change = 0; 01040 subscription_match_status_.current_count_change = 0; 01041 01042 return DDS::RETCODE_OK; 01043 }
OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_topic_id | ( | ) |
Definition at line 2826 of file DataReaderImpl.cpp.
References topic_servant_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02827 { 02828 return this->topic_servant_->get_id(); 02829 }
DDS::TopicDescription_ptr OpenDDS::DCPS::DataReaderImpl::get_topicdescription | ( | ) | [virtual] |
Implements DDS::DataReader.
Definition at line 945 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), content_filtered_topic_, OpenDDS::DCPS::TopicDescriptionPtr< Topic >::get(), and topic_desc_.
Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().
00946 { 00947 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00948 if (content_filtered_topic_) { 00949 return DDS::TopicDescription::_duplicate(content_filtered_topic_.get()); 00950 } 00951 #endif 00952 return DDS::TopicDescription::_duplicate(topic_desc_.in()); 00953 }
void OpenDDS::DCPS::DataReaderImpl::get_writer_states | ( | WriterStatePairVec & | writer_states | ) |
Definition at line 2850 of file DataReaderImpl.cpp.
References writers_, and writers_lock_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02851 { 02852 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 02853 read_guard, 02854 this->writers_lock_); 02855 for (WriterMapType::iterator iter = writers_.begin(); 02856 iter != writers_.end(); 02857 ++iter) { 02858 writer_states.push_back(WriterStatePair(iter->first, 02859 iter->second->get_state())); 02860 } 02861 }
bool OpenDDS::DCPS::DataReaderImpl::has_readcondition | ( | DDS::ReadCondition_ptr | a_condition | ) | [protected] |
Definition at line 821 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), and read_conditions_.
00822 { 00823 //sample lock already held 00824 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition); 00825 return read_conditions_.find(rc) != read_conditions_.end(); 00826 }
bool OpenDDS::DCPS::DataReaderImpl::has_zero_copies | ( | ) |
This method is used for a precondition check of delete_datareader.
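true | Zero-copy samples are currently loaned out (also returned, as a conservative assumption, when sample_lock_ or instances_lock_ cannot be acquired) | |
false | No zero-copy samples are loaned out |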
Definition at line 2725 of file DataReaderImpl.cpp.
References instances_, instances_lock_, item(), and sample_lock_.
02726 { 02727 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 02728 guard, 02729 this->sample_lock_, 02730 true /* assume we have loans */); 02731 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true); 02732 02733 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 02734 iter != instances_.end(); 02735 ++iter) { 02736 SubscriptionInstance_rch ptr = iter->second; 02737 02738 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_; 02739 item != 0; item = item->next_data_sample_) { 02740 if (item->zero_copy_cnt_ > 0) { 02741 return true; 02742 } 02743 } 02744 } 02745 02746 return false; 02747 }
bool OpenDDS::DCPS::DataReaderImpl::have_instance_states | ( | DDS::InstanceStateMask | instance_states | ) | const |
The caller should already have acquired sample_lock_. TODO: determine the correct value to return when instances_lock_ cannot be acquired (currently false).
Definition at line 1728 of file DataReaderImpl.cpp.
References instances_, and instances_lock_.
01730 { 01731 //!!!caller should have acquired sample_lock_ 01732 /// @TODO: determine correct failed lock return value. 01733 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false); 01734 01735 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01736 iter != instances_.end(); 01737 ++iter) { 01738 SubscriptionInstance_rch ptr = iter->second; 01739 01740 if (ptr->instance_state_.instance_state() & instance_states) { 01741 return true; 01742 } 01743 } 01744 01745 return false; 01746 }
bool OpenDDS::DCPS::DataReaderImpl::have_sample_states | ( | DDS::SampleStateMask | sample_states | ) | const |
The caller should already have acquired sample_lock_. TODO: determine the correct value to return when instances_lock_ cannot be acquired (currently false).
Definition at line 1685 of file DataReaderImpl.cpp.
References instances_, instances_lock_, and item().
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states().
01687 { 01688 //!!!caller should have acquired sample_lock_ 01689 /// @TODO: determine correct failed lock return value. 01690 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false); 01691 01692 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01693 iter != instances_.end(); 01694 ++iter) { 01695 SubscriptionInstance_rch ptr = iter->second; 01696 01697 for (ReceivedDataElement *item = ptr->rcvd_samples_.head_; 01698 item != 0; item = item->next_data_sample_) { 01699 if (item->sample_state_ & sample_states) { 01700 return true; 01701 } 01702 } 01703 } 01704 01705 return false; 01706 }
bool OpenDDS::DCPS::DataReaderImpl::have_view_states | ( | DDS::ViewStateMask | view_states | ) | const |
The caller should already have acquired sample_lock_. TODO: determine the correct value to return when instances_lock_ cannot be acquired (currently false).
Definition at line 1709 of file DataReaderImpl.cpp.
References instances_, and instances_lock_.
01710 { 01711 //!!!caller should have acquired sample_lock_ 01712 /// @TODO: determine correct failed lock return value. 01713 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false); 01714 01715 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01716 iter != instances_.end(); 01717 ++iter) { 01718 SubscriptionInstance_rch ptr = iter->second; 01719 01720 if (ptr->instance_state_.view_state() & view_states) { 01721 return true; 01722 } 01723 } 01724 01725 return false; 01726 }
void OpenDDS::DCPS::DataReaderImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 731 of file DataReaderImpl.cpp.
References topic_servant_.
00732 { 00733 topic_servant_->inconsistent_topic(); 00734 }
void OpenDDS::DCPS::DataReaderImpl::init | ( | TopicDescriptionImpl * | a_topic_desc, | |
const DDS::DataReaderQos & | qos, | |||
DDS::DataReaderListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
DomainParticipantImpl * | participant, | |||
SubscriberImpl * | subscriber | |||
) |
Definition at line 150 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::SubscriberImpl::get_qos(), is_bit_, is_exclusive_ownership_, listener_, listener_mask_, LM_WARNING, participant_servant_, qos_, DDS::RETCODE_OK, ACE_OS::strcmp(), subscriber_servant_, topic_desc_, and topic_servant_.
Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr(), OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00157 { 00158 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc); 00159 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) { 00160 topic_servant_ = a_topic; 00161 } 00162 00163 CORBA::String_var topic_name = a_topic_desc->get_name(); 00164 00165 #if !defined (DDS_HAS_MINIMUM_BIT) 00166 is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0 00167 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0 00168 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0 00169 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0; 00170 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00171 00172 qos_ = qos; 00173 00174 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00175 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS; 00176 #endif 00177 00178 listener_ = DDS::DataReaderListener::_duplicate(a_listener); 00179 listener_mask_ = mask; 00180 00181 // Only store the participant pointer, since it is our "grand" 00182 // parent, we will exist as long as it does 00183 participant_servant_ = *participant; 00184 00185 domain_id_ = participant->get_domain_id(); 00186 00187 subscriber_servant_ = *subscriber; 00188 00189 if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) { 00190 ACE_DEBUG((LM_WARNING, 00191 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ") 00192 ACE_TEXT("failed to get SubscriberQos\n"))); 00193 } 00194 }
void OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update | ( | WriterInfo & | info, | |
const ACE_Time_Value & | when | |||
) | [private] |
Definition at line 2235 of file DataReaderImpl.cpp.
References DDS::LivelinessChangedStatus::alive_count, instances_, instances_lock_, liveliness_changed_status_, and OpenDDS::DCPS::WriterInfo::writer_id_.
Referenced by writer_became_dead(), and writer_removed().
02237 { 02238 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02239 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 02240 next = iter; iter != instances_.end(); iter = next) { 02241 ++next; 02242 iter->second->instance_state_.writer_became_dead( 02243 info.writer_id_, liveliness_changed_status_.alive_count, when); 02244 } 02245 }
bool OpenDDS::DCPS::DataReaderImpl::is_bit | ( | ) | const |
Definition at line 2719 of file DataReaderImpl.cpp.
References is_bit_.
Referenced by get_next_handle().
02720 { 02721 return this->is_bit_; 02722 }
DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::listener_for | ( | DDS::StatusKind | kind | ) |
This is used to retrieve the listener for a certain status change. If this datareader has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/subscriber.
Definition at line 1779 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), get_subscriber_servant(), CORBA::is_nil(), listener_, and listener_mask_.
Referenced by coherent_changes_completed(), notify_liveliness_change(), remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().
01780 { 01781 // per 2.1.4.3.1 Listener Access to Plain Communication Status 01782 // use this entities factory if listener is mask not enabled 01783 // for this kind. 01784 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant(); 01785 if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) { 01786 return subscriber->listener_for(kind); 01787 01788 } else { 01789 return DDS::DataReaderListener::_duplicate(listener_.in()); 01790 } 01791 }
void OpenDDS::DCPS::DataReaderImpl::liveliness_lost | ( | ) |
virtual void OpenDDS::DCPS::DataReaderImpl::lookup_instance | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance_rch & | instance | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by data_received().
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::lookup_instance_generic | ( | const void * | data | ) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().
void OpenDDS::DCPS::DataReaderImpl::lookup_instance_handles | ( | const WriterIdSeq & | ids, | |
DDS::InstanceHandleSeq & | hdls | |||
) | [private] |
Look up the instance handles for the given publication repo ids.
Definition at line 2539 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_STRING, and participant_servant_.
Referenced by notify_latency(), notify_subscription_disconnected(), notify_subscription_lost(), notify_subscription_reconnected(), and remove_associations_i().
02541 { 02542 CORBA::ULong const num_wrts = ids.length(); 02543 02544 if (DCPS_debug_level > 9) { 02545 const char* separator = ""; 02546 OPENDDS_STRING guids; 02547 02548 for (CORBA::ULong i = 0; i < num_wrts; ++i) { 02549 guids += separator; 02550 guids += OPENDDS_STRING(GuidConverter(ids[i])); 02551 separator = ", "; 02552 } 02553 02554 ACE_DEBUG((LM_DEBUG, 02555 ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ") 02556 ACE_TEXT("searching for handles for writer Ids: %C.\n"), 02557 guids.c_str())); 02558 } 02559 02560 hdls.length(num_wrts); 02561 02562 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 02563 if (participant) { 02564 for (CORBA::ULong i = 0; i < num_wrts; ++i) { 02565 hdls[i] = participant->id_to_handle(ids[i]); 02566 } 02567 } 02568 }
void OpenDDS::DCPS::DataReaderImpl::notify_latency | ( | PublicationId | writer | ) |
Definition at line 2333 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), budget_exceeded_status_, CORBA::is_nil(), OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, listener_, lookup_instance_handles(), OpenDDS::DCPS::BudgetExceededStatus::total_count, and OpenDDS::DCPS::BudgetExceededStatus::total_count_change.
Referenced by process_latency().
02334 { 02335 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02336 // is given to this DataReader then narrow() fails. 02337 DataReaderListener_var listener 02338 = DataReaderListener::_narrow(this->listener_.in()); 02339 02340 if (!CORBA::is_nil(listener.in())) { 02341 WriterIdSeq writerIds; 02342 writerIds.length(1); 02343 writerIds[ 0] = writer; 02344 02345 DDS::InstanceHandleSeq handles; 02346 this->lookup_instance_handles(writerIds, handles); 02347 02348 if (handles.length() >= 1) { 02349 this->budget_exceeded_status_.last_instance_handle = handles[ 0]; 02350 02351 } else { 02352 this->budget_exceeded_status_.last_instance_handle = -1; 02353 } 02354 02355 ++this->budget_exceeded_status_.total_count; 02356 ++this->budget_exceeded_status_.total_count_change; 02357 02358 listener->on_budget_exceeded(this, this->budget_exceeded_status_); 02359 02360 this->budget_exceeded_status_.total_count_change = 0; 02361 } 02362 }
void OpenDDS::DCPS::DataReaderImpl::notify_liveliness_change | ( | ) |
Definition at line 2749 of file DataReaderImpl.cpp.
References ACE_TEXT(), DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DCPS_debug_level, CORBA::is_nil(), listener_, listener_for(), listener_mask_, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, LM_DEBUG, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_dds_string(), and writers_.
Referenced by writer_became_alive(), writer_became_dead(), and writer_removed().
02750 { 02751 // N.B. writers_lock_ should already be acquired when 02752 // this method is called. 02753 02754 DDS::DataReaderListener_var listener 02755 = listener_for(DDS::LIVELINESS_CHANGED_STATUS); 02756 02757 if (!CORBA::is_nil(listener.in())) { 02758 listener->on_liveliness_changed(this, liveliness_changed_status_); 02759 02760 liveliness_changed_status_.alive_count_change = 0; 02761 liveliness_changed_status_.not_alive_count_change = 0; 02762 } 02763 notify_status_condition(); 02764 02765 if (DCPS_debug_level > 9) { 02766 OPENDDS_STRING output_str; 02767 output_str += "subscription "; 02768 output_str += OPENDDS_STRING(GuidConverter(subscription_id_)); 02769 output_str += ", listener at: 0x"; 02770 output_str += to_dds_string(this->listener_.in ()); 02771 02772 for (WriterMapType::iterator current = this->writers_.begin(); 02773 current != this->writers_.end(); 02774 ++current) { 02775 RepoId id = current->first; 02776 output_str += "\n\tNOTIFY: writer[ "; 02777 output_str += OPENDDS_STRING(GuidConverter(id)); 02778 output_str += "] == "; 02779 output_str += current->second->get_state_str(); 02780 } 02781 02782 output_str + "\n"; 02783 ACE_DEBUG((LM_DEBUG, 02784 ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ") 02785 ACE_TEXT("listener at 0x%x, mask 0x%x.\n") 02786 ACE_TEXT("\tNOTIFY: %C\n"), 02787 listener.in (), 02788 listener_mask_, 02789 output_str.c_str())); 02790 } 02791 }
void OpenDDS::DCPS::DataReaderImpl::notify_read_conditions | ( | ) | [protected] |
Data has arrived in the cache; unblock waiting ReadConditions.
Definition at line 1655 of file DataReaderImpl.cpp.
References ACE_TEXT(), LM_ERROR, read_conditions_, reverse_sample_lock_, and OpenDDS::DCPS::ConditionImpl::signal_all().
Referenced by accept_sample_processing(), and data_received().
01656 { 01657 //sample lock is already held 01658 ReadConditionSet local_read_conditions = read_conditions_; 01659 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01660 01661 for (ReadConditionSet::iterator it = local_read_conditions.begin(), 01662 end = local_read_conditions.end(); it != end; ++it) { 01663 ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in()); 01664 if (ci) { 01665 ci->signal_all(); 01666 } else { 01667 ACE_ERROR((LM_ERROR, 01668 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ") 01669 ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n"))); 01670 } 01671 } 01672 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_disconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2450 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, CORBA::is_nil(), listener_, lookup_instance_handles(), OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.
02451 { 02452 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6); 02453 02454 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02455 // is given to this DataReader then narrow() fails. 02456 DataReaderListener_var the_listener 02457 = DataReaderListener::_narrow(this->listener_.in()); 02458 02459 if (!CORBA::is_nil(the_listener.in())) { 02460 SubscriptionLostStatus status; 02461 02462 // Since this callback may come after remove_association which removes 02463 // the writer from id_to_handle map, we can ignore this error. 02464 this->lookup_instance_handles(pubids, status.publication_handles); 02465 the_listener->on_subscription_disconnected(this, status); 02466 } 02467 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
Definition at line 2492 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), len, listener_, OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.
02493 { 02494 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6); 02495 02496 if (!this->is_bit_) { 02497 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02498 // is given to this DataReader then narrow() fails. 02499 DataReaderListener_var the_listener 02500 = DataReaderListener::_narrow(this->listener_.in()); 02501 02502 if (!CORBA::is_nil(the_listener.in())) { 02503 SubscriptionLostStatus status; 02504 02505 CORBA::ULong len = handles.length(); 02506 status.publication_handles.length(len); 02507 02508 for (CORBA::ULong i = 0; i < len; ++ i) { 02509 status.publication_handles[i] = handles[i]; 02510 } 02511 02512 the_listener->on_subscription_lost(this, status); 02513 } 02514 } 02515 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2518 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, CORBA::is_nil(), listener_, lookup_instance_handles(), OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.
Referenced by remove_associations_i().
02519 { 02520 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6); 02521 02522 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02523 // is given to this DataReader then narrow() fails. 02524 DataReaderListener_var the_listener 02525 = DataReaderListener::_narrow(this->listener_.in()); 02526 02527 if (!CORBA::is_nil(the_listener.in())) { 02528 SubscriptionLostStatus status; 02529 02530 // Since this callback may come after remove_association which removes 02531 // the writer from id_to_handle map, we can ignore this error. 02532 this->lookup_instance_handles(pubids, status.publication_handles); 02533 the_listener->on_subscription_lost(this, status); 02534 } 02535 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_reconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2470 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), OpenDDS::DCPS::SubscriptionLostStatus::publication_handles, and status.
02471 { 02472 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6); 02473 02474 if (!this->is_bit_) { 02475 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02476 // is given to this DataReader then narrow() fails. 02477 DataReaderListener_var the_listener 02478 = DataReaderListener::_narrow(this->listener_.in()); 02479 02480 if (!CORBA::is_nil(the_listener.in())) { 02481 SubscriptionLostStatus status; 02482 02483 // If it's reconnected then the reader should be in id_to_handle 02484 this->lookup_instance_handles(pubids, status.publication_handles); 02485 02486 the_listener->on_subscription_reconnected(this, status); 02487 } 02488 } 02489 }
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP | ( | DDS::InstanceHandle_t | , | |
SubscriptionInstance_rch | ||||
) |
Referenced by deliver_historic(), and resume_sample_processing().
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP | ( | PublicationId | , | |
RcHandle< WriterInfo > | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Publications writing to this reader.
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::InstanceHandle_t | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP | ( | PublicationId | , | |
WriterStats | , | |||
GUID_tKeyLessThan | ||||
) |
Type of collection of statistics for writers to this reader.
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET_CMP | ( | DDS::ReadCondition_var | , | |
RCCompLess | ||||
) | [private] |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR | ( | void * | ) |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR | ( | WriterStatePair | ) |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR | ( | DDS::InstanceHandle_t | ) |
bool OpenDDS::DCPS::DataReaderImpl::ownership_filter_instance | ( | const SubscriptionInstance_rch & | instance, | |
const PublicationId & | pubid | |||
) | [protected] |
Definition at line 2621 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_UNKNOWN, is_exclusive_ownership_, LM_DEBUG, OPENDDS_STRING, ownership_manager(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, and writers_.
02623 { 02624 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 02625 if (this->is_exclusive_ownership_) { 02626 02627 WriterMapType::iterator iter = writers_.find(pubid); 02628 02629 if (iter == writers_.end()) { 02630 if (DCPS_debug_level > 4) { 02631 // This may not be an error since it could happen that the sample 02632 // is delivered to the datareader after the write is dis-associated 02633 // with this datareader. 02634 GuidConverter reader_converter(subscription_id_); 02635 GuidConverter writer_converter(pubid); 02636 ACE_DEBUG((LM_DEBUG, 02637 ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ") 02638 ACE_TEXT("reader %C is not associated with writer %C.\n"), 02639 OPENDDS_STRING(reader_converter).c_str(), 02640 OPENDDS_STRING(writer_converter).c_str())); 02641 } 02642 return true; 02643 } 02644 02645 02646 // Evaulate the owner of the instance if not selected and filter 02647 // current message if it's not from owner writer. 02648 if ( instance->instance_state_.get_owner () == GUID_UNKNOWN 02649 || ! iter->second->is_owner_evaluated (instance->instance_handle_)) { 02650 OwnershipManagerPtr owner_manager = this->ownership_manager(); 02651 02652 bool is_owner = owner_manager && owner_manager->select_owner ( 02653 instance->instance_handle_, 02654 iter->second->writer_id_, 02655 iter->second->writer_qos_.ownership_strength.value, 02656 &instance->instance_state_); 02657 iter->second->set_owner_evaluated (instance->instance_handle_, true); 02658 02659 if (! 
is_owner) { 02660 if (DCPS_debug_level >= 1) { 02661 GuidConverter reader_converter(subscription_id_); 02662 GuidConverter writer_converter(pubid); 02663 GuidConverter owner_converter (instance->instance_state_.get_owner ()); 02664 ACE_DEBUG((LM_DEBUG, 02665 ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ") 02666 ACE_TEXT("reader %C writer %C is not elected as owner %C\n"), 02667 OPENDDS_STRING(reader_converter).c_str(), 02668 OPENDDS_STRING(writer_converter).c_str(), 02669 OPENDDS_STRING(owner_converter).c_str())); 02670 } 02671 return true; 02672 } 02673 } 02674 else if (! (instance->instance_state_.get_owner () == pubid)) { 02675 if (DCPS_debug_level >= 1) { 02676 GuidConverter reader_converter(subscription_id_); 02677 GuidConverter writer_converter(pubid); 02678 GuidConverter owner_converter (instance->instance_state_.get_owner ()); 02679 ACE_DEBUG((LM_DEBUG, 02680 ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ") 02681 ACE_TEXT("reader %C writer %C is not owner %C\n"), 02682 OPENDDS_STRING(reader_converter).c_str(), 02683 OPENDDS_STRING(writer_converter).c_str(), 02684 OPENDDS_STRING(owner_converter).c_str())); 02685 } 02686 return true; 02687 } 02688 } 02689 #else 02690 ACE_UNUSED_ARG(pubid); 02691 ACE_UNUSED_ARG(instance); 02692 #endif 02693 return false; 02694 }
OwnershipManagerPtr OpenDDS::DCPS::DataReaderImpl::ownership_manager | ( | ) | [inline] |
Definition at line 462 of file DataReaderImpl.h.
Referenced by data_received(), OpenDDS::DCPS::InstanceState::dispose_was_received(), ownership_filter_instance(), release_instance(), OpenDDS::DCPS::InstanceState::unregister_was_received(), writer_became_dead(), writer_removed(), ~DataReaderImpl(), and OpenDDS::DCPS::InstanceState::~InstanceState().
00462 { return OwnershipManagerPtr(this); }
RcHandle< EntityImpl > OpenDDS::DCPS::DataReaderImpl::parent | ( | void | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 1641 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and subscriber_servant_.
01642 { 01643 return this->subscriber_servant_.lock(); 01644 }
void OpenDDS::DCPS::DataReaderImpl::post_read_or_take | ( | ) | [protected] |
Definition at line 2793 of file DataReaderImpl.cpp.
References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, get_subscriber_servant(), and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().
Referenced by end_access().
02794 { 02795 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false); 02796 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant(); 02797 if (subscriber) 02798 subscriber->set_status_changed_flag( 02799 DDS::DATA_ON_READERS_STATUS, false); 02800 }
void OpenDDS::DCPS::DataReaderImpl::prepare_to_delete | ( | ) | [protected] |
Definition at line 2405 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::send_final_acks(), OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().
02406 { 02407 this->set_deleted(true); 02408 this->stop_associating(); 02409 this->send_final_acks(); 02410 }
void OpenDDS::DCPS::DataReaderImpl::process_latency | ( | const ReceivedDataSample & | sample | ) |
NB: The debug message logged here when the reader is not associated with the sample's writer is generated contemporaneously with a similar message from writer_activity(). That message is not marked as an error, so we follow that lead and leave this as an informational message, guarded by debug level. This seems to be due to late samples (samples delivered after an association has been torn down). We may want to promote this to a warning if other conditions causing this symptom are discovered.
Definition at line 2271 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, ACE_OS::gettimeofday(), OpenDDS::DCPS::ReceivedDataSample::header_, DDS::DataReaderQos::latency_budget, LM_DEBUG, ACE_Time_Value::msec(), notify_latency(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, qos_, ACE_Time_Value::sec(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, statistics_, statistics_enabled(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::time_value_to_duration().
Referenced by data_received().
02272 { 02273 StatsMapType::iterator location 02274 = this->statistics_.find(sample.header_.publication_id_); 02275 02276 if (location != this->statistics_.end()) { 02277 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC }; 02278 02279 // Only when the user has specified a latency budget or statistics 02280 // are enabled we need to calculate our latency 02281 if ((this->statistics_enabled()) || 02282 (this->qos_.latency_budget.duration > zero)) { 02283 // This starts as the current time. 02284 ACE_Time_Value latency = ACE_OS::gettimeofday(); 02285 02286 // The time interval starts at the send end. 02287 DDS::Duration_t then = { 02288 sample.header_.source_timestamp_sec_, 02289 sample.header_.source_timestamp_nanosec_ 02290 }; 02291 02292 // latency delay in ACE_Time_Value format. 02293 latency -= duration_to_time_value(then); 02294 02295 if (this->statistics_enabled()) { 02296 location->second.add_stat(latency); 02297 } 02298 02299 if (DCPS_debug_level > 9) { 02300 ACE_DEBUG((LM_DEBUG, 02301 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ") 02302 ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"), 02303 latency.sec(), 02304 latency.msec())); 02305 } 02306 02307 if (this->qos_.latency_budget.duration > zero) { 02308 // Check latency against the budget. 02309 if (time_value_to_duration(latency) > this->qos_.latency_budget.duration) { 02310 this->notify_latency(sample.header_.publication_id_); 02311 } 02312 } 02313 } 02314 } else if (DCPS_debug_level > 0) { 02315 /// NB: This message is generated contemporaneously with a similar 02316 /// message from writer_activity(). That message is not marked 02317 /// as an error, so we follow that lead and leave this as an 02318 /// informational message, guarded by debug level. This seems 02319 /// to be due to late samples (samples delivered after an 02320 /// association has been torn down). 
We may want to promote this 02321 /// to a warning if other conditions causing this symptom are 02322 /// discovered. 02323 GuidConverter reader_converter(subscription_id_); 02324 GuidConverter writer_converter(sample.header_.publication_id_); 02325 ACE_DEBUG((LM_DEBUG, 02326 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ") 02327 ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"), 02328 OPENDDS_STRING(reader_converter).c_str(), 02329 OPENDDS_STRING(writer_converter).c_str())); 02330 } 02331 }
virtual void OpenDDS::DCPS::DataReaderImpl::purge_data | ( | SubscriptionInstance_rch | instance | ) | [protected, pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by release_instance().
void OpenDDS::DCPS::DataReaderImpl::qos_change | ( | const DDS::DataReaderQos & | qos | ) | [protected, virtual] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Definition at line 896 of file DataReaderImpl.cpp.
References DDS::DataReaderQos::deadline, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), last_deadline_missed_total_count_, qos_, OpenDDS::DCPS::ref(), requested_deadline_missed_status_, OpenDDS::DCPS::RcHandle< T >::reset(), sample_lock_, and watchdog_.
Referenced by OpenDDS::DCPS::DataReaderImpl_T< MessageType >::qos_change(), and set_qos().
00897 { 00898 // Reset the deadline timer if the period has changed. 00899 if (qos_.deadline.period.sec != qos.deadline.period.sec || 00900 qos_.deadline.period.nanosec != qos.deadline.period.nanosec) { 00901 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC && 00902 qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00903 00904 this->watchdog_ = make_rch<RequestedDeadlineWatchdog>( 00905 ref(this->sample_lock_), 00906 qos.deadline, 00907 ref(*this), 00908 ref(this->requested_deadline_missed_status_), 00909 ref(this->last_deadline_missed_total_count_)); 00910 00911 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC && 00912 qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00913 watchdog_->cancel_all(); 00914 watchdog_.reset(); 00915 } else { 00916 watchdog_->reset_interval( 00917 duration_to_time_value(qos.deadline.period)); 00918 } 00919 } 00920 }
ACE_INLINE unsigned int & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size | ( | ) |
Configure the size of the raw data collection buffer.
Definition at line 19 of file DataReaderImpl.inl.
References raw_latency_buffer_size_.
Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00020 { 00021 return this->raw_latency_buffer_size_; 00022 }
ACE_INLINE OpenDDS::DCPS::DataCollector< double >::OnFull & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type | ( | ) |
Configure the type of the raw data collection buffer.
Definition at line 26 of file DataReaderImpl.inl.
References raw_latency_buffer_type_.
Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00027 { 00028 return this->raw_latency_buffer_type_; 00029 }
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE const OpenDDS::DCPS::DataReaderImpl::StatsMapType & OpenDDS::DCPS::DataReaderImpl::raw_latency_statistics | ( | ) | const |
Expose the statistics container.
Definition at line 12 of file DataReaderImpl.inl.
References statistics_.
00013 { 00014 return this->statistics_; 00015 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_generic | ( | GenericBundle & | gen, | |
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
bool | adjust_ref_count | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_instance_generic | ( | void *& | data, | |
DDS::SampleInfo & | info, | |||
DDS::InstanceHandle_t | instance, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_next_instance_generic | ( | void *& | data, | |
DDS::SampleInfo & | info, | |||
DDS::InstanceHandle_t | previous_instance, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().
void OpenDDS::DCPS::DataReaderImpl::register_for_writer | ( | const RepoId & | participant, | |
const RepoId & | readerid, | |||
const RepoId & | writerid, | |||
const TransportLocatorSeq & | locators, | |||
DiscoveryListener * | listener | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 3242 of file DataReaderImpl.cpp.
03247 { 03248 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener); 03249 }
void OpenDDS::DCPS::DataReaderImpl::reject_coherent | ( | PublicationId & | writer_id, | |
RepoId & | publisher_id | |||
) |
Definition at line 2960 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, instances_, instances_lock_, LM_DEBUG, OPENDDS_STRING, reset_coherent_info(), sample_lock_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.
Referenced by verify_coherent_changes_completion().
02962 { 02963 if (::OpenDDS::DCPS::DCPS_debug_level > 0) { 02964 GuidConverter reader (this->subscription_id_); 02965 GuidConverter writer (writer_id); 02966 GuidConverter publisher (publisher_id); 02967 ACE_DEBUG((LM_DEBUG, 02968 ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()") 02969 ACE_TEXT(" reader %C writer %C publisher %C \n"), 02970 OPENDDS_STRING(reader).c_str(), 02971 OPENDDS_STRING(writer).c_str(), 02972 OPENDDS_STRING(publisher).c_str())); 02973 } 02974 02975 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 02976 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02977 02978 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin(); 02979 iter != this->instances_.end(); ++iter) { 02980 iter->second->rcvd_strategy_->reject_coherent( 02981 writer_id, publisher_id); 02982 } 02983 this->reset_coherent_info (writer_id, publisher_id); 02984 }
void OpenDDS::DCPS::DataReaderImpl::release_instance | ( | DDS::InstanceHandle_t | handle | ) |
Release the instance with the handle.
Definition at line 1968 of file DataReaderImpl.cpp.
References get_handle_instance(), instances_, instances_lock_, LM_ERROR, monitor_, ownership_manager(), purge_data(), release_instance_i(), OpenDDS::DCPS::Monitor::report(), and sample_lock_.
Referenced by OpenDDS::DCPS::InstanceState::release().
01969 { 01970 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01971 OwnershipManagerPtr owner_manager = this->ownership_manager(); 01972 if (owner_manager) { 01973 owner_manager->remove_writers (handle); 01974 } 01975 #endif 01976 01977 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 01978 SubscriptionInstance_rch instance = this->get_handle_instance(handle); 01979 01980 if (!instance) { 01981 ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance " 01982 "could not find the instance by handle 0x%x\n", handle)); 01983 return; 01984 } 01985 01986 this->purge_data(instance); 01987 01988 { 01989 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_); 01990 instances_.erase(handle); 01991 } 01992 01993 this->release_instance_i(handle); 01994 if (this->monitor_) { 01995 this->monitor_->report(); 01996 } 01997 }
virtual void OpenDDS::DCPS::DataReaderImpl::release_instance_i | ( | DDS::InstanceHandle_t | handle | ) | [protected, pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by release_instance().
void OpenDDS::DCPS::DataReaderImpl::remove_all_associations | ( | ) |
Definition at line 647 of file DataReaderImpl.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, LM_WARNING, publication_handle_lock_, remove_associations(), size, OpenDDS::DCPS::TransportClient::stop_associating(), writers_, and writers_lock_.
00648 { 00649 DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6); 00650 // stop pending associations 00651 this->stop_associating(); 00652 00653 OpenDDS::DCPS::WriterIdSeq writers; 00654 int size; 00655 00656 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00657 00658 { 00659 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00660 00661 size = static_cast<int>(writers_.size()); 00662 writers.length(size); 00663 00664 WriterMapType::iterator curr_writer = writers_.begin(); 00665 WriterMapType::iterator end_writer = writers_.end(); 00666 00667 int i = 0; 00668 00669 while (curr_writer != end_writer) { 00670 writers[i++] = curr_writer->first; 00671 ++curr_writer; 00672 } 00673 } 00674 00675 try { 00676 CORBA::Boolean dont_notify_lost = 0; 00677 00678 if (0 < size) { 00679 remove_associations(writers, dont_notify_lost); 00680 } 00681 00682 } catch (const CORBA::Exception&) { 00683 ACE_DEBUG((LM_WARNING, 00684 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ") 00685 ACE_TEXT("caught exception from remove_associations.\n"))); 00686 } 00687 }
void OpenDDS::DCPS::DataReaderImpl::remove_associations | ( | const WriterIdSeq & | writers, | |
bool | callback | |||
) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 447 of file DataReaderImpl.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::entity_deleted_, is_bit_, LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::push_back(), remove_association_sweeper_, remove_associations_i(), OpenDDS::DCPS::TransportClient::stop_associating(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, ACE_Atomic_Op< ACE_LOCK, TYPE >::value(), writers_, and writers_lock_.
Referenced by remove_all_associations().
00449 { 00450 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6); 00451 00452 if (writers.length() == 0) { 00453 return; 00454 } 00455 00456 if (DCPS_debug_level >= 1) { 00457 GuidConverter reader_converter(subscription_id_); 00458 GuidConverter writer_converter(writers[0]); 00459 ACE_DEBUG((LM_DEBUG, 00460 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ") 00461 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"), 00462 is_bit_, 00463 OPENDDS_STRING(reader_converter).c_str(), 00464 OPENDDS_STRING(writer_converter).c_str(), 00465 writers.length())); 00466 } 00467 if (!this->entity_deleted_.value()) { 00468 // stop pending associations for these writer ids 00469 this->stop_associating(writers.get_buffer(), writers.length()); 00470 00471 // writers which are considered non-active and can 00472 // be removed immediately 00473 WriterIdSeq non_active_writers; 00474 { 00475 CORBA::ULong wr_len = writers.length(); 00476 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00477 00478 for (CORBA::ULong i = 0; i < wr_len; i++) { 00479 PublicationId writer_id = writers[i]; 00480 00481 WriterMapType::iterator it = this->writers_.find(writer_id); 00482 if (it != this->writers_.end() && 00483 it->second->active()) { 00484 remove_association_sweeper_->schedule_timer(it->second, notify_lost); 00485 } else { 00486 push_back(non_active_writers, writer_id); 00487 } 00488 } 00489 } 00490 remove_associations_i(non_active_writers, notify_lost); 00491 } else { 00492 remove_associations_i(writers, notify_lost); 00493 } 00494 }
void OpenDDS::DCPS::DataReaderImpl::remove_associations_i | ( | const WriterIdSeq & | writers, | |
bool | callback | |||
) | [protected, virtual] |
Section 7.1.4.1: total_count will not decrement.
Todo: Reconcile this with the verbiage in section 7.1.4.1.
Definition at line 513 of file DataReaderImpl.cpp.
References ACE_TEXT(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), end_historic_sweeper_, id_to_handle_map_, is_bit_, CORBA::is_nil(), DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), LM_DEBUG, lookup_instance_handles(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), notify_subscription_lost(), OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, DDS::SubscriptionMatchedStatus::total_count_change, writers_, and writers_lock_.
Referenced by remove_associations(), and remove_publication().
00515 { 00516 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6); 00517 00518 if (writers.length() == 0) { 00519 return; 00520 } 00521 00522 if (DCPS_debug_level >= 1) { 00523 GuidConverter reader_converter(subscription_id_); 00524 GuidConverter writer_converter(writers[0]); 00525 ACE_DEBUG((LM_DEBUG, 00526 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ") 00527 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"), 00528 is_bit_, 00529 OPENDDS_STRING(reader_converter).c_str(), 00530 OPENDDS_STRING(writer_converter).c_str(), 00531 writers.length())); 00532 } 00533 DDS::InstanceHandleSeq handles; 00534 00535 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00536 00537 // This is used to hold the list of writers which were actually 00538 // removed, which is a proper subset of the writers which were 00539 // requested to be removed. 00540 WriterIdSeq updated_writers; 00541 00542 CORBA::ULong wr_len; 00543 00544 //Remove the writers from writer list. If the supplied writer 00545 //is not in the cached writers list then it is already removed. 00546 //We just need remove the writers in the list that have not been 00547 //removed. 
00548 { 00549 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00550 00551 wr_len = writers.length(); 00552 00553 for (CORBA::ULong i = 0; i < wr_len; i++) { 00554 PublicationId writer_id = writers[i]; 00555 00556 WriterMapType::iterator it = this->writers_.find(writer_id); 00557 00558 if (it != this->writers_.end()) { 00559 it->second->removed(); 00560 end_historic_sweeper_->cancel_timer(it->second); 00561 remove_association_sweeper_->cancel_timer(it->second); 00562 } 00563 00564 if (this->writers_.erase(writer_id) == 0) { 00565 if (DCPS_debug_level >= 1) { 00566 GuidConverter converter(writer_id); 00567 ACE_DEBUG((LM_DEBUG, 00568 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ") 00569 ACE_TEXT("the writer local %C was already removed.\n"), 00570 OPENDDS_STRING(converter).c_str())); 00571 } 00572 00573 } else { 00574 push_back(updated_writers, writer_id); 00575 } 00576 } 00577 } 00578 00579 wr_len = updated_writers.length(); 00580 00581 // Return now if the supplied writers have been removed already. 00582 if (wr_len == 0) { 00583 return; 00584 } 00585 00586 if (!is_bit_) { 00587 // The writer should be in the id_to_handle map at this time. 00588 this->lookup_instance_handles(updated_writers, handles); 00589 00590 for (CORBA::ULong i = 0; i < wr_len; ++i) { 00591 id_to_handle_map_.erase(updated_writers[i]); 00592 } 00593 } 00594 00595 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) { 00596 { 00597 this->disassociate(updated_writers[i]); 00598 } 00599 } 00600 00601 // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing. 00602 if (!this->is_bit_) { 00603 // Derive the change in the number of publications writing to this reader. 
00604 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size()); 00605 this->subscription_match_status_.current_count_change 00606 = matchedPublications - this->subscription_match_status_.current_count; 00607 00608 // Only process status if the number of publications has changed. 00609 if (this->subscription_match_status_.current_count_change != 0) { 00610 this->subscription_match_status_.current_count = matchedPublications; 00611 00612 /// Section 7.1.4.1: total_count will not decrement. 00613 00614 /// @TODO: Reconcile this with the verbiage in section 7.1.4.1 00615 this->subscription_match_status_.last_publication_handle 00616 = handles[ wr_len - 1]; 00617 00618 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00619 00620 DDS::DataReaderListener_var listener 00621 = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS); 00622 00623 if (!CORBA::is_nil(listener.in())) { 00624 listener->on_subscription_matched(this, this->subscription_match_status_); 00625 00626 // Client will look at it so next time it looks the change should be 0 00627 this->subscription_match_status_.total_count_change = 0; 00628 this->subscription_match_status_.current_count_change = 0; 00629 } 00630 notify_status_condition(); 00631 } 00632 } 00633 00634 // If this remove_association is invoked when the InfoRepo 00635 // detects a lost writer then make a callback to notify 00636 // subscription lost. 00637 if (notify_lost) { 00638 this->notify_subscription_lost(handles); 00639 } 00640 00641 if (this->monitor_) { 00642 this->monitor_->report(); 00643 } 00644 }
void OpenDDS::DCPS::DataReaderImpl::remove_publication | ( | const PublicationId & | pub_id | ) | [protected] |
Definition at line 497 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::push_back(), remove_associations_i(), writers_, and writers_lock_.
00498 { 00499 WriterIdSeq writers; 00500 bool notify = false; 00501 { 00502 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00503 WriterMapType::iterator where = writers_.find(pub_id); 00504 if (writers_.end() != where) { 00505 notify = where->second->notify_lost_; 00506 push_back(writers, pub_id); 00507 } 00508 } 00509 remove_associations_i(writers, notify); 00510 }
void OpenDDS::DCPS::DataReaderImpl::reschedule_deadline | ( | ) |
Definition at line 2802 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::RcHandle< T >::in(), instances_, instances_lock_, LM_ERROR, and watchdog_.
02803 { 02804 if (this->watchdog_.in()) { 02805 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02806 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin(); 02807 iter != this->instances_.end(); 02808 ++iter) { 02809 if (iter->second->deadline_timer_id_ != -1) { 02810 if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) { 02811 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"), 02812 ACE_TEXT("reset_timer_interval"))); 02813 } 02814 } 02815 } 02816 } 02817 }
void OpenDDS::DCPS::DataReaderImpl::reset_coherent_info | ( | const PublicationId & | writer_id, | |
const RepoId & | publisher_id | |||
) |
Definition at line 2987 of file DataReaderImpl.cpp.
References writers_, and writers_lock_.
Referenced by reject_coherent().
02989 { 02990 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 02991 02992 WriterMapType::iterator itEnd = this->writers_.end(); 02993 for (WriterMapType::iterator it = this->writers_.begin(); 02994 it != itEnd; ++it) { 02995 if (it->second->writer_id_ == writer_id 02996 && it->second->publisher_id_ == publisher_id) { 02997 it->second->reset_coherent_info(); 02998 } 02999 } 03000 }
void OpenDDS::DCPS::DataReaderImpl::reset_latency_stats | ( | ) | [virtual] |
Clear any intermediate statistical values.
Implements OpenDDS::DCPS::DataReaderEx.
Definition at line 2382 of file DataReaderImpl.cpp.
References statistics_.
02383 { 02384 for (StatsMapType::iterator current = this->statistics_.begin(); 02385 current != this->statistics_.end(); 02386 ++current) { 02387 current->second.reset_stats(); 02388 } 02389 }
void OpenDDS::DCPS::DataReaderImpl::reset_ownership | ( | DDS::InstanceHandle_t | instance | ) |
void OpenDDS::DCPS::DataReaderImpl::resume_sample_processing | ( | const PublicationId & | pub_id | ) | [private] |
When done handling historic samples, resume sample processing.
Definition at line 3168 of file DataReaderImpl.cpp.
References deliver_historic(), end_historic_sweeper_, OpenDDS::DCPS::WriterInfo::last_historic_seq_, OPENDDS_MAP(), OpenDDS::DCPS::WriterInfo::waiting_for_end_historic_samples_, writers_, and writers_lock_.
Referenced by add_link(), and data_received().
03169 { 03170 OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver; 03171 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 03172 WriterMapType::iterator where = writers_.find(pub_id); 03173 if (writers_.end() != where) { 03174 WriterInfo& info = *where->second; 03175 // Stop filtering these 03176 if (info.waiting_for_end_historic_samples_) { 03177 end_historic_sweeper_->cancel_timer(where->second); 03178 if (!info.historic_samples_.empty()) { 03179 info.last_historic_seq_ = info.historic_samples_.rbegin()->first; 03180 } 03181 to_deliver.swap(info.historic_samples_); 03182 write_guard.release(); 03183 deliver_historic(to_deliver); 03184 } 03185 } 03186 }
void OpenDDS::DCPS::DataReaderImpl::sample_info | ( | DDS::SampleInfo & | sample_info, | |
const ReceivedDataElement * | ptr | |||
) | [protected] |
Definition at line 1793 of file DataReaderImpl.cpp.
References DDS::SampleInfo::absolute_generation_rank, DDS::SampleInfo::disposed_generation_count, OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::SampleInfo::generation_rank, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::SampleInfo::no_writers_generation_count, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, DDS::SampleInfo::opendds_reserved_publication_seq, DDS::SampleInfo::sample_rank, and OpenDDS::DCPS::ReceivedDataElement::sequence_.
01795 { 01796 01797 sample_info.sample_rank = 0; 01798 01799 // generation_rank = 01800 // (MRSIC.disposed_generation_count + 01801 // MRSIC.no_writers_generation_count) 01802 // - (S.disposed_generation_count + 01803 // S.no_writers_generation_count) 01804 // 01805 sample_info.generation_rank = 01806 (sample_info.disposed_generation_count + 01807 sample_info.no_writers_generation_count) - 01808 sample_info.generation_rank; 01809 01810 // absolute_generation_rank = 01811 // (MRS.disposed_generation_count + 01812 // MRS.no_writers_generation_count) 01813 // - (S.disposed_generation_count + 01814 // S.no_writers_generation_count) 01815 // 01816 sample_info.absolute_generation_rank = 01817 (static_cast<CORBA::Long>(ptr->disposed_generation_count_) + 01818 static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) - 01819 sample_info.absolute_generation_rank; 01820 01821 sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue(); 01822 }
virtual void OpenDDS::DCPS::DataReaderImpl::set_instance_state | ( | DDS::InstanceHandle_t | instance, | |
DDS::InstanceStateKind | state | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_listener | ( | DDS::DataReaderListener_ptr | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 930 of file DataReaderImpl.cpp.
References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.
Referenced by cleanup().
00933 { 00934 listener_mask_ = mask; 00935 //note: OK to duplicate a nil object ref 00936 listener_ = DDS::DataReaderListener::_duplicate(a_listener); 00937 return DDS::RETCODE_OK; 00938 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_qos | ( | const DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 846 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, dp_id_, OpenDDS::DCPS::EntityImpl::enabled_, get_subscriber_servant(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, qos_, qos_change(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00848 { 00849 00850 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00851 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00852 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00853 00854 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00855 if (qos_ == qos) 00856 return DDS::RETCODE_OK; 00857 00858 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00859 return DDS::RETCODE_IMMUTABLE_POLICY; 00860 00861 } else { 00862 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00863 DDS::SubscriberQos subscriberQos; 00864 00865 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant(); 00866 bool status = false; 00867 if (subscriber) { 00868 subscriber->get_qos(subscriberQos); 00869 status = 00870 disco->update_subscription_qos( 00871 domain_id_, 00872 dp_id_, 00873 this->subscription_id_, 00874 qos, 00875 subscriberQos); 00876 } 00877 if (!status) { 00878 ACE_ERROR_RETURN((LM_ERROR, 00879 ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ") 00880 ACE_TEXT("qos not updated. \n")), 00881 DDS::RETCODE_ERROR); 00882 } 00883 } 00884 00885 00886 qos_change(qos); 00887 qos_ = qos; 00888 00889 return DDS::RETCODE_OK; 00890 00891 } else { 00892 return DDS::RETCODE_INCONSISTENT_POLICY; 00893 } 00894 }
void OpenDDS::DCPS::DataReaderImpl::set_sample_lost_status | ( | const DDS::SampleLostStatus & | status | ) | [protected] |
Note: the caller should have acquired sample_lock_.
Definition at line 2248 of file DataReaderImpl.cpp.
References sample_lost_status_.
02250 { 02251 //!!!caller should have acquired sample_lock_ 02252 sample_lost_status_ = status; 02253 }
void OpenDDS::DCPS::DataReaderImpl::set_sample_rejected_status | ( | const DDS::SampleRejectedStatus & | status | ) | [protected] |
Note: the caller should have acquired sample_lock_.
Definition at line 2256 of file DataReaderImpl.cpp.
References sample_rejected_status_.
02258 { 02259 //!!!caller should have acquired sample_lock_ 02260 sample_rejected_status_ = status; 02261 }
void OpenDDS::DCPS::DataReaderImpl::set_subscriber_qos | ( | const DDS::SubscriberQos & | qos | ) |
Definition at line 3122 of file DataReaderImpl.cpp.
References subqos_.
03124 { 03125 this->subqos_ = qos; 03126 }
void OpenDDS::DCPS::DataReaderImpl::signal_liveliness | ( | const RepoId & | remote_participant | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 737 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::GUID_t::entityId, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, instances_, instances_lock_, OPENDDS_VECTOR(), sample_lock_, writers_, and writers_lock_.
00738 { 00739 RepoId prefix = remote_participant; 00740 prefix.entityId = EntityId_t(); 00741 00742 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00743 00744 typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair; 00745 typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet; 00746 WriterSet writers; 00747 00748 { 00749 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00750 for (WriterMapType::iterator pos = writers_.lower_bound(prefix), 00751 limit = writers_.end(); 00752 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix); 00753 ++pos) { 00754 writers.push_back(std::make_pair(pos->first, pos->second)); 00755 } 00756 } 00757 00758 ACE_Time_Value when = ACE_OS::gettimeofday(); 00759 for (WriterSet::iterator pos = writers.begin(), limit = writers.end(); 00760 pos != limit; 00761 ++pos) { 00762 pos->second->received_activity(when); 00763 } 00764 00765 if (!writers.empty()) { 00766 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 00767 for (WriterSet::iterator pos = writers.begin(), limit = writers.end(); 00768 pos != limit; 00769 ++pos) { 00770 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 00771 iter != instances_.end(); 00772 ++iter) { 00773 SubscriptionInstance_rch ptr = iter->second; 00774 ptr->instance_state_.lively(pos->first); 00775 } 00776 } 00777 } 00778 }
void OpenDDS::DCPS::DataReaderImpl::statistics_enabled | ( | CORBA::Boolean | statistics_enabled | ) | [virtual] |
Definition at line 2398 of file DataReaderImpl.cpp.
References statistics_enabled_.
02400 { 02401 this->statistics_enabled_ = statistics_enabled; 02402 }
CORBA::Boolean OpenDDS::DCPS::DataReaderImpl::statistics_enabled | ( | ) | [virtual] |
Definition at line 2392 of file DataReaderImpl.cpp.
References statistics_enabled_.
Referenced by process_latency().
02393 { 02394 return this->statistics_enabled_; 02395 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::take | ( | AbstractSamples & | samples, | |
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
bool OpenDDS::DCPS::DataReaderImpl::time_based_filter_instance | ( | const SubscriptionInstance_rch & | instance, | |
ACE_Time_Value & | filter_time_expired | |||
) | [protected] |
Definition at line 2697 of file DataReaderImpl.cpp.
References DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, ACE_OS::gettimeofday(), qos_, DDS::DataReaderQos::time_based_filter, and OpenDDS::DCPS::time_value_to_duration().
02698 { 02699 ACE_Time_Value now(ACE_OS::gettimeofday()); 02700 02701 // TIME_BASED_FILTER processing; expire data samples 02702 // if minimum separation is not met for instance. 02703 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC }; 02704 02705 if (qos_.time_based_filter.minimum_separation > zero) { 02706 filter_time_expired = now - instance->last_accepted_; 02707 DDS::Duration_t separation = time_value_to_duration(filter_time_expired); 02708 02709 if (separation < qos_.time_based_filter.minimum_separation) { 02710 return true; // Data filtered. 02711 } 02712 } 02713 02714 instance->last_accepted_ = now; 02715 02716 return false; 02717 }
CORBA::Long OpenDDS::DCPS::DataReaderImpl::total_samples | ( | ) | const [protected] |
Note: the caller should already have acquired sample_lock_ before calling this method.
Definition at line 1824 of file DataReaderImpl.cpp.
References instances_, and instances_lock_.
01825 { 01826 //!!!caller should have acquired sample_lock_ 01827 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0); 01828 01829 CORBA::Long count(0); 01830 01831 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01832 iter != instances_.end(); 01833 ++iter) { 01834 SubscriptionInstance_rch ptr = iter->second; 01835 01836 count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_); 01837 } 01838 01839 return count; 01840 }
void OpenDDS::DCPS::DataReaderImpl::transport_assoc_done | ( | int | flags, | |
const RepoId & | remote_id | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 324 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, OpenDDS::DCPS::TransportClient::ASSOC_OK, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dp_id_, id_to_handle_map_, is_bit_, CORBA::is_nil(), DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, liveliness_timer_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_handle_lock_, OpenDDS::DCPS::Monitor::report(), sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, writers_, writers_lock_, and ACE_Time_Value::zero.
00325 { 00326 if (!(flags & ASSOC_OK)) { 00327 if (DCPS_debug_level) { 00328 const GuidConverter conv(remote_id); 00329 ACE_ERROR((LM_ERROR, 00330 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ") 00331 ACE_TEXT("ERROR: transport layer failed to associate %C\n"), 00332 OPENDDS_STRING(conv).c_str())); 00333 } 00334 return; 00335 } 00336 00337 const bool active = flags & ASSOC_ACTIVE; 00338 { 00339 00340 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00341 00342 // LIVELINESS policy timers are managed here. 00343 if (liveliness_lease_duration_ != ACE_Time_Value::zero) { 00344 if (DCPS_debug_level >= 5) { 00345 GuidConverter converter(subscription_id_); 00346 ACE_DEBUG((LM_DEBUG, 00347 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ") 00348 ACE_TEXT("starting/resetting liveliness timer for reader %C\n"), 00349 OPENDDS_STRING(converter).c_str())); 00350 } 00351 // this call will start the timer if it is not already set 00352 liveliness_timer_->check_liveliness(); 00353 } 00354 } 00355 // We no longer hold the publication_handle_lock_. 00356 00357 if (!is_bit_) { 00358 00359 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 00360 00361 if (!participant) 00362 return; 00363 00364 DDS::InstanceHandle_t handle = participant->id_to_handle(remote_id); 00365 00366 // We acquire the publication_handle_lock_ for the remainder of our 00367 // processing. 00368 { 00369 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00370 00371 // This insertion is idempotent. 
00372 id_to_handle_map_.insert( 00373 RepoIdToHandleMap::value_type(remote_id, handle)); 00374 00375 if (DCPS_debug_level > 4) { 00376 GuidConverter converter(remote_id); 00377 ACE_DEBUG((LM_DEBUG, 00378 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ") 00379 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"), 00380 OPENDDS_STRING(converter).c_str(), 00381 handle)); 00382 } 00383 00384 // We need to adjust these after the insertions have all completed 00385 // since insertions are not guaranteed to increase the number of 00386 // currently matched publications. 00387 const int matchedPublications = static_cast<int>(id_to_handle_map_.size()); 00388 subscription_match_status_.current_count_change = 00389 matchedPublications - subscription_match_status_.current_count; 00390 subscription_match_status_.current_count = matchedPublications; 00391 00392 ++subscription_match_status_.total_count; 00393 ++subscription_match_status_.total_count_change; 00394 00395 subscription_match_status_.last_publication_handle = handle; 00396 00397 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00398 00399 DDS::DataReaderListener_var listener = 00400 listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS); 00401 00402 if (!CORBA::is_nil(listener)) { 00403 listener->on_subscription_matched(this, subscription_match_status_); 00404 00405 // TBD - why does the spec say to change this but not change 00406 // the ChangeFlagStatus after a listener call? 
00407 00408 // Client will look at it so next time it looks the change should be 0 00409 subscription_match_status_.total_count_change = 0; 00410 subscription_match_status_.current_count_change = 0; 00411 } 00412 00413 notify_status_condition(); 00414 } 00415 00416 { 00417 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 00418 ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00419 00420 if(!writers_.count(remote_id)){ 00421 return; 00422 } 00423 writers_[remote_id]->handle_ = handle; 00424 } 00425 } 00426 00427 if (!active) { 00428 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00429 00430 disco->association_complete(domain_id_, dp_id_, 00431 subscription_id_, remote_id); 00432 } 00433 00434 if (monitor_) { 00435 monitor_->report(); 00436 } 00437 }
void OpenDDS::DCPS::DataReaderImpl::unregister_for_writer | ( | const RepoId & | participant, | |
const RepoId & | readerid, | |||
const RepoId & | writerid | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 3252 of file DataReaderImpl.cpp.
03255 { 03256 TransportClient::unregister_for_writer(participant, readerid, writerid); 03257 }
void OpenDDS::DCPS::DataReaderImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 690 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, CORBA::is_nil(), OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, publication_handle_lock_, DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.
00691 { 00692 DDS::DataReaderListener_var listener = 00693 listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS); 00694 00695 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00696 guard, 00697 this->publication_handle_lock_); 00698 00699 00700 if (this->requested_incompatible_qos_status_.total_count == status.total_count) { 00701 // This test should make the method idempotent. 00702 return; 00703 } 00704 00705 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, 00706 true); 00707 00708 // copy status and increment change 00709 requested_incompatible_qos_status_.total_count = status.total_count; 00710 requested_incompatible_qos_status_.total_count_change += 00711 status.count_since_last_send; 00712 requested_incompatible_qos_status_.last_policy_id = 00713 status.last_policy_id; 00714 requested_incompatible_qos_status_.policies = status.policies; 00715 00716 if (!CORBA::is_nil(listener.in())) { 00717 listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_); 00718 00719 // TBD - why does the spec say to change total_count_change but not 00720 // change the ChangeFlagStatus after a listener call? 00721 00722 // client just looked at it so next time it looks the 00723 // change should be 0 00724 requested_incompatible_qos_status_.total_count_change = 0; 00725 } 00726 00727 notify_status_condition(); 00728 }
void OpenDDS::DCPS::DataReaderImpl::update_ownership_strength | ( | const PublicationId & | pub_id, | |
const CORBA::Long & | ownership_strength | |||
) |
Definition at line 2865 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::WriterInfoListener::subscription_id_, writers_, and writers_lock_.
02867 { 02868 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 02869 read_guard, 02870 this->writers_lock_); 02871 for (WriterMapType::iterator iter = writers_.begin(); 02872 iter != writers_.end(); 02873 ++iter) { 02874 if (iter->second->writer_id_ == pub_id) { 02875 if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) { 02876 if (DCPS_debug_level >= 1) { 02877 GuidConverter reader_converter(this->subscription_id_); 02878 GuidConverter writer_converter(pub_id); 02879 ACE_DEBUG((LM_DEBUG, 02880 ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ") 02881 ACE_TEXT("local %C update remote %C strength from %d to %d \n"), 02882 OPENDDS_STRING(reader_converter).c_str(), 02883 OPENDDS_STRING(writer_converter).c_str(), 02884 iter->second->writer_qos_.ownership_strength, ownership_strength)); 02885 } 02886 iter->second->writer_qos_.ownership_strength.value = ownership_strength; 02887 iter->second->clear_owner_evaluated (); 02888 } 02889 break; 02890 } 02891 } 02892 }
void OpenDDS::DCPS::DataReaderImpl::update_subscription_params | ( | const DDS::StringSeq & | params | ) | const |
Definition at line 3146 of file DataReaderImpl.cpp.
References domain_id_, dp_id_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and TheServiceParticipant.
03147 { 03148 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 03149 disco->update_subscription_params(domain_id_, 03150 dp_id_, 03151 subscription_id_, 03152 params); 03153 }
bool OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion | ( | WriterInfo * | writer | ) | [private] |
Definition at line 2896 of file DataReaderImpl.cpp.
References accept_coherent(), OpenDDS::DCPS::WriterInfo::coherent_change_received(), coherent_changes_completed(), OpenDDS::DCPS::COMPLETED, get_subscriber_servant(), OpenDDS::DCPS::WriterInfo::group_coherent_, DDS::INSTANCE_PRESENTATION_QOS, OpenDDS::DCPS::NOT_COMPLETED_YET, DDS::SubscriberQos::presentation, OpenDDS::DCPS::WriterInfo::publisher_id_, reject_coherent(), OpenDDS::DCPS::REJECTED, OpenDDS::DCPS::WriterInfo::reset_coherent_info(), state, subqos_, and OpenDDS::DCPS::WriterInfo::writer_id_.
Referenced by accept_sample_processing(), and data_received().
02897 { 02898 if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS 02899 || ! this->subqos_.presentation.coherent_access) { 02900 this->accept_coherent (writer->writer_id_, writer->publisher_id_); 02901 this->coherent_changes_completed (this); 02902 return true; 02903 } 02904 02905 // verify current coherent changes from single writer 02906 Coherent_State state = writer->coherent_change_received(); 02907 if (writer->group_coherent_) { // GROUP coherent 02908 RcHandle<SubscriberImpl> subscriber = get_subscriber_servant(); 02909 if (subscriber && state != NOT_COMPLETED_YET) { 02910 // verify if all readers received complete coherent changes in a group. 02911 subscriber->coherent_change_received ( 02912 writer->publisher_id_, this, state); 02913 } 02914 } 02915 else { // TOPIC coherent 02916 if (state == COMPLETED) { 02917 this->accept_coherent (writer->writer_id_, writer->publisher_id_); 02918 } 02919 else if (state == REJECTED) { 02920 this->reject_coherent (writer->writer_id_, writer->publisher_id_); 02921 } 02922 else {// NOT_COMPLETED 02923 return false; 02924 } 02925 02926 // decision made: either COMPLETED or REJECTED 02927 writer->reset_coherent_info (); 02928 } 02929 02930 return state == COMPLETED; 02931 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::wait_for_historical_data | ( | const DDS::Duration_t & | max_wait | ) | [virtual] |
Definition at line 1058 of file DataReaderImpl.cpp.
void OpenDDS::DCPS::DataReaderImpl::writer_activity | ( | const DataSampleHeader & | header | ) |
Updates the liveliness information for the writer that published the given sample header.
Definition at line 1294 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, ACE_OS::gettimeofday(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::UNREGISTER_INSTANCE, writers_, and writers_lock_.
Referenced by data_received().
01295 { 01296 // caller should have the sample_lock_ !!! 01297 01298 RcHandle<WriterInfo> writer; 01299 01300 // The received_activity() has to be called outside the writers_lock_ 01301 // because it probably acquire writers_lock_ read lock recursively 01302 // (in handle_timeout). This could cause deadlock when there are writers 01303 // waiting. 01304 { 01305 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 01306 WriterMapType::iterator iter = writers_.find(header.publication_id_); 01307 01308 if (iter != writers_.end()) { 01309 writer = iter->second; 01310 01311 } else if (DCPS_debug_level > 4) { 01312 // This may not be an error since it could happen that the sample 01313 // is delivered to the datareader after the write is dis-associated 01314 // with this datareader. 01315 GuidConverter reader_converter(subscription_id_); 01316 GuidConverter writer_converter(header.publication_id_); 01317 ACE_DEBUG((LM_DEBUG, 01318 ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ") 01319 ACE_TEXT("reader %C is not associated with writer %C.\n"), 01320 OPENDDS_STRING(reader_converter).c_str(), 01321 OPENDDS_STRING(writer_converter).c_str())); 01322 } 01323 } 01324 01325 if (!writer.is_nil()) { 01326 ACE_Time_Value when = ACE_OS::gettimeofday(); 01327 writer->received_activity(when); 01328 01329 if ((header.message_id_ == SAMPLE_DATA) || 01330 (header.message_id_ == INSTANCE_REGISTRATION) || 01331 (header.message_id_ == UNREGISTER_INSTANCE) || 01332 (header.message_id_ == DISPOSE_INSTANCE) || 01333 (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) { 01334 01335 const SequenceNumber defaultSN; 01336 SequenceRange resetRange(defaultSN, header.sequence_); 01337 01338 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01339 if (header.coherent_change_) { 01340 if (writer->coherent_samples_ == 0) { 01341 writer->coherent_sample_sequence_.reset(); 01342 writer->coherent_sample_sequence_.insert(resetRange); 01343 } 01344 else { 01345 
writer->coherent_sample_sequence_.insert(header.sequence_); 01346 } 01347 } 01348 #endif 01349 } 01350 } 01351 }
void OpenDDS::DCPS::DataReaderImpl::writer_became_alive | ( | WriterInfo & | info, | |
const ACE_Time_Value & | when | |||
) | [virtual] |
Tells the instances when a DataWriter transitions to being alive. The writer state is an inout parameter; it has to be set to ALIVE before handle_timeout is called, since some subroutines use the state.
Reimplemented from OpenDDS::DCPS::WriterInfoListener.
Definition at line 2086 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, liveliness_timer_, LM_DEBUG, LM_ERROR, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, OpenDDS::DCPS::Monitor::report(), reverse_sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.
02088 { 02089 if (DCPS_debug_level >= 5) { 02090 GuidConverter reader_converter(subscription_id_); 02091 GuidConverter writer_converter(info.writer_id_); 02092 ACE_DEBUG((LM_DEBUG, 02093 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ") 02094 ACE_TEXT("reader %C from writer %C previous state %C.\n"), 02095 OPENDDS_STRING(reader_converter).c_str(), 02096 OPENDDS_STRING(writer_converter).c_str(), 02097 info.get_state_str().c_str())); 02098 } 02099 02100 // caller should already have the samples_lock_ !!! 02101 02102 // NOTE: each instance will change to ALIVE_STATE when they receive a sample 02103 02104 bool liveliness_changed = false; 02105 02106 if (info.state_ != WriterInfo::ALIVE) { 02107 liveliness_changed_status_.alive_count++; 02108 liveliness_changed_status_.alive_count_change++; 02109 liveliness_changed = true; 02110 } 02111 02112 if (info.state_ == WriterInfo::DEAD) { 02113 liveliness_changed_status_.not_alive_count--; 02114 liveliness_changed_status_.not_alive_count_change--; 02115 liveliness_changed = true; 02116 } 02117 02118 liveliness_changed_status_.last_publication_handle = info.handle_; 02119 02120 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true); 02121 02122 if (liveliness_changed_status_.alive_count < 0) { 02123 ACE_ERROR((LM_ERROR, 02124 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ") 02125 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"), 02126 liveliness_changed_status_.alive_count)); 02127 return; 02128 } 02129 02130 if (liveliness_changed_status_.not_alive_count < 0) { 02131 ACE_ERROR((LM_ERROR, 02132 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ") 02133 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"), 02134 liveliness_changed_status_.not_alive_count)); 02135 return; 02136 } 02137 02138 // Change the state to ALIVE since handle_timeout may call writer_became_dead 02139 // which need the current state info. 
02140 info.state_ = WriterInfo::ALIVE; 02141 02142 if (this->monitor_) { 02143 this->monitor_->report(); 02144 } 02145 02146 // Call listener only when there are liveliness status changes. 02147 if (liveliness_changed) { 02148 // Avoid possible deadlock by releasing sample_lock_. 02149 // See comments in <Topic>DataDataReaderImpl::notify_status_condition_no_sample_lock() 02150 // for information about the locks involved. 02151 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 02152 this->notify_liveliness_change(); 02153 } 02154 02155 // this call will start the liveliness timer if it is not already set 02156 liveliness_timer_->check_liveliness(); 02157 }
void OpenDDS::DCPS::DataReaderImpl::writer_became_dead | ( | WriterInfo & | info, | |
const ACE_Time_Value & | when | |||
) | [virtual] |
Tells the instances when a DataWriter transitions to DEAD. The writer state is an inout parameter; the state is set to DEAD when this method returns.
Reimplemented from OpenDDS::DCPS::WriterInfoListener.
Definition at line 2160 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, LM_DEBUG, LM_ERROR, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::WriterInfo::NOT_SET, notify_liveliness_change(), OPENDDS_STRING, ownership_manager(), OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.
02162 { 02163 if (DCPS_debug_level >= 5) { 02164 GuidConverter reader_converter(subscription_id_); 02165 GuidConverter writer_converter(info.writer_id_); 02166 ACE_DEBUG((LM_DEBUG, 02167 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ") 02168 ACE_TEXT("reader %C from writer %C previous state %C.\n"), 02169 02170 OPENDDS_STRING(reader_converter).c_str(), 02171 OPENDDS_STRING(writer_converter).c_str(), 02172 info.get_state_str().c_str())); 02173 } 02174 02175 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 02176 OwnershipManagerPtr owner_manager = this->ownership_manager(); 02177 if (owner_manager) { 02178 owner_manager->remove_writer (info.writer_id_); 02179 info.clear_owner_evaluated (); 02180 } 02181 #endif 02182 02183 // caller should already have the samples_lock_ !!! 02184 bool liveliness_changed = false; 02185 02186 if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) { 02187 liveliness_changed_status_.not_alive_count++; 02188 liveliness_changed_status_.not_alive_count_change++; 02189 liveliness_changed = true; 02190 } 02191 02192 if (info.state_ == WriterInfo::ALIVE) { 02193 liveliness_changed_status_.alive_count--; 02194 liveliness_changed_status_.alive_count_change--; 02195 liveliness_changed_status_.not_alive_count++; 02196 liveliness_changed_status_.not_alive_count_change++; 02197 liveliness_changed = true; 02198 } 02199 02200 liveliness_changed_status_.last_publication_handle = info.handle_; 02201 02202 //update the state to DEAD. 
02203 info.state_ = WriterInfo::DEAD; 02204 02205 if (this->monitor_) { 02206 this->monitor_->report(); 02207 } 02208 02209 if (liveliness_changed_status_.alive_count < 0) { 02210 ACE_ERROR((LM_ERROR, 02211 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ") 02212 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"), 02213 liveliness_changed_status_.alive_count)); 02214 return; 02215 } 02216 02217 if (liveliness_changed_status_.not_alive_count < 0) { 02218 ACE_ERROR((LM_ERROR, 02219 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ") 02220 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"), 02221 liveliness_changed_status_.not_alive_count)); 02222 return; 02223 } 02224 02225 instances_liveliness_update(info, when); 02226 02227 // Call listener only when there are liveliness status changes. 02228 if (liveliness_changed) { 02229 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true); 02230 this->notify_liveliness_change(); 02231 } 02232 }
void OpenDDS::DCPS::DataReaderImpl::writer_removed | ( | WriterInfo & | info | ) | [virtual] |
Tells the instances when a DataWriter is removed. The liveliness status needs to be updated.
Reimplemented from OpenDDS::DCPS::WriterInfoListener.
Definition at line 2042 of file DataReaderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, ACE_OS::gettimeofday(), OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, LM_DEBUG, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, ownership_manager(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.
02043 { 02044 if (DCPS_debug_level >= 5) { 02045 GuidConverter reader_converter(subscription_id_); 02046 GuidConverter writer_converter(info.writer_id_); 02047 ACE_DEBUG((LM_DEBUG, 02048 ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ") 02049 ACE_TEXT("reader %C from writer %C.\n"), 02050 OPENDDS_STRING(reader_converter).c_str(), 02051 OPENDDS_STRING(writer_converter).c_str())); 02052 } 02053 02054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 02055 OwnershipManagerPtr owner_manager = this->ownership_manager(); 02056 if (owner_manager) { 02057 owner_manager->remove_writer (info.writer_id_); 02058 info.clear_owner_evaluated (); 02059 } 02060 #endif 02061 02062 bool liveliness_changed = false; 02063 02064 if (info.state_ == WriterInfo::ALIVE) { 02065 -- liveliness_changed_status_.alive_count; 02066 -- liveliness_changed_status_.alive_count_change; 02067 liveliness_changed = true; 02068 } 02069 02070 if (info.state_ == WriterInfo::DEAD) { 02071 -- liveliness_changed_status_.not_alive_count; 02072 -- liveliness_changed_status_.not_alive_count_change; 02073 liveliness_changed = true; 02074 } 02075 02076 liveliness_changed_status_.last_publication_handle = info.handle_; 02077 instances_liveliness_update(info, ACE_OS::gettimeofday()); 02078 02079 if (liveliness_changed) { 02080 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true); 02081 this->notify_liveliness_change(); 02082 } 02083 }
friend class ::DDS_TEST [friend] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 697 of file DataReaderImpl.h.
friend class EndHistoricSamplesMissedSweeper [friend] |
Definition at line 694 of file DataReaderImpl.h.
friend class InstanceState [friend] |
Definition at line 693 of file DataReaderImpl.h.
friend class OwnershipManagerPtr [friend] |
Definition at line 460 of file DataReaderImpl.h.
friend class QueryConditionImpl [friend] |
Definition at line 199 of file DataReaderImpl.h.
Referenced by create_querycondition().
friend class RemoveAssociationSweeper< DataReaderImpl > [friend] |
Definition at line 695 of file DataReaderImpl.h.
friend class RequestedDeadlineWatchdog [friend] |
Definition at line 198 of file DataReaderImpl.h.
friend class SubscriberImpl [friend] |
Definition at line 200 of file DataReaderImpl.h.
bool OpenDDS::DCPS::DataReaderImpl::always_get_history_ [private] |
Definition at line 829 of file DataReaderImpl.h.
Referenced by filter_sample().
Definition at line 728 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), and notify_latency().
bool OpenDDS::DCPS::DataReaderImpl::coherent_ [protected] |
Indicates whether GROUP coherent changes are currently being accessed.
Definition at line 646 of file DataReaderImpl.h.
Referenced by begin_access(), and end_access().
TopicDescriptionPtr<ContentFilteredTopicImpl> OpenDDS::DCPS::DataReaderImpl::content_filtered_topic_ [protected] |
Definition at line 641 of file DataReaderImpl.h.
Referenced by enable(), enable_filtering(), get_cf_topic(), and get_topicdescription().
Definition at line 711 of file DataReaderImpl.h.
Referenced by enable().
Definition at line 702 of file DataReaderImpl.h.
Referenced by enable(), get_next_handle(), init(), set_qos(), transport_assoc_done(), and update_subscription_params().
RepoId OpenDDS::DCPS::DataReaderImpl::dp_id_ [private] |
Definition at line 703 of file DataReaderImpl.h.
Referenced by enable(), get_dp_id(), set_qos(), transport_assoc_done(), and update_subscription_params().
RcHandle<EndHistoricSamplesMissedSweeper> OpenDDS::DCPS::DataReaderImpl::end_historic_sweeper_ [private] |
Definition at line 708 of file DataReaderImpl.h.
Referenced by add_link(), remove_associations_i(), and resume_sample_processing().
Ordered group samples.
Definition at line 649 of file DataReaderImpl.h.
Referenced by end_access(), and get_ordered_data().
RepoIdToHandleMap OpenDDS::DCPS::DataReaderImpl::id_to_handle_map_ [private] |
Definition at line 719 of file DataReaderImpl.h.
Referenced by get_matched_publications(), remove_associations_i(), and transport_assoc_done().
SubscriptionInstanceMapType OpenDDS::DCPS::DataReaderImpl::instances_ [mutable, protected] |
Todo: document why the instances_ container is mutable.
Definition at line 591 of file DataReaderImpl.h.
Referenced by accept_coherent(), contains_sample(), data_received(), get_handle_instance(), get_instance_handles(), get_ordered_data(), has_zero_copies(), have_instance_states(), have_sample_states(), have_view_states(), instances_liveliness_update(), reject_coherent(), release_instance(), reschedule_deadline(), signal_liveliness(), and total_samples().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::instances_lock_ [mutable, protected] |
Assumption: since the instances_ container is mutable, this lock may need to be used from const methods as well, which is why it is also declared mutable. Todo: remove the recursive nature of instances_lock_ if it is not needed.
Definition at line 596 of file DataReaderImpl.h.
Referenced by accept_coherent(), contains_sample(), data_received(), get_handle_instance(), get_instance_handles(), get_ordered_data(), has_zero_copies(), have_instance_states(), have_sample_states(), have_view_states(), instances_liveliness_update(), reject_coherent(), release_instance(), reschedule_deadline(), signal_liveliness(), and total_samples().
bool OpenDDS::DCPS::DataReaderImpl::is_bit_ [private] |
Flag indicating that this datareader is a built-in topic datareader.
Definition at line 827 of file DataReaderImpl.h.
Referenced by add_association(), init(), is_bit(), notify_subscription_lost(), notify_subscription_reconnected(), remove_associations(), remove_associations_i(), and transport_assoc_done().
bool OpenDDS::DCPS::DataReaderImpl::is_exclusive_ownership_ [protected] |
Definition at line 636 of file DataReaderImpl.h.
Referenced by data_received(), init(), and ownership_filter_instance().
Definition at line 820 of file DataReaderImpl.h.
Referenced by enable(), get_requested_deadline_missed_status(), and qos_change().
DDS::DataReaderListener_var OpenDDS::DCPS::DataReaderImpl::listener_ [private] |
Definition at line 701 of file DataReaderImpl.h.
Referenced by get_listener(), init(), listener_for(), notify_latency(), notify_liveliness_change(), notify_subscription_disconnected(), notify_subscription_lost(), notify_subscription_reconnected(), and set_listener().
Definition at line 700 of file DataReaderImpl.h.
Referenced by init(), listener_for(), notify_liveliness_change(), and set_listener().
Definition at line 722 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_liveliness_changed_status(), instances_liveliness_update(), notify_liveliness_change(), writer_became_alive(), writer_became_dead(), and writer_removed().
Definition at line 818 of file DataReaderImpl.h.
Referenced by transport_assoc_done(), and writer_became_alive().
Monitor* OpenDDS::DCPS::DataReaderImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 857 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), enable(), release_instance(), remove_associations_i(), transport_assoc_done(), writer_became_alive(), and writer_became_dead().
Definition at line 712 of file DataReaderImpl.h.
Referenced by enable().
Definition at line 632 of file DataReaderImpl.h.
Referenced by enable(), get_instance_handle(), get_matched_publication_data(), get_next_handle(), init(), lookup_instance_handles(), OpenDDS::DCPS::InstanceState::sample_info(), and transport_assoc_done().
Periodic Monitor object for this entity.
Definition at line 860 of file DataReaderImpl.h.
Referenced by DataReaderImpl().
Definition at line 715 of file DataReaderImpl.h.
Referenced by add_association(), get_matched_publication_data(), get_matched_publications(), get_requested_incompatible_qos_status(), get_subscription_matched_status(), remove_all_associations(), remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().
Definition at line 620 of file DataReaderImpl.h.
Referenced by add_association(), add_link(), check_transport_qos(), enable(), filter_sample(), get_qos(), init(), process_latency(), qos_change(), set_qos(), and time_based_filter_instance().
unsigned int OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size_ [private] |
Bound (or initial reservation) of raw latency buffer.
Definition at line 847 of file DataReaderImpl.h.
Referenced by add_association(), and raw_latency_buffer_size().
DataCollector<double>::OnFull OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type_ [private] |
Type of raw latency data buffer.
Definition at line 850 of file DataReaderImpl.h.
Referenced by add_association(), and raw_latency_buffer_type().
Definition at line 619 of file DataReaderImpl.h.
Referenced by enable().
The ORB's reactor, used to register the liveliness timer.
Definition at line 744 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), and get_reactor().
ReadConditionSet OpenDDS::DCPS::DataReaderImpl::read_conditions_ [private] |
Definition at line 854 of file DataReaderImpl.h.
Referenced by create_querycondition(), create_readcondition(), delete_contained_entities(), delete_readcondition(), has_readcondition(), and notify_read_conditions().
RcHandle<RemoveAssociationSweeper<DataReaderImpl> > OpenDDS::DCPS::DataReaderImpl::remove_association_sweeper_ [private] |
Definition at line 709 of file DataReaderImpl.h.
Referenced by remove_associations(), and remove_associations_i().
DDS::RequestedDeadlineMissedStatus OpenDDS::DCPS::DataReaderImpl::requested_deadline_missed_status_ [private] |
Definition at line 723 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), enable(), get_requested_deadline_missed_status(), and qos_change().
DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::DataReaderImpl::requested_incompatible_qos_status_ [private] |
Definition at line 724 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_requested_incompatible_qos_status(), and update_incompatible_qos().
Definition at line 716 of file DataReaderImpl.h.
Definition at line 630 of file DataReaderImpl.h.
Referenced by accept_sample_processing(), coherent_changes_completed(), notify_read_conditions(), and writer_became_alive().
Lock protecting the sample container as well as the statuses.
Definition at line 627 of file DataReaderImpl.h.
Referenced by accept_coherent(), begin_access(), contains_sample(), create_querycondition(), create_readcondition(), data_received(), delete_contained_entities(), delete_readcondition(), enable(), end_access(), get_instance_handles(), get_liveliness_changed_status(), get_ordered_data(), get_requested_deadline_missed_status(), get_sample_lost_status(), get_sample_rejected_status(), OpenDDS::DCPS::QueryConditionImpl::get_trigger_value(), has_zero_copies(), qos_change(), reject_coherent(), release_instance(), signal_liveliness(), and transport_assoc_done().
Definition at line 624 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_sample_lost_status(), and set_sample_lost_status().
Definition at line 623 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_sample_rejected_status(), and set_sample_rejected_status().
StatsMapType OpenDDS::DCPS::DataReaderImpl::statistics_ [private] |
Statistics for this reader, collected for each writer.
Definition at line 844 of file DataReaderImpl.h.
Referenced by add_association(), get_latency_stats(), process_latency(), raw_latency_statistics(), and reset_latency_stats().
bool OpenDDS::DCPS::DataReaderImpl::statistics_enabled_ [private] |
Flag indicating status of statistics gathering.
Definition at line 832 of file DataReaderImpl.h.
Referenced by statistics_enabled().
Definition at line 651 of file DataReaderImpl.h.
Referenced by set_subscriber_qos(), and verify_coherent_changes_completion().
Definition at line 707 of file DataReaderImpl.h.
Referenced by get_subscriber_servant(), init(), and parent().
Definition at line 737 of file DataReaderImpl.h.
Definition at line 725 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_subscription_matched_status(), remove_associations_i(), and transport_assoc_done().
DDS::TopicDescription_var OpenDDS::DCPS::DataReaderImpl::topic_desc_ [private] |
Definition at line 699 of file DataReaderImpl.h.
Referenced by get_topicdescription(), and init().
Definition at line 633 of file DataReaderImpl.h.
Referenced by enable(), get_next_handle(), get_topic_id(), inconsistent_topic(), init(), and ~DataReaderImpl().
bool OpenDDS::DCPS::DataReaderImpl::transport_disabled_ [private] |
Definition at line 862 of file DataReaderImpl.h.
Referenced by disable_transport(), and enable().
Watchdog responsible for reporting missed requested deadlines.
Definition at line 823 of file DataReaderImpl.h.
Referenced by accept_sample_processing(), data_received(), enable(), qos_change(), and reschedule_deadline().
WriterMapType OpenDDS::DCPS::DataReaderImpl::writers_ [private] |
Definition at line 838 of file DataReaderImpl.h.
Referenced by accept_sample_processing(), add_association(), add_link(), check_historic(), coherent_change_received(), data_received(), get_writer_states(), notify_liveliness_change(), ownership_filter_instance(), remove_all_associations(), remove_associations(), remove_associations_i(), remove_publication(), reset_coherent_info(), resume_sample_processing(), signal_liveliness(), transport_assoc_done(), update_ownership_strength(), and writer_activity().
RW lock for reading/writing publications.
Definition at line 841 of file DataReaderImpl.h.
Referenced by accept_sample_processing(), add_association(), add_link(), check_historic(), coherent_change_received(), data_received(), get_writer_states(), remove_all_associations(), remove_associations(), remove_associations_i(), remove_publication(), reset_coherent_info(), resume_sample_processing(), signal_liveliness(), transport_assoc_done(), update_ownership_strength(), and writer_activity().