#include <DataReaderImpl.h>
Inheritance diagram for OpenDDS::DCPS::DataReaderImpl:
See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.
This class must be inherited by the type-specific datareader which is specific to the data-type associated with the topic.
Definition at line 183 of file DataReaderImpl.h.
typedef VarLess<DDS::ReadCondition> OpenDDS::DCPS::DataReaderImpl::RCCompLess [private] |
Definition at line 841 of file DataReaderImpl.h.
typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::DataReaderImpl::Reverse_Lock_t [protected] |
typedef std::pair<PublicationId, WriterInfo::WriterState> OpenDDS::DCPS::DataReaderImpl::WriterStatePair |
Definition at line 451 of file DataReaderImpl.h.
OpenDDS::DCPS::DataReaderImpl::DataReaderImpl | ( | ) |
Definition at line 51 of file DataReaderImpl.cpp.
References DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, budget_exceeded_status_, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, DDS::SampleRejectedStatus::last_instance_handle, DDS::RequestedDeadlineMissedStatus::last_instance_handle, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::LivelinessChangedStatus::last_publication_handle, DDS::SampleRejectedStatus::last_reason, liveliness_changed_status_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::NOT_REJECTED, periodic_monitor_, DDS::RequestedIncompatibleQosStatus::policies, reactor_, requested_deadline_missed_status_, requested_incompatible_qos_status_, sample_lost_status_, sample_rejected_status_, subscription_match_status_, TheServiceParticipant, OpenDDS::DCPS::BudgetExceededStatus::total_count, DDS::SampleRejectedStatus::total_count, DDS::SampleLostStatus::total_count, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::RequestedDeadlineMissedStatus::total_count, OpenDDS::DCPS::BudgetExceededStatus::total_count_change, DDS::SampleRejectedStatus::total_count_change, DDS::SampleLostStatus::total_count_change, DDS::SubscriptionMatchedStatus::total_count_change, DDS::RequestedIncompatibleQosStatus::total_count_change, and DDS::RequestedDeadlineMissedStatus::total_count_change.
00052 : rd_allocator_(0), 00053 qos_(TheServiceParticipant->initial_DataReaderQos()), 00054 reverse_sample_lock_(sample_lock_), 00055 participant_servant_(0), 00056 topic_servant_(0), 00057 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00058 is_exclusive_ownership_ (false), 00059 #endif 00060 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00061 owner_manager_ (0), 00062 #endif 00063 coherent_(false), 00064 subqos_ (TheServiceParticipant->initial_SubscriberQos()), 00065 topic_desc_(0), 00066 listener_mask_(DEFAULT_STATUS_MASK), 00067 domain_id_(0), 00068 subscriber_servant_(0), 00069 end_historic_sweeper_(new EndHistoricSamplesMissedSweeper(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)), 00070 remove_association_sweeper_(new RemoveAssociationSweeper<DataReaderImpl>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)), 00071 n_chunks_(TheServiceParticipant->n_chunks()), 00072 reverse_pub_handle_lock_(publication_handle_lock_), 00073 reactor_(0), 00074 liveliness_timer_(new LivelinessTimer(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this)), 00075 last_deadline_missed_total_count_(0), 00076 watchdog_(), 00077 is_bit_(false), 00078 initialized_(false), 00079 always_get_history_(false), 00080 statistics_enabled_(false), 00081 raw_latency_buffer_size_(0), 00082 raw_latency_buffer_type_(DataCollector<double>::KeepOldest), 00083 monitor_(0), 00084 periodic_monitor_(0), 00085 transport_disabled_(false) 00086 { 00087 reactor_ = TheServiceParticipant->timer(); 00088 00089 liveliness_changed_status_.alive_count = 0; 00090 liveliness_changed_status_.not_alive_count = 0; 00091 liveliness_changed_status_.alive_count_change = 0; 00092 liveliness_changed_status_.not_alive_count_change = 0; 00093 liveliness_changed_status_.last_publication_handle = 00094 DDS::HANDLE_NIL; 00095 00096 requested_deadline_missed_status_.total_count = 0; 00097 requested_deadline_missed_status_.total_count_change = 0; 
00098 requested_deadline_missed_status_.last_instance_handle = 00099 DDS::HANDLE_NIL; 00100 00101 requested_incompatible_qos_status_.total_count = 0; 00102 requested_incompatible_qos_status_.total_count_change = 0; 00103 requested_incompatible_qos_status_.last_policy_id = 0; 00104 requested_incompatible_qos_status_.policies.length(0); 00105 00106 subscription_match_status_.total_count = 0; 00107 subscription_match_status_.total_count_change = 0; 00108 subscription_match_status_.current_count = 0; 00109 subscription_match_status_.current_count_change = 0; 00110 subscription_match_status_.last_publication_handle = 00111 DDS::HANDLE_NIL; 00112 00113 sample_lost_status_.total_count = 0; 00114 sample_lost_status_.total_count_change = 0; 00115 00116 sample_rejected_status_.total_count = 0; 00117 sample_rejected_status_.total_count_change = 0; 00118 sample_rejected_status_.last_reason = DDS::NOT_REJECTED; 00119 sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL; 00120 00121 this->budget_exceeded_status_.total_count = 0; 00122 this->budget_exceeded_status_.total_count_change = 0; 00123 this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL; 00124 00125 monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this); 00126 periodic_monitor_ = TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this); 00127 }
OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl | ( | ) | [virtual] |
Definition at line 131 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), DBG_ENTRY_LVL, OpenDDS::DCPS::ReactorInterceptor::destroy(), end_historic_sweeper_, initialized_, liveliness_timer_, rd_allocator_, remove_association_sweeper_, OpenDDS::DCPS::ReactorInterceptor::wait(), and writers_.
00132 { 00133 DBG_ENTRY_LVL("DataReaderImpl","~DataReaderImpl",6); 00134 00135 { 00136 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 00137 read_guard, 00138 this->writers_lock_); 00139 // Cancel any uncancelled sweeper timers to decrement reference count. 00140 WriterMapType::iterator writer; 00141 for (writer = writers_.begin(); writer != writers_.end(); ++writer) { 00142 end_historic_sweeper_->cancel_timer(writer->second); 00143 remove_association_sweeper_->cancel_timer(writer->second); 00144 } 00145 } 00146 00147 end_historic_sweeper_->wait(); 00148 end_historic_sweeper_->destroy(); 00149 00150 remove_association_sweeper_->wait(); 00151 remove_association_sweeper_->destroy(); 00152 00153 liveliness_timer_->cancel_timer(); 00154 liveliness_timer_->wait(); 00155 liveliness_timer_->destroy(); 00156 00157 if (initialized_) { 00158 delete rd_allocator_; 00159 } 00160 }
void OpenDDS::DCPS::DataReaderImpl::accept_coherent | ( | PublicationId & | writer_id, | |
RepoId & | publisher_id | |||
) |
Definition at line 3061 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, instances_, OPENDDS_STRING, and sample_lock_.
Referenced by verify_coherent_changes_completion().
03063 { 03064 if (::OpenDDS::DCPS::DCPS_debug_level > 0) { 03065 GuidConverter reader (this->subscription_id_); 03066 GuidConverter writer (writer_id); 03067 GuidConverter publisher (publisher_id); 03068 ACE_DEBUG((LM_DEBUG, 03069 ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()") 03070 ACE_TEXT(" reader %C writer %C publisher %C \n"), 03071 OPENDDS_STRING(reader).c_str(), 03072 OPENDDS_STRING(writer).c_str(), 03073 OPENDDS_STRING(publisher).c_str())); 03074 } 03075 03076 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03077 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 03078 03079 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin(); 03080 iter != this->instances_.end(); ++iter) { 03081 iter->second->rcvd_strategy_->accept_coherent( 03082 writer_id, publisher_id); 03083 } 03084 }
void OpenDDS::DCPS::DataReaderImpl::add_association | ( | const RepoId & | yourId, | |
const WriterAssociation & | writer, | |||
bool | active | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 295 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::associate(), OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, raw_latency_buffer_size_, raw_latency_buffer_type_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, statistics_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, DDS::DataWriterQos::transport_priority, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.
00298 { 00299 if (DCPS_debug_level) { 00300 GuidConverter reader_converter(yourId); 00301 GuidConverter writer_converter(writer.writerId); 00302 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ") 00303 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_, 00304 OPENDDS_STRING(reader_converter).c_str(), 00305 OPENDDS_STRING(writer_converter).c_str())); 00306 } 00307 00308 if (entity_deleted_.value()) { 00309 if (DCPS_debug_level) { 00310 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association") 00311 ACE_TEXT(" This is a deleted datareader, ignoring add.\n"))); 00312 } 00313 return; 00314 } 00315 00316 // We are being called back from the repository before we are done 00317 // processing after our call to the repository that caused this call 00318 // (from the repository) to be made. 00319 if (GUID_UNKNOWN == subscription_id_) { 00320 subscription_id_ = yourId; 00321 } 00322 00323 //Why do we need the publication_handle_lock_ here? No access to id_to_handle_map_... 00324 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00325 00326 00327 // For each writer in the list of writers to associate with, we 00328 // create a WriterInfo and a WriterStats object and store them in 00329 // our internal maps. 00330 // 00331 { 00332 00333 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_); 00334 00335 const PublicationId& writer_id = writer.writerId; 00336 RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos); 00337 std::pair<WriterMapType::iterator, bool> bpair = writers_.insert( 00338 // This insertion is idempotent. 
00339 WriterMapType::value_type( 00340 writer_id, 00341 info)); 00342 00343 // Schedule timer if necessary 00344 // - only need to check reader qos - we know the writer must be >= reader 00345 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) { 00346 info->waiting_for_end_historic_samples_ = true; 00347 } 00348 00349 this->statistics_.insert( 00350 StatsMapType::value_type( 00351 writer_id, 00352 WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_))); 00353 00354 // If this is a durable reader 00355 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) { 00356 // TODO schedule timer for removing flag from writers 00357 } 00358 00359 if (DCPS_debug_level > 4) { 00360 GuidConverter converter(writer_id); 00361 ACE_DEBUG((LM_DEBUG, 00362 "(%P|%t) DataReaderImpl::add_association: " 00363 "inserted writer %C.return %d \n", 00364 OPENDDS_STRING(converter).c_str(), bpair.second)); 00365 00366 WriterMapType::iterator iter = writers_.find(writer_id); 00367 if (iter != writers_.end()) { 00368 // This may not be an error since it could happen that the sample 00369 // is delivered to the datareader after the write is dis-associated 00370 // with this datareader. 00371 GuidConverter reader_converter(subscription_id_); 00372 GuidConverter writer_converter(writer_id); 00373 ACE_DEBUG((LM_DEBUG, 00374 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ") 00375 ACE_TEXT("reader %C is associated with writer %C.\n"), 00376 OPENDDS_STRING(reader_converter).c_str(), 00377 OPENDDS_STRING(writer_converter).c_str())); 00378 } 00379 } 00380 } 00381 00382 // Propagate the add_associations processing down into the Transport 00383 // layer here. This will establish the transport support and reserve 00384 // usage of an existing connection or initiate creation of a new 00385 // connection if no suitable connection is available. 
00386 AssociationData data; 00387 data.remote_id_ = writer.writerId; 00388 data.remote_data_ = writer.writerTransInfo; 00389 data.publication_transport_priority_ = 00390 writer.writerQos.transport_priority.value; 00391 data.remote_reliable_ = 00392 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00393 data.remote_durable_ = 00394 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00395 00396 //Do not hold publication_handle_lock_ when calling associate due to possible reactor 00397 //deadlock on passive side completion 00398 //associate does not access id_to_handle_map_, thus not clear why publication_handle_lock_ 00399 //is held here anyway 00400 guard.release(); 00401 00402 if (!associate(data, active)) { 00403 if (DCPS_debug_level) { 00404 ACE_DEBUG((LM_ERROR, 00405 ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ") 00406 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00407 } 00408 } 00409 }
void OpenDDS::DCPS::DataReaderImpl::add_link | ( | const DataLink_rch & | link, | |
const RepoId & | peer | |||
) | [protected, virtual] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 3344 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::add_link(), end_historic_sweeper_, OPENDDS_STRING, resume_sample_processing(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::schedule_timer(), DDS::VOLATILE_DURABILITY_QOS, writers_, and writers_lock_.
03345 { 03346 if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) { 03347 03348 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_); 03349 03350 WriterMapType::iterator it = writers_.find(peer); 03351 if (it != writers_.end()) { 03352 // Schedule timer if necessary 03353 // - only need to check reader qos - we know the writer must be >= reader 03354 end_historic_sweeper_->schedule_timer(it->second); 03355 } 03356 } 03357 TransportClient::add_link(link, peer); 03358 TransportImpl_rch impl = link->impl(); 03359 OPENDDS_STRING type = impl->transport_type(); 03360 03361 if (type == "rtps_udp" || type == "multicast") { 03362 resume_sample_processing(peer); 03363 } 03364 }
void OpenDDS::DCPS::DataReaderImpl::association_complete | ( | const RepoId & | remote_id | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 524 of file DataReaderImpl.cpp.
00525 { 00526 // For the current DCPSInfoRepo implementation, the DataReader side will 00527 // always be passive, so association_complete() will not be called. 00528 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::auto_return_loan | ( | void * | seq | ) | [pure virtual] |
This method provides virtual access to type-specific code that is used when loans are automatically returned. The destructor of the sequence supporting zero-copy read calls this method on the datareader that provided the loan.
seq | - The sequence of loaned values. |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by TAO::DCPS::ZeroCopyDataSeq< Sample_T, DEF_MAX >::~ZeroCopyDataSeq().
void OpenDDS::DCPS::DataReaderImpl::begin_access | ( | ) |
Definition at line 3202 of file DataReaderImpl.cpp.
References coherent_, and sample_lock_.
03203 { 03204 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03205 this->coherent_ = true; 03206 }
bool OpenDDS::DCPS::DataReaderImpl::check_historic | ( | const ReceivedDataSample & | sample | ) | [private] |
Collect samples received before END_HISTORIC_SAMPLES. Returns false if normal processing of this sample should be skipped.
Definition at line 3313 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), writers_, and writers_lock_.
Referenced by data_received().
03314 { 03315 ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true); 03316 WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_); 03317 if (iter != writers_.end()) { 03318 const SequenceNumber& seq = sample.header_.sequence_; 03319 if (iter->second->waiting_for_end_historic_samples_) { 03320 iter->second->historic_samples_.insert(std::make_pair(seq, sample)); 03321 return false; 03322 } 03323 if (iter->second->last_historic_seq_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN() 03324 && !sample.header_.historic_sample_ 03325 && seq <= iter->second->last_historic_seq_) { 03326 // this sample must have been seen before the END_HISTORIC_SAMPLES control msg 03327 return false; 03328 } 03329 } 03330 return true; 03331 }
bool OpenDDS::DCPS::DataReaderImpl::check_transport_qos | ( | const TransportInst & | inst | ) | [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 1791 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportInst::is_reliable(), and DDS::RELIABLE_RELIABILITY_QOS.
01792 { 01793 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) { 01794 return ti.is_reliable(); 01795 } 01796 return true; 01797 }
void OpenDDS::DCPS::DataReaderImpl::cleanup | ( | ) |
Clean up the DataReader.
Definition at line 164 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(), OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer(), content_filtered_topic_, dr_local_objref_, end_historic_sweeper_, instances_, liveliness_timer_, owner_manager_, remove_association_sweeper_, OpenDDS::DCPS::TopicImpl::remove_entity_ref(), OpenDDS::DCPS::ContentFilteredTopicImpl::remove_reader(), topic_servant_, OpenDDS::DCPS::TopicImpl::type_name(), OpenDDS::DCPS::OwnershipManager::unregister_reader(), OpenDDS::DCPS::TopicDescriptionImpl::update_reader_count(), OpenDDS::DCPS::ReactorInterceptor::wait(), watchdog_, and writers_.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().
00165 { 00166 { 00167 // Is this lock necessary? 00168 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00169 guard, 00170 this->sample_lock_); 00171 00172 liveliness_timer_->cancel_timer(); 00173 } 00174 liveliness_timer_->wait(); 00175 00176 // Cancel any watchdog timers 00177 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 00178 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 00179 iter != instances_.end(); 00180 ++iter) { 00181 SubscriptionInstance *ptr = iter->second; 00182 if (this->watchdog_ && ptr->deadline_timer_id_ != -1) { 00183 this->watchdog_->cancel_timer(ptr); 00184 } 00185 } 00186 } 00187 00188 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00189 if (owner_manager_) { 00190 owner_manager_->unregister_reader(topic_servant_->type_name(), this); 00191 } 00192 #endif 00193 00194 if (topic_servant_) { 00195 topic_servant_->remove_entity_ref(); 00196 topic_servant_->_remove_ref(); 00197 } 00198 00199 dr_local_objref_ = DDS::DataReader::_nil(); 00200 00201 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00202 if (!CORBA::is_nil(content_filtered_topic_.in())) { 00203 ContentFilteredTopicImpl* cft = 00204 dynamic_cast<ContentFilteredTopicImpl*>(content_filtered_topic_.in()); 00205 cft->remove_reader(*this); 00206 cft->update_reader_count(false); 00207 content_filtered_topic_ = DDS::ContentFilteredTopic::_nil (); 00208 } 00209 #endif 00210 00211 { 00212 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 00213 read_guard, 00214 this->writers_lock_); 00215 // Cancel any uncancelled sweeper timers 00216 WriterMapType::iterator writer; 00217 for (writer = writers_.begin(); writer != writers_.end(); ++writer) { 00218 end_historic_sweeper_->cancel_timer(writer->second); 00219 remove_association_sweeper_->cancel_timer(writer->second); 00220 } 00221 } 00222 00223 end_historic_sweeper_->wait(); 00224 remove_association_sweeper_->wait(); 00225 }
bool OpenDDS::DCPS::DataReaderImpl::coherent_change_received | ( | WriterInfo * | writer | ) | [private] |
void OpenDDS::DCPS::DataReaderImpl::coherent_change_received | ( | RepoId | publisher_id, | |
Coherent_State & | result | |||
) |
Definition at line 3131 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, state, and writers_.
03132 { 03133 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 03134 03135 result = COMPLETED; 03136 for (WriterMapType::iterator iter = writers_.begin(); 03137 iter != writers_.end(); 03138 ++iter) { 03139 03140 if (iter->second->publisher_id_ == publisher_id) { 03141 Coherent_State state = iter->second->coherent_change_received(); 03142 if (state == NOT_COMPLETED_YET) { 03143 result = state; 03144 break; 03145 } 03146 else if (state == REJECTED) { 03147 result = REJECTED; 03148 } 03149 } 03150 } 03151 }
void OpenDDS::DCPS::DataReaderImpl::coherent_changes_completed | ( | DataReaderImpl * | reader | ) |
Definition at line 3155 of file DataReaderImpl.cpp.
References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, dr_local_objref_, listener_for(), OpenDDS::DCPS::SubscriberImpl::listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), reverse_sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and subscriber_servant_.
Referenced by verify_coherent_changes_completion().
03156 { 03157 this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true); 03158 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true); 03159 03160 ::DDS::SubscriberListener_var sub_listener = 03161 this->subscriber_servant_->listener_for(::DDS::DATA_ON_READERS_STATUS); 03162 if (!CORBA::is_nil(sub_listener.in())) 03163 { 03164 if (reader == this) { 03165 // Release the sample_lock before listener callback. 03166 ACE_GUARD (Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 03167 sub_listener->on_data_on_readers(this->subscriber_servant_); 03168 } 03169 03170 this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false); 03171 this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false); 03172 } 03173 else 03174 { 03175 this->subscriber_servant_->notify_status_condition(); 03176 03177 ::DDS::DataReaderListener_var listener = 03178 this->listener_for (::DDS::DATA_AVAILABLE_STATUS); 03179 03180 if (!CORBA::is_nil(listener.in())) 03181 { 03182 if (reader == this) { 03183 // Release the sample_lock before listener callback. 03184 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 03185 listener->on_data_available(dr_local_objref_.in ()); 03186 } 03187 else { 03188 listener->on_data_available(dr_local_objref_.in ()); 03189 } 03190 03191 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false); 03192 this->subscriber_servant_->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false); 03193 } 03194 else 03195 { 03196 this->notify_status_condition(); 03197 } 03198 } 03199 }
bool OpenDDS::DCPS::DataReaderImpl::contains_sample | ( | DDS::SampleStateMask | sample_states, | |
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) |
Fold-in the three separate loops of have_sample_states(), have_view_states(), and have_instance_states(). Takes the sample_lock_.
Definition at line 1892 of file DataReaderImpl.cpp.
References instances_, and sample_lock_.
Referenced by OpenDDS::DCPS::ReadConditionImpl::get_trigger_value().
01894 { 01895 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false); 01896 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false); 01897 01898 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 01899 end = instances_.end(); iter != end; ++iter) { 01900 SubscriptionInstance& inst = *iter->second; 01901 01902 if ((inst.instance_state_.view_state() & view_states) && 01903 (inst.instance_state_.instance_state() & instance_states)) { 01904 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0; 01905 item = item->next_data_sample_) { 01906 if (item->sample_state_ & sample_states 01907 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01908 && !item->coherent_change_ 01909 #endif 01910 ) { 01911 return true; 01912 } 01913 } 01914 } 01915 } 01916 01917 return false; 01918 }
virtual bool OpenDDS::DCPS::DataReaderImpl::contains_sample_filtered | ( | DDS::SampleStateMask | sample_states, | |
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
const FilterEvaluator & | evaluator, | |||
const DDS::StringSeq & | params | |||
) | [pure virtual] |
Referenced by OpenDDS::DCPS::QueryConditionImpl::get_trigger_value().
DDS::QueryCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_querycondition | ( | DDS::SampleStateMask | sample_states, | |
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
const char * | query_expression, | |||
const DDS::StringSeq & | query_parameters | |||
) | [virtual] |
Definition at line 891 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, QueryConditionImpl, and read_conditions_.
00897 { 00898 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0); 00899 try { 00900 DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states, 00901 view_states, instance_states, query_expression, query_parameters); 00902 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc); 00903 read_conditions_.insert(rc); 00904 return qc._retn(); 00905 } catch (const std::exception& e) { 00906 if (DCPS_debug_level) { 00907 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ") 00908 ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"), 00909 e.what())); 00910 } 00911 return 0; 00912 } 00913 }
DDS::ReadCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_readcondition | ( | DDS::SampleStateMask | sample_states, | |
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [virtual] |
Definition at line 878 of file DataReaderImpl.cpp.
References read_conditions_.
00882 { 00883 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0); 00884 DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states, 00885 view_states, instance_states); 00886 read_conditions_.insert(rc); 00887 return rc._retn(); 00888 }
void OpenDDS::DCPS::DataReaderImpl::data_received | ( | const ReceivedDataSample & | sample | ) | [virtual] |
Process a message that has been received; it may be either a control message or a data sample.
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 1454 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(), check_historic(), OpenDDS::DCPS::SubscriptionInstance::cur_sample_tv_, OpenDDS::DCPS::SubscriberImpl::data_received(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_demarshal(), OpenDDS::DCPS::DISPOSE_INSTANCE, dispose_unregister(), OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER, OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(), OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::EntityImpl::get_deleted(), get_repo_id(), OpenDDS::DCPS::GUID_UNKNOWN, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::SubscriptionInstance::instance_state_, instances_, is_exclusive_ownership_, OpenDDS::DCPS::InstanceState::is_last(), OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, OpenDDS::DCPS::SubscriptionInstance::last_sample_tv_, lookup_instance(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, notify_read_conditions(), OPENDDS_STRING, process_latency(), OpenDDS::DCPS::DataSampleHeader::publication_id_, resume_sample_processing(), reverse_sample_lock_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer(), subscriber_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_string(), OpenDDS::DCPS::UNREGISTER_INSTANCE, verify_coherent_changes_completion(), watchdog_, writer_activity(), and writers_.
Referenced by deliver_historic().
01455 { 01456 DBG_ENTRY_LVL("DataReaderImpl","data_received",6); 01457 01458 // ensure some other thread is not changing the sample container 01459 // or statuses related to samples. 01460 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 01461 01462 if (get_deleted()) return; 01463 01464 if (DCPS_debug_level > 9) { 01465 GuidConverter converter(subscription_id_); 01466 ACE_DEBUG((LM_DEBUG, 01467 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ") 01468 ACE_TEXT("%C received sample: %C.\n"), 01469 OPENDDS_STRING(converter).c_str(), 01470 to_string(sample.header_).c_str())); 01471 } 01472 01473 switch (sample.header_.message_id_) { 01474 case SAMPLE_DATA: 01475 case INSTANCE_REGISTRATION: { 01476 if (!check_historic(sample)) break; 01477 01478 DataSampleHeader const & header = sample.header_; 01479 01480 this->writer_activity(header); 01481 01482 // Verify data has not exceeded its lifespan. 01483 if (this->filter_sample(header)) break; 01484 01485 // This adds the reader to the set/list of readers with data. 01486 this->subscriber_servant_->data_received(this); 01487 01488 // Only gather statistics about real samples, not registration data, etc. 01489 if (header.message_id_ == SAMPLE_DATA) { 01490 this->process_latency(sample); 01491 } 01492 01493 // This also adds to the sample container and makes any callbacks 01494 // and condition modifications. 
01495 01496 SubscriptionInstance* instance = 0; 01497 bool is_new_instance = false; 01498 bool filtered = false; 01499 if (sample.header_.key_fields_only_) { 01500 dds_demarshal(sample, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING); 01501 } else { 01502 dds_demarshal(sample, instance, is_new_instance, filtered, FULL_MARSHALING); 01503 } 01504 01505 // Per sample logging 01506 if (DCPS_debug_level >= 8) { 01507 GuidConverter reader_converter(subscription_id_); 01508 GuidConverter writer_converter(header.publication_id_); 01509 01510 ACE_DEBUG ((LM_DEBUG, 01511 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ") 01512 ACE_TEXT("instance %d is_new_instance %d filtered %d \n"), 01513 OPENDDS_STRING(reader_converter).c_str(), 01514 OPENDDS_STRING(writer_converter).c_str(), 01515 instance ? instance->instance_handle_ : 0, 01516 is_new_instance, filtered)); 01517 } 01518 01519 if (filtered) break; // sample filtered from instance 01520 bool accepted = true; 01521 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01522 bool verify_coherent = false; 01523 #endif 01524 RcHandle<WriterInfo> writer; 01525 01526 if (header.publication_id_.entityId.entityKind 01527 != OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER) { 01528 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 01529 01530 WriterMapType::iterator where 01531 = this->writers_.find(header.publication_id_); 01532 01533 if (where != this->writers_.end()) { 01534 if (header.coherent_change_) { 01535 01536 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01537 // Received coherent change 01538 where->second->group_coherent_ = header.group_coherent_; 01539 where->second->publisher_id_ = header.publisher_id_; 01540 ++where->second->coherent_samples_; 01541 verify_coherent = true; 01542 #endif 01543 writer = where->second; 01544 } 01545 } else { 01546 GuidConverter subscriptionBuffer(this->subscription_id_); 01547 GuidConverter publicationBuffer(header.publication_id_); 01548 
ACE_DEBUG((LM_WARNING, 01549 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ") 01550 ACE_TEXT("subscription %C failed to find ") 01551 ACE_TEXT("publication data for %C.\n"), 01552 OPENDDS_STRING(subscriptionBuffer).c_str(), 01553 OPENDDS_STRING(publicationBuffer).c_str())); 01554 } 01555 } 01556 01557 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01558 if (verify_coherent) { 01559 accepted = this->verify_coherent_changes_completion(writer.in()); 01560 } 01561 #endif 01562 01563 if (this->watchdog_) { 01564 instance->last_sample_tv_ = instance->cur_sample_tv_; 01565 instance->cur_sample_tv_ = ACE_OS::gettimeofday(); 01566 01567 // Watchdog can't be called with sample_lock_ due to reactor deadlock 01568 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01569 if (is_new_instance) { 01570 this->watchdog_->schedule_timer(instance); 01571 01572 } else { 01573 this->watchdog_->execute(instance, false); 01574 } 01575 } 01576 01577 if (accepted) { 01578 this->notify_read_conditions(); 01579 } 01580 } 01581 break; 01582 01583 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01584 case END_COHERENT_CHANGES: { 01585 CoherentChangeControl control; 01586 01587 this->writer_activity(sample.header_); 01588 01589 Serializer serializer( 01590 sample.sample_, sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER); 01591 serializer >> control; 01592 01593 if (DCPS_debug_level > 0) { 01594 std::stringstream buffer; 01595 buffer << control << std::endl; 01596 01597 ACE_DEBUG((LM_DEBUG, 01598 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ") 01599 ACE_TEXT("END_COHERENT_CHANGES %C\n"), 01600 buffer.str().c_str())); 01601 } 01602 01603 RcHandle<WriterInfo> writer; 01604 { 01605 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 01606 01607 WriterMapType::iterator it = 01608 this->writers_.find(sample.header_.publication_id_); 01609 01610 if (it == this->writers_.end()) { 01611 GuidConverter sub_id(this->subscription_id_); 01612 GuidConverter 
pub_id(sample.header_.publication_id_); 01613 ACE_DEBUG((LM_WARNING, 01614 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ") 01615 ACE_TEXT(" subscription %C failed to find ") 01616 ACE_TEXT(" publication data for %C!\n"), 01617 OPENDDS_STRING(sub_id).c_str(), 01618 OPENDDS_STRING(pub_id).c_str())); 01619 return; 01620 } 01621 else { 01622 writer = it->second; 01623 } 01624 it->second->set_group_info (control); 01625 } 01626 01627 if (this->verify_coherent_changes_completion(writer.in())) { 01628 this->notify_read_conditions(); 01629 } 01630 } 01631 break; 01632 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 01633 01634 case DATAWRITER_LIVELINESS: { 01635 if (DCPS_debug_level >= 4) { 01636 GuidConverter reader_converter(subscription_id_); 01637 GuidConverter writer_converter(sample.header_.publication_id_); 01638 ACE_DEBUG((LM_DEBUG, 01639 ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ") 01640 ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"), 01641 OPENDDS_STRING(reader_converter).c_str(), 01642 OPENDDS_STRING(writer_converter).c_str())); 01643 } 01644 this->writer_activity(sample.header_); 01645 01646 // tell all instances they got a liveliness message 01647 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 01648 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01649 iter != instances_.end(); 01650 ++iter) { 01651 SubscriptionInstance *ptr = iter->second; 01652 01653 ptr->instance_state_.lively(sample.header_.publication_id_); 01654 } 01655 } 01656 01657 } 01658 break; 01659 01660 case DISPOSE_INSTANCE: { 01661 if (!check_historic(sample)) break; 01662 this->writer_activity(sample.header_); 01663 SubscriptionInstance* instance = 0; 01664 01665 if (this->watchdog_) { 01666 // Find the instance first for timer cancellation since 01667 // the instance may be deleted during dispose and can 01668 // not be accessed. 
01669 ReceivedDataSample dup(sample); 01670 this->lookup_instance(dup, instance); 01671 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01672 if (! this->is_exclusive_ownership_ 01673 || (this->is_exclusive_ownership_ 01674 && (instance != 0 ) 01675 && (this->owner_manager_->is_owner (instance->instance_handle_, 01676 sample.header_.publication_id_)))) { 01677 #endif 01678 this->watchdog_->cancel_timer(instance); 01679 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01680 } 01681 #endif 01682 } 01683 instance = 0; 01684 this->dispose_unregister(sample, instance); 01685 } 01686 this->notify_read_conditions(); 01687 break; 01688 01689 case UNREGISTER_INSTANCE: { 01690 if (!check_historic(sample)) break; 01691 this->writer_activity(sample.header_); 01692 SubscriptionInstance* instance = 0; 01693 01694 if (this->watchdog_) { 01695 // Find the instance first for timer cancellation since 01696 // the instance may be deleted during dispose and can 01697 // not be accessed. 01698 ReceivedDataSample dup(sample); 01699 this->lookup_instance(dup, instance); 01700 if( instance != 0) { 01701 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01702 if (! this->is_exclusive_ownership_ 01703 || (this->is_exclusive_ownership_ 01704 && instance->instance_state_.is_last (sample.header_.publication_id_))) { 01705 #endif 01706 this->watchdog_->cancel_timer(instance); 01707 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01708 } 01709 #endif 01710 } 01711 } 01712 instance = 0; 01713 this->dispose_unregister(sample, instance); 01714 } 01715 this->notify_read_conditions(); 01716 break; 01717 01718 case DISPOSE_UNREGISTER_INSTANCE: { 01719 if (!check_historic(sample)) break; 01720 this->writer_activity(sample.header_); 01721 SubscriptionInstance* instance = 0; 01722 01723 if (this->watchdog_) { 01724 // Find the instance first for timer cancellation since 01725 // the instance may be deleted during dispose and can 01726 // not be accessed. 
01727 ReceivedDataSample dup(sample); 01728 this->lookup_instance(dup, instance); 01729 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01730 if (! this->is_exclusive_ownership_ 01731 || (this->is_exclusive_ownership_ 01732 && (instance != 0 ) 01733 && (this->owner_manager_->is_owner (instance->instance_handle_, 01734 sample.header_.publication_id_))) 01735 || (this->is_exclusive_ownership_ 01736 && (instance != 0 ) 01737 && instance->instance_state_.is_last (sample.header_.publication_id_))) { 01738 #endif 01739 this->watchdog_->cancel_timer(instance); 01740 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01741 } 01742 #endif 01743 } 01744 instance = 0; 01745 this->dispose_unregister(sample, instance); 01746 } 01747 this->notify_read_conditions(); 01748 break; 01749 01750 case END_HISTORIC_SAMPLES: { 01751 if (sample.header_.message_length_ >= sizeof(RepoId)) { 01752 Serializer ser(sample.sample_); 01753 RepoId readerId = GUID_UNKNOWN; 01754 ser >> readerId; 01755 if (readerId != GUID_UNKNOWN && readerId != get_repo_id()) { 01756 break; // not our message 01757 } 01758 } 01759 if (DCPS_debug_level > 4) { 01760 ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n")); 01761 } 01762 // Going to acquire writers lock, release samples lock 01763 guard.release(); 01764 this->resume_sample_processing(sample.header_.publication_id_); 01765 if (DCPS_debug_level > 4) { 01766 GuidConverter pub_id(sample.header_.publication_id_); 01767 ACE_DEBUG(( 01768 LM_INFO, 01769 "(%P|%t) Resumed sample processing for durable writer %C\n", 01770 OPENDDS_STRING(pub_id).c_str())); 01771 } 01772 break; 01773 } 01774 01775 default: 01776 ACE_ERROR((LM_ERROR, 01777 "(%P|%t) ERROR: DataReaderImpl::data_received" 01778 "unexpected message_id = %d\n", 01779 sample.header_.message_id_)); 01780 break; 01781 } 01782 }
virtual void OpenDDS::DCPS::DataReaderImpl::dds_demarshal | ( | const ReceivedDataSample & | sample, | |
SubscriptionInstance *& | instance, | |||
bool & | is_new_instance, | |||
bool & | filtered, | |||
MarshalingType | marshaling_type | |||
) | [pure virtual] |
virtual void OpenDDS::DCPS::DataReaderImpl::dec_ref_data_element | ( | ReceivedDataElement * | r | ) | [pure virtual] |
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_contained_entities | ( | ) | [virtual] |
Definition at line 933 of file DataReaderImpl.cpp.
References read_conditions_, DDS::RETCODE_OK, and DDS::RETCODE_OUT_OF_RESOURCES.
00934 { 00935 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00936 DDS::RETCODE_OUT_OF_RESOURCES); 00937 read_conditions_.clear(); 00938 return DDS::RETCODE_OK; 00939 }
virtual void OpenDDS::DCPS::DataReaderImpl::delete_instance_map | ( | void * | map | ) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::OwnershipManager::unregister_reader().
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_readcondition | ( | DDS::ReadCondition_ptr | a_condition | ) | [virtual] |
Definition at line 923 of file DataReaderImpl.cpp.
References read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and DDS::RETCODE_PRECONDITION_NOT_MET.
00925 { 00926 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00927 DDS::RETCODE_OUT_OF_RESOURCES); 00928 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition); 00929 return read_conditions_.erase(rc) 00930 ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET; 00931 }
void OpenDDS::DCPS::DataReaderImpl::deliver_historic | ( | OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& | samples | ) | [private] |
deliver samples that were held by check_historic()
Definition at line 3333 of file DataReaderImpl.cpp.
References data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, and OPENDDS_MAP().
Referenced by resume_sample_processing().
03334 { 03335 typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t; 03336 const iter_t end = samples.end(); 03337 for (iter_t iter = samples.begin(); iter != end; ++iter) { 03338 iter->second.header_.historic_sample_ = true; 03339 data_received(iter->second); 03340 } 03341 }
ACE_INLINE void OpenDDS::DCPS::DataReaderImpl::disable_transport | ( | ) |
Definition at line 40 of file DataReaderImpl.inl.
References transport_disabled_.
Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::create_bit_dr().
00041 { 00042 this->transport_disabled_ = true; 00043 }
void OpenDDS::DCPS::DataReaderImpl::dispose_unregister | ( | const ReceivedDataSample & | sample, | |
SubscriptionInstance *& | instance | |||
) | [virtual] |
Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Definition at line 2393 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
Referenced by data_received().
02395 { 02396 if (DCPS_debug_level > 0) { 02397 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n")); 02398 } 02399 }
DDS::DomainId_t OpenDDS::DCPS::DataReaderImpl::domain_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 664 of file DataReaderImpl.h.
00664 { return this->domain_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 1226 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::connection_info(), OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::deadline, depth_, domain_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), enable_specific(), OpenDDS::DCPS::TransportClient::enable_transport(), get_cf_topic(), OpenDDS::DCPS::ContentFilteredTopicImpl::get_filter_class_name(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::SubscriberImpl::get_qos(), OpenDDS::DCPS::GUID_UNKNOWN, DDS::DataReaderQos::history, DDS::KEEP_ALL_HISTORY_QOS, last_deadline_missed_total_count_, DDS::LENGTH_UNLIMITED, DDS::DataReaderQos::liveliness, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, monitor_, n_chunks_, DDS::Duration_t::nanosec, qos_, rd_allocator_, OpenDDS::DCPS::SubscriberImpl::reader_enabled(), DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), requested_deadline_missed_status_, RequestedDeadlineWatchdog, DDS::DataReaderQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), subscriber_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, transport_disabled_, DDS::VOLATILE_DURABILITY_QOS, and watchdog_.
Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::create_bit_dr(), and OpenDDS::DCPS::SubscriberImpl::create_datareader().
01227 { 01228 //According spec: 01229 // - Calling enable on an already enabled Entity returns OK and has no 01230 // effect. 01231 // - Calling enable on an Entity whose factory is not enabled will fail 01232 // and return PRECONDITION_NOT_MET. 01233 01234 if (this->is_enabled()) { 01235 return DDS::RETCODE_OK; 01236 } 01237 01238 if (this->subscriber_servant_->is_enabled() == false) { 01239 return DDS::RETCODE_PRECONDITION_NOT_MET; 01240 } 01241 01242 if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) { 01243 // The spec says qos_.history.depth is "has no effect" 01244 // when history.kind = KEEP_ALL so use max_samples_per_instance 01245 depth_ = qos_.resource_limits.max_samples_per_instance; 01246 01247 } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS 01248 depth_ = qos_.history.depth; 01249 } 01250 01251 if (depth_ == DDS::LENGTH_UNLIMITED) { 01252 // DDS::LENGTH_UNLIMITED is negative so make it a positive 01253 // value that is for all intents and purposes unlimited 01254 // and we can use it for comparisons. 01255 // use 2147483647L because that is the greatest value a signed 01256 // CORBA::Long can have. 01257 // WARNING: The client risks running out of memory in this case. 01258 depth_ = 2147483647L; 01259 } 01260 01261 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) { 01262 n_chunks_ = qos_.resource_limits.max_samples; 01263 } 01264 01265 //else using value from Service_Participant 01266 01267 // enable the type specific part of this DataReader 01268 this->enable_specific(); 01269 01270 //Note: the QoS used to set n_chunks_ is Changable=No so 01271 // it is OK that we cannot change the size of our allocators. 
01272 rd_allocator_ = new ReceivedDataAllocator(n_chunks_); 01273 01274 if (DCPS_debug_level >= 2) 01275 ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable" 01276 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01277 rd_allocator_, n_chunks_)); 01278 01279 if ((qos_.liveliness.lease_duration.sec != 01280 DDS::DURATION_INFINITE_SEC) && 01281 (qos_.liveliness.lease_duration.nanosec != 01282 DDS::DURATION_INFINITE_NSEC)) { 01283 liveliness_lease_duration_ = 01284 duration_to_time_value(qos_.liveliness.lease_duration); 01285 } 01286 01287 // Setup the requested deadline watchdog if the configured deadline 01288 // period is not the default (infinite). 01289 DDS::Duration_t const deadline_period = this->qos_.deadline.period; 01290 01291 if (this->watchdog_ == 0 01292 && (deadline_period.sec != DDS::DURATION_INFINITE_SEC 01293 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) { 01294 this->watchdog_ = 01295 new RequestedDeadlineWatchdog( 01296 this->sample_lock_, 01297 this->qos_.deadline, 01298 this, 01299 this->dr_local_objref_.in(), 01300 this->requested_deadline_missed_status_, 01301 this->last_deadline_missed_total_count_); 01302 } 01303 01304 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 01305 disco->pre_reader(this); 01306 01307 this->set_enabled(); 01308 01309 if (topic_servant_ && !transport_disabled_) { 01310 01311 try { 01312 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS, 01313 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 01314 } catch (const Transport::Exception&) { 01315 ACE_ERROR((LM_ERROR, 01316 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ") 01317 ACE_TEXT("Transport Exception.\n"))); 01318 return DDS::RETCODE_ERROR; 01319 01320 } 01321 01322 const TransportLocatorSeq& trans_conf_info = this->connection_info(); 01323 01324 CORBA::String_var filterClassName = ""; 01325 CORBA::String_var filterExpression = ""; 01326 DDS::StringSeq exprParams; 01327 #ifndef 
OPENDDS_NO_CONTENT_FILTERED_TOPIC 01328 DDS::ContentFilteredTopic_var cft = this->get_cf_topic(); 01329 if (cft) { 01330 OpenDDS::DCPS::ContentFilteredTopicImpl* impl = 01331 dynamic_cast<OpenDDS::DCPS::ContentFilteredTopicImpl*>(cft.in()); 01332 if (impl) { 01333 filterClassName = impl->get_filter_class_name(); 01334 } 01335 filterExpression = cft->get_filter_expression(); 01336 cft->get_expression_parameters(exprParams); 01337 } 01338 #endif 01339 01340 DDS::SubscriberQos sub_qos; 01341 this->subscriber_servant_->get_qos(sub_qos); 01342 01343 this->subscription_id_ = 01344 disco->add_subscription(this->domain_id_, 01345 this->participant_servant_->get_id(), 01346 this->topic_servant_->get_id(), 01347 this, 01348 this->qos_, 01349 trans_conf_info, 01350 sub_qos, 01351 filterClassName, 01352 filterExpression, 01353 exprParams); 01354 01355 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) { 01356 ACE_ERROR((LM_ERROR, 01357 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ") 01358 ACE_TEXT("add_subscription returned invalid id.\n"))); 01359 return DDS::RETCODE_ERROR; 01360 } 01361 } 01362 01363 if (topic_servant_) { 01364 const CORBA::String_var name = topic_servant_->get_name(); 01365 DDS::ReturnCode_t return_value = 01366 this->subscriber_servant_->reader_enabled(name.in(), this); 01367 01368 if (this->monitor_) { 01369 this->monitor_->report(); 01370 } 01371 01372 return return_value; 01373 } else { 01374 return DDS::RETCODE_OK; 01375 } 01376 }
void OpenDDS::DCPS::DataReaderImpl::enable_filtering | ( | ContentFilteredTopicImpl * | cft | ) |
Definition at line 3254 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::ContentFilteredTopicImpl::add_reader(), content_filtered_topic_, and OpenDDS::DCPS::TopicDescriptionImpl::update_reader_count().
Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().
03255 { 03256 cft->update_reader_count(true); 03257 cft->add_reader(*this); 03258 content_filtered_topic_ = DDS::ContentFilteredTopic::_duplicate(cft); 03259 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable_specific | ( | ) | [protected, pure virtual] |
void OpenDDS::DCPS::DataReaderImpl::end_access | ( | ) |
Definition at line 3209 of file DataReaderImpl.cpp.
References coherent_, group_coherent_ordered_data_, post_read_or_take(), OpenDDS::DCPS::GroupRakeData::reset(), and sample_lock_.
03210 { 03211 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03212 this->coherent_ = false; 03213 this->group_coherent_ordered_data_.reset(); 03214 this->post_read_or_take(); 03215 }
bool OpenDDS::DCPS::DataReaderImpl::filter_instance | ( | SubscriptionInstance * | instance, | |
const PublicationId & | pubid | |||
) | [protected] |
Definition at line 2759 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, OpenDDS::DCPS::InstanceState::get_owner(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::SubscriptionInstance::last_accepted_, OPENDDS_STRING, qos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, DDS::DataReaderQos::time_based_filter, OpenDDS::DCPS::time_value_to_duration(), and writers_.
02761 { 02762 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 02763 if (this->is_exclusive_ownership_) { 02764 02765 WriterMapType::iterator iter = writers_.find(pubid); 02766 02767 if (iter == writers_.end()) { 02768 if (DCPS_debug_level > 4) { 02769 // This may not be an error since it could happen that the sample 02770 // is delivered to the datareader after the write is dis-associated 02771 // with this datareader. 02772 GuidConverter reader_converter(subscription_id_); 02773 GuidConverter writer_converter(pubid); 02774 ACE_DEBUG((LM_DEBUG, 02775 ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ") 02776 ACE_TEXT("reader %C is not associated with writer %C.\n"), 02777 OPENDDS_STRING(reader_converter).c_str(), 02778 OPENDDS_STRING(writer_converter).c_str())); 02779 } 02780 return true; 02781 } 02782 02783 02784 // Evaulate the owner of the instance if not selected and filter 02785 // current message if it's not from owner writer. 02786 if ( instance->instance_state_.get_owner () == GUID_UNKNOWN 02787 || ! iter->second->is_owner_evaluated (instance->instance_handle_)) { 02788 bool is_owner = this->owner_manager_->select_owner ( 02789 instance->instance_handle_, 02790 iter->second->writer_id_, 02791 iter->second->writer_qos_.ownership_strength.value, 02792 &instance->instance_state_); 02793 iter->second->set_owner_evaluated (instance->instance_handle_, true); 02794 02795 if (! is_owner) { 02796 if (DCPS_debug_level >= 1) { 02797 GuidConverter reader_converter(subscription_id_); 02798 GuidConverter writer_converter(pubid); 02799 GuidConverter owner_converter (instance->instance_state_.get_owner ()); 02800 ACE_DEBUG((LM_DEBUG, 02801 ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ") 02802 ACE_TEXT("reader %C writer %C is not elected as owner %C\n"), 02803 OPENDDS_STRING(reader_converter).c_str(), 02804 OPENDDS_STRING(writer_converter).c_str(), 02805 OPENDDS_STRING(owner_converter).c_str())); 02806 } 02807 return true; 02808 } 02809 } 02810 else if (! 
(instance->instance_state_.get_owner () == pubid)) { 02811 if (DCPS_debug_level >= 1) { 02812 GuidConverter reader_converter(subscription_id_); 02813 GuidConverter writer_converter(pubid); 02814 GuidConverter owner_converter (instance->instance_state_.get_owner ()); 02815 ACE_DEBUG((LM_DEBUG, 02816 ACE_TEXT("(%P|%t) DataReaderImpl::filter_instance: ") 02817 ACE_TEXT("reader %C writer %C is not owner %C\n"), 02818 OPENDDS_STRING(reader_converter).c_str(), 02819 OPENDDS_STRING(writer_converter).c_str(), 02820 OPENDDS_STRING(owner_converter).c_str())); 02821 } 02822 return true; 02823 } 02824 } 02825 #else 02826 ACE_UNUSED_ARG(pubid); 02827 #endif 02828 02829 ACE_Time_Value now(ACE_OS::gettimeofday()); 02830 02831 // TIME_BASED_FILTER processing; expire data samples 02832 // if minimum separation is not met for instance. 02833 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC }; 02834 02835 if (this->qos_.time_based_filter.minimum_separation > zero) { 02836 DDS::Duration_t separation = 02837 time_value_to_duration(now - instance->last_accepted_); 02838 02839 if (separation < this->qos_.time_based_filter.minimum_separation) { 02840 return true; // Data filtered. 02841 } 02842 } 02843 02844 instance->last_accepted_ = now; 02845 02846 return false; 02847 }
bool OpenDDS::DCPS::DataReaderImpl::filter_sample | ( | const DataSampleHeader & | header | ) | [protected] |
Definition at line 2710 of file DataReaderImpl.cpp.
References always_get_history_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, header, qos_, OpenDDS::DCPS::time_to_time_value(), and DDS::VOLATILE_DURABILITY_QOS.
02711 { 02712 ACE_Time_Value now(ACE_OS::gettimeofday()); 02713 02714 // Expire historic data if QoS indicates VOLATILE. 02715 if (!always_get_history_ && header.historic_sample_ 02716 && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) { 02717 if (DCPS_debug_level >= 8) { 02718 ACE_DEBUG((LM_DEBUG, 02719 ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ") 02720 ACE_TEXT("Discarded historic data.\n"))); 02721 } 02722 02723 return true; // Data filtered. 02724 } 02725 02726 // The LIFESPAN_DURATION_FLAG is set when sample data is sent 02727 // with a non-default LIFESPAN duration value. 02728 if (header.lifespan_duration_) { 02729 // Finite lifespan. Check if data has expired. 02730 02731 DDS::Time_t const tmp = { 02732 header.source_timestamp_sec_ + header.lifespan_duration_sec_, 02733 header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_ 02734 }; 02735 02736 // We assume that the publisher host's clock and subcriber host's 02737 // clock are synchronized (allowed by the spec). 02738 ACE_Time_Value const expiration_time( 02739 OpenDDS::DCPS::time_to_time_value(tmp)); 02740 02741 if (now >= expiration_time) { 02742 if (DCPS_debug_level >= 8) { 02743 ACE_Time_Value const diff(now - expiration_time); 02744 ACE_DEBUG((LM_DEBUG, 02745 ACE_TEXT("OpenDDS (%P|%t) Received data ") 02746 ACE_TEXT("expired by %d seconds, %d microseconds.\n"), 02747 diff.sec(), 02748 diff.usec())); 02749 } 02750 02751 return true; // Data filtered. 02752 } 02753 } 02754 02755 return false; 02756 }
DDS::ContentFilteredTopic_ptr OpenDDS::DCPS::DataReaderImpl::get_cf_topic | ( | ) | const |
Definition at line 3262 of file DataReaderImpl.cpp.
References content_filtered_topic_.
Referenced by enable(), and get_topicdescription().
03263 { 03264 return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_); 03265 }
CORBA::Long OpenDDS::DCPS::DataReaderImpl::get_depth | ( | ) | const [inline] |
OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_dp_id | ( | ) |
Definition at line 2960 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::get_id(), and participant_servant_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02961 { 02962 return this->participant_servant_->get_id(); 02963 }
ACE_INLINE DDS::DataReader_ptr OpenDDS::DCPS::DataReaderImpl::get_dr_obj_ref | ( | ) |
Definition at line 10 of file DataReaderImpl.inl.
References dr_local_objref_.
00011 { 00012 return DDS::DataReader::_duplicate(dr_local_objref_.in()) ; 00013 }
SubscriptionInstance * OpenDDS::DCPS::DataReaderImpl::get_handle_instance | ( | DDS::InstanceHandle_t | handle | ) | [protected] |
Definition at line 2538 of file DataReaderImpl.cpp.
References instances_.
Referenced by release_instance().
02539 { 02540 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, 0); 02541 02542 SubscriptionInstanceMapType::iterator iter = instances_.find(handle); 02543 if (iter == instances_.end()) { 02544 ACE_DEBUG((LM_WARNING, 02545 ACE_TEXT("(%P|%t) WARNING: ") 02546 ACE_TEXT("DataReaderImpl::get_handle_instance: ") 02547 ACE_TEXT("lookup for 0x%x failed\n"), 02548 handle)); 02549 return 0; 02550 } // if (0 != instances_.find(handle, instance)) 02551 02552 return iter->second; 02553 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 289 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.
00290 { 00291 return this->participant_servant_->id_to_handle(subscription_id_); 00292 }
void OpenDDS::DCPS::DataReaderImpl::get_instance_handles | ( | InstanceHandleVec & | instance_handles | ) |
Definition at line 2966 of file DataReaderImpl.cpp.
References instances_, and sample_lock_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02967 { 02968 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 02969 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02970 02971 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 02972 end = instances_.end(); iter != end; ++iter) { 02973 instance_handles.push_back(iter->first); 02974 } 02975 }
void OpenDDS::DCPS::DataReaderImpl::get_latency_stats | ( | OpenDDS::DCPS::LatencyStatisticsSeq & | stats | ) | [virtual] |
Definition at line 2491 of file DataReaderImpl.cpp.
References statistics_.
02493 { 02494 stats.length(static_cast<CORBA::ULong>(this->statistics_.size())); 02495 int index = 0; 02496 02497 for (StatsMapType::const_iterator current = this->statistics_.begin(); 02498 current != this->statistics_.end(); 02499 ++current, ++index) { 02500 stats[ index] = current->second.get_stats(); 02501 stats[ index].publication = current->first; 02502 } 02503 }
DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::get_listener | ( | ) | [virtual] |
Definition at line 1028 of file DataReaderImpl.cpp.
References listener_.
01029 { 01030 return DDS::DataReaderListener::_duplicate(listener_.in()); 01031 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_liveliness_changed_status | ( | DDS::LivelinessChangedStatus & | status | ) | [virtual] |
Definition at line 1062 of file DataReaderImpl.cpp.
References DDS::LivelinessChangedStatus::alive_count_change, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::RETCODE_OK, and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().
01064 { 01065 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 01066 01067 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, 01068 false); 01069 status = liveliness_changed_status_; 01070 01071 liveliness_changed_status_.alive_count_change = 0; 01072 liveliness_changed_status_.not_alive_count_change = 0; 01073 01074 return DDS::RETCODE_OK; 01075 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publication_data | ( | DDS::PublicationBuiltinTopicData & | publication_data, | |
DDS::InstanceHandle_t | publication_handle | |||
) | [virtual] |
Definition at line 1186 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, participant_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01189 { 01190 if (enabled_ == false) { 01191 ACE_ERROR_RETURN((LM_ERROR, 01192 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::") 01193 ACE_TEXT("get_matched_publication_data: ") 01194 ACE_TEXT("Entity is not enabled. \n")), 01195 DDS::RETCODE_NOT_ENABLED); 01196 } 01197 01198 01199 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01200 guard, 01201 this->publication_handle_lock_, 01202 DDS::RETCODE_ERROR); 01203 01204 01205 BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader, 01206 DDS::PublicationBuiltinTopicDataDataReader_var, 01207 DDS::PublicationBuiltinTopicDataSeq > hh; 01208 01209 DDS::PublicationBuiltinTopicDataSeq data; 01210 01211 DDS::ReturnCode_t ret 01212 = hh.instance_handle_to_bit_data(participant_servant_, 01213 BUILT_IN_PUBLICATION_TOPIC, 01214 publication_handle, 01215 data); 01216 01217 if (ret == DDS::RETCODE_OK) { 01218 publication_data = data[0]; 01219 } 01220 01221 return ret; 01222 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publications | ( | DDS::InstanceHandleSeq & | publication_handles | ) | [virtual] |
Definition at line 1155 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01157 { 01158 if (enabled_ == false) { 01159 ACE_ERROR_RETURN((LM_ERROR, 01160 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ") 01161 ACE_TEXT(" Entity is not enabled. \n")), 01162 DDS::RETCODE_NOT_ENABLED); 01163 } 01164 01165 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01166 guard, 01167 this->publication_handle_lock_, 01168 DDS::RETCODE_ERROR); 01169 01170 // Copy out the handles for the current set of publications. 01171 int index = 0; 01172 publication_handles.length(static_cast<CORBA::ULong>(this->id_to_handle_map_.size())); 01173 01174 for (RepoIdToHandleMap::iterator 01175 current = this->id_to_handle_map_.begin(); 01176 current != this->id_to_handle_map_.end(); 01177 ++current, ++index) { 01178 publication_handles[ index] = current->second; 01179 } 01180 01181 return DDS::RETCODE_OK; 01182 }
size_t OpenDDS::DCPS::DataReaderImpl::get_n_chunks | ( | ) | const [inline] |
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_next_handle | ( | const DDS::BuiltinTopicKey_t & | key | ) | [protected] |
Get an instance handle for a new instance.
Definition at line 2556 of file DataReaderImpl.cpp.
References domain_id_, get_topic_name(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), is_bit(), participant_servant_, and TheServiceParticipant.
02557 { 02558 if (is_bit()) { 02559 Discovery_rch disc = TheServiceParticipant->get_discovery(domain_id_); 02560 CORBA::String_var topic = get_topic_name(); 02561 RepoId id = disc->bit_key_to_repo_id(participant_servant_, topic, key); 02562 return participant_servant_->id_to_handle(id); 02563 02564 } else { 02565 return participant_servant_->id_to_handle(GUID_UNKNOWN); 02566 } 02567 }
void OpenDDS::DCPS::DataReaderImpl::get_ordered_data | ( | GroupRakeData & | data, | |
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) |
Definition at line 3218 of file DataReaderImpl.cpp.
References group_coherent_ordered_data_, OpenDDS::DCPS::GroupRakeData::insert_sample(), instances_, and sample_lock_.
03222 { 03223 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03224 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 03225 03226 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 03227 iter != instances_.end(); ++iter) { 03228 SubscriptionInstance *ptr = iter->second; 03229 if ((ptr->instance_state_.view_state() & view_states) && 03230 (ptr->instance_state_.instance_state() & instance_states)) { 03231 size_t i(0); 03232 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_; 03233 item != 0; item = item->next_data_sample_) { 03234 if ((item->sample_state_ & sample_states) && !item->coherent_change_) { 03235 data.insert_sample(item, ptr, ++i); 03236 this->group_coherent_ordered_data_.insert_sample(item, ptr, ++i); 03237 } 03238 } 03239 } 03240 } 03241 }
Priority OpenDDS::DCPS::DataReaderImpl::get_priority_value | ( | const AssociationData & | data | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 666 of file DataReaderImpl.h.
References OpenDDS::DCPS::AssociationData::publication_transport_priority_.
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_qos | ( | DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 1011 of file DataReaderImpl.cpp.
References qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::InstanceState::schedule_release().
01013 { 01014 qos = qos_; 01015 return DDS::RETCODE_OK; 01016 }
ACE_Reactor_Timer_Interface * OpenDDS::DCPS::DataReaderImpl::get_reactor | ( | ) |
Definition at line 2948 of file DataReaderImpl.cpp.
References reactor_.
Referenced by OpenDDS::DCPS::InstanceState::cancel_release(), and OpenDDS::DCPS::InstanceState::schedule_release().
02949 { 02950 return this->reactor_; 02951 }
const RepoId& OpenDDS::DCPS::DataReaderImpl::get_repo_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 663 of file DataReaderImpl.h.
Referenced by data_received(), and OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::handle_timeout().
00663 { return this->subscription_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_deadline_missed_status | ( | DDS::RequestedDeadlineMissedStatus & | status | ) | [virtual] |
Definition at line 1078 of file DataReaderImpl.cpp.
References last_deadline_missed_total_count_, DDS::REQUESTED_DEADLINE_MISSED_STATUS, requested_deadline_missed_status_, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::RequestedDeadlineMissedStatus::total_count, and DDS::RequestedDeadlineMissedStatus::total_count_change.
01080 { 01081 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 01082 01083 set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS, 01084 false); 01085 01086 this->requested_deadline_missed_status_.total_count_change = 01087 this->requested_deadline_missed_status_.total_count 01088 - this->last_deadline_missed_total_count_; 01089 01090 // DDS::RequestedDeadlineMissedStatus::last_instance_handle field 01091 // is updated by the RequestedDeadlineWatchdog. 01092 01093 // Update for next status check. 01094 this->last_deadline_missed_total_count_ = 01095 this->requested_deadline_missed_status_.total_count; 01096 01097 status = requested_deadline_missed_status_; 01098 01099 return DDS::RETCODE_OK; 01100 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_incompatible_qos_status | ( | DDS::RequestedIncompatibleQosStatus & | status | ) | [virtual] |
Definition at line 1103 of file DataReaderImpl.cpp.
References DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::RequestedIncompatibleQosStatus::total_count_change.
01105 { 01106 01107 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe( 01108 this->publication_handle_lock_); 01109 01110 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, 01111 false); 01112 status = requested_incompatible_qos_status_; 01113 requested_incompatible_qos_status_.total_count_change = 0; 01114 01115 return DDS::RETCODE_OK; 01116 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_lost_status | ( | DDS::SampleLostStatus & | status | ) | [virtual] |
Definition at line 1135 of file DataReaderImpl.cpp.
References DDS::RETCODE_OK, DDS::SAMPLE_LOST_STATUS, sample_lost_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleLostStatus::total_count_change.
01137 { 01138 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 01139 01140 set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false); 01141 status = sample_lost_status_; 01142 sample_lost_status_.total_count_change = 0; 01143 return DDS::RETCODE_OK; 01144 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_rejected_status | ( | DDS::SampleRejectedStatus & | status | ) | [virtual] |
Definition at line 1050 of file DataReaderImpl.cpp.
References DDS::RETCODE_OK, DDS::SAMPLE_REJECTED_STATUS, sample_rejected_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleRejectedStatus::total_count_change.
01052 { 01053 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_); 01054 01055 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false); 01056 status = sample_rejected_status_; 01057 sample_rejected_status_.total_count_change = 0; 01058 return DDS::RETCODE_OK; 01059 }
DDS::Subscriber_ptr OpenDDS::DCPS::DataReaderImpl::get_subscriber | ( | ) | [virtual] |
Definition at line 1044 of file DataReaderImpl.cpp.
References subscriber_servant_.
Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::DRMonitorImpl::report().
01045 { 01046 return DDS::Subscriber::_duplicate(subscriber_servant_); 01047 }
SubscriberImpl * OpenDDS::DCPS::DataReaderImpl::get_subscriber_servant | ( | ) | [protected] |
Definition at line 1811 of file DataReaderImpl.cpp.
References subscriber_servant_.
Referenced by post_read_or_take().
01812 { 01813 return subscriber_servant_; 01814 }
RepoId OpenDDS::DCPS::DataReaderImpl::get_subscription_id | ( | ) | const |
Definition at line 1816 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WriterInfoListener::subscription_id_.
Referenced by OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i(), OpenDDS::Federator::ManagerImpl::initialize(), OpenDDS::DCPS::DRPeriodicMonitorImpl::report(), and OpenDDS::DCPS::DRMonitorImpl::report().
01817 { 01818 return subscription_id_; 01819 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_subscription_matched_status | ( | DDS::SubscriptionMatchedStatus & | status | ) | [virtual] |
Definition at line 1119 of file DataReaderImpl.cpp.
References DDS::SubscriptionMatchedStatus::current_count_change, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, and DDS::SubscriptionMatchedStatus::total_count_change.
01121 { 01122 01123 ACE_Guard<ACE_Recursive_Thread_Mutex> justMe( 01124 this->publication_handle_lock_); 01125 01126 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false); 01127 status = subscription_match_status_; 01128 subscription_match_status_.total_count_change = 0; 01129 subscription_match_status_.current_count_change = 0; 01130 01131 return DDS::RETCODE_OK; 01132 }
OpenDDS::DCPS::RepoId OpenDDS::DCPS::DataReaderImpl::get_topic_id | ( | ) |
Definition at line 2954 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TopicImpl::get_id(), and topic_servant_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02955 { 02956 return this->topic_servant_->get_id(); 02957 }
char * OpenDDS::DCPS::DataReaderImpl::get_topic_name | ( | ) | const |
Definition at line 1822 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TopicDescriptionImpl::get_name(), and topic_servant_.
Referenced by get_next_handle().
01823 { 01824 return topic_servant_->get_name(); 01825 }
DDS::TopicDescription_ptr OpenDDS::DCPS::DataReaderImpl::get_topicdescription | ( | ) | [virtual] |
Definition at line 1033 of file DataReaderImpl.cpp.
References get_cf_topic(), and topic_desc_.
Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().
01034 { 01035 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 01036 DDS::ContentFilteredTopic_ptr cft = this->get_cf_topic(); 01037 if (cft) { 01038 return cft; // get_cf_topic has already _duplicated() 01039 } 01040 #endif 01041 return DDS::TopicDescription::_duplicate(topic_desc_.in()); 01042 }
void OpenDDS::DCPS::DataReaderImpl::get_writer_states | ( | WriterStatePairVec & | writer_states | ) |
Definition at line 2978 of file DataReaderImpl.cpp.
References writers_.
Referenced by OpenDDS::DCPS::DRMonitorImpl::report().
02979 { 02980 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 02981 read_guard, 02982 this->writers_lock_); 02983 for (WriterMapType::iterator iter = writers_.begin(); 02984 iter != writers_.end(); 02985 ++iter) { 02986 writer_states.push_back(WriterStatePair(iter->first, 02987 iter->second->get_state())); 02988 } 02989 }
bool OpenDDS::DCPS::DataReaderImpl::has_readcondition | ( | DDS::ReadCondition_ptr | a_condition | ) | [protected] |
Definition at line 916 of file DataReaderImpl.cpp.
References read_conditions_.
00917 { 00918 //sample lock already held 00919 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition); 00920 return read_conditions_.find(rc) != read_conditions_.end(); 00921 }
bool OpenDDS::DCPS::DataReaderImpl::have_instance_states | ( | DDS::InstanceStateMask | instance_states | ) | const |
Definition at line 1870 of file DataReaderImpl.cpp.
References instances_.
01872 { 01873 //!!!caller should have acquired sample_lock_ 01874 /// @TODO: determine correct failed lock return value. 01875 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false); 01876 01877 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01878 iter != instances_.end(); 01879 ++iter) { 01880 SubscriptionInstance *ptr = iter->second; 01881 01882 if (ptr->instance_state_.instance_state() & instance_states) { 01883 return true; 01884 } 01885 } 01886 01887 return false; 01888 }
bool OpenDDS::DCPS::DataReaderImpl::have_sample_states | ( | DDS::SampleStateMask | sample_states | ) | const |
Definition at line 1827 of file DataReaderImpl.cpp.
References instances_.
01829 { 01830 //!!!caller should have acquired sample_lock_ 01831 /// @TODO: determine correct failed lock return value. 01832 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false); 01833 01834 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01835 iter != instances_.end(); 01836 ++iter) { 01837 SubscriptionInstance *ptr = iter->second; 01838 01839 for (ReceivedDataElement *item = ptr->rcvd_samples_.head_; 01840 item != 0; item = item->next_data_sample_) { 01841 if (item->sample_state_ & sample_states) { 01842 return true; 01843 } 01844 } 01845 } 01846 01847 return false; 01848 }
bool OpenDDS::DCPS::DataReaderImpl::have_view_states | ( | DDS::ViewStateMask | view_states | ) | const |
Definition at line 1851 of file DataReaderImpl.cpp.
References instances_.
01852 { 01853 //!!!caller should have acquired sample_lock_ 01854 /// @TODO: determine correct failed lock return value. 01855 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,false); 01856 01857 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01858 iter != instances_.end(); 01859 ++iter) { 01860 SubscriptionInstance *ptr = iter->second; 01861 01862 if (ptr->instance_state_.view_state() & view_states) { 01863 return true; 01864 } 01865 } 01866 01867 return false; 01868 }
void OpenDDS::DCPS::DataReaderImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 829 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.
00830 { 00831 topic_servant_->inconsistent_topic(); 00832 }
void OpenDDS::DCPS::DataReaderImpl::init | ( | TopicDescriptionImpl * | a_topic_desc, | |
const DDS::DataReaderQos & | qos, | |||
DDS::DataReaderListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
DomainParticipantImpl * | participant, | |||
SubscriberImpl * | subscriber, | |||
DDS::DataReader_ptr | dr_objref | |||
) |
Definition at line 227 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, domain_id_, dr_local_objref_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), initialized_, is_bit_, is_exclusive_ownership_, listener_, listener_mask_, owner_manager_, OpenDDS::DCPS::DomainParticipantImpl::ownership_manager(), participant_servant_, qos_, DDS::RETCODE_OK, subscriber_servant_, topic_desc_, and topic_servant_.
Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::create_bit_dr(), OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00235 { 00236 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc); 00237 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) { 00238 topic_servant_ = a_topic; 00239 topic_servant_->_add_ref(); 00240 00241 topic_servant_->add_entity_ref(); 00242 } 00243 00244 CORBA::String_var topic_name = a_topic_desc->get_name(); 00245 00246 #if !defined (DDS_HAS_MINIMUM_BIT) 00247 is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0 00248 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0 00249 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0 00250 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0; 00251 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00252 00253 qos_ = qos; 00254 00255 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00256 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS; 00257 #endif 00258 00259 listener_ = DDS::DataReaderListener::_duplicate(a_listener); 00260 listener_mask_ = mask; 00261 00262 // Only store the participant pointer, since it is our "grand" 00263 // parent, we will exist as long as it does 00264 participant_servant_ = participant; 00265 00266 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00267 if (is_exclusive_ownership_) { 00268 owner_manager_ = participant_servant_->ownership_manager (); 00269 } 00270 #endif 00271 00272 domain_id_ = participant_servant_->get_domain_id(); 00273 00274 // Only store the subscriber pointer, since it is our parent, we 00275 // will exist as long as it does. 00276 subscriber_servant_ = subscriber; 00277 dr_local_objref_ = DDS::DataReader::_duplicate(dr_objref); 00278 00279 if (this->subscriber_servant_->get_qos(this->subqos_) != ::DDS::RETCODE_OK) { 00280 ACE_DEBUG((LM_WARNING, 00281 ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ") 00282 ACE_TEXT("failed to get SubscriberQos\n"))); 00283 } 00284 00285 initialized_ = true; 00286 }
void OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update | ( | WriterInfo & | info, | |
const ACE_Time_Value & | when | |||
) | [private] |
Definition at line 2365 of file DataReaderImpl.cpp.
References DDS::LivelinessChangedStatus::alive_count, instances_, liveliness_changed_status_, and OpenDDS::DCPS::WriterInfo::writer_id_.
Referenced by writer_became_dead(), and writer_removed().
02367 { 02368 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02369 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 02370 next = iter; iter != instances_.end(); iter = next) { 02371 ++next; 02372 iter->second->instance_state_.writer_became_dead( 02373 info.writer_id_, liveliness_changed_status_.alive_count, when); 02374 } 02375 }
bool OpenDDS::DCPS::DataReaderImpl::is_bit | ( | ) | const |
Definition at line 2849 of file DataReaderImpl.cpp.
References is_bit_.
Referenced by get_next_handle().
02850 { 02851 return this->is_bit_; 02852 }
void OpenDDS::DCPS::DataReaderImpl::listener_add_ref | ( | ) | [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 680 of file DataReaderImpl.h.
DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::listener_for | ( | DDS::StatusKind | kind | ) |
This is used to retrieve the listener for a certain status change. If this datareader has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/subscriber.
Definition at line 1921 of file DataReaderImpl.cpp.
References listener_, OpenDDS::DCPS::SubscriberImpl::listener_for(), listener_mask_, and subscriber_servant_.
Referenced by coherent_changes_completed(), OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(), notify_liveliness_change(), remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().
01922 { 01923 // per 2.1.4.3.1 Listener Access to Plain Communication Status 01924 // use this entity's factory if the listener mask is not enabled 01925 // for this kind. 01926 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) { 01927 return subscriber_servant_->listener_for(kind); 01928 01929 } else { 01930 return DDS::DataReaderListener::_duplicate(listener_.in()); 01931 } 01932 }
void OpenDDS::DCPS::DataReaderImpl::listener_remove_ref | ( | ) | [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 681 of file DataReaderImpl.h.
void OpenDDS::DCPS::DataReaderImpl::liveliness_lost | ( | ) |
virtual void OpenDDS::DCPS::DataReaderImpl::lookup_instance | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance *& | instance | |||
) | [pure virtual] |
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::lookup_instance_generic | ( | const void * | data | ) | [pure virtual] |
Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >.
Referenced by OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().
bool OpenDDS::DCPS::DataReaderImpl::lookup_instance_handles | ( | const WriterIdSeq & | ids, | |
DDS::InstanceHandleSeq & | hdls | |||
) | [private] |
Look up the instance handles by the publication repo IDs.
Definition at line 2679 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.
Referenced by notify_latency(), notify_subscription_disconnected(), and notify_subscription_lost().
02681 { 02682 if (DCPS_debug_level > 9) { 02683 CORBA::ULong const size = ids.length(); 02684 const char* separator = ""; 02685 OPENDDS_STRING guids; 02686 02687 for (unsigned long i = 0; i < size; ++i) { 02688 guids += separator; 02689 guids += OPENDDS_STRING(GuidConverter(ids[i])); 02690 separator = ", "; 02691 } 02692 02693 ACE_DEBUG((LM_DEBUG, 02694 ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ") 02695 ACE_TEXT("searching for handles for writer Ids: %C.\n"), 02696 guids.c_str())); 02697 } 02698 02699 CORBA::ULong const num_wrts = ids.length(); 02700 hdls.length(num_wrts); 02701 02702 for (CORBA::ULong i = 0; i < num_wrts; ++i) { 02703 hdls[i] = this->participant_servant_->id_to_handle(ids[i]); 02704 } 02705 02706 return true; 02707 }
void OpenDDS::DCPS::DataReaderImpl::notify_connection_deleted | ( | const RepoId & | peerId | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2666 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportClient::on_notification_of_connection_deletion().
02667 { 02668 DBG_ENTRY_LVL("DataReaderImpl","notify_connection_deleted",6); 02669 on_notification_of_connection_deletion(peerId); 02670 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02671 // is given to this DataReader then narrow() fails. 02672 DataReaderListener_var the_listener = DataReaderListener::_narrow(this->listener_.in()); 02673 02674 if (!CORBA::is_nil(the_listener.in())) 02675 the_listener->on_connection_deleted(this->dr_local_objref_.in()); 02676 }
void OpenDDS::DCPS::DataReaderImpl::notify_latency | ( | PublicationId | writer | ) |
Definition at line 2456 of file DataReaderImpl.cpp.
References budget_exceeded_status_, OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, lookup_instance_handles(), OpenDDS::DCPS::BudgetExceededStatus::total_count, and OpenDDS::DCPS::BudgetExceededStatus::total_count_change.
Referenced by process_latency().
02457 { 02458 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02459 // is given to this DataReader then narrow() fails. 02460 DataReaderListener_var listener 02461 = DataReaderListener::_narrow(this->listener_.in()); 02462 02463 if (!CORBA::is_nil(listener.in())) { 02464 WriterIdSeq writerIds; 02465 writerIds.length(1); 02466 writerIds[ 0] = writer; 02467 02468 DDS::InstanceHandleSeq handles; 02469 this->lookup_instance_handles(writerIds, handles); 02470 02471 if (handles.length() >= 1) { 02472 this->budget_exceeded_status_.last_instance_handle = handles[ 0]; 02473 02474 } else { 02475 this->budget_exceeded_status_.last_instance_handle = -1; 02476 } 02477 02478 ++this->budget_exceeded_status_.total_count; 02479 ++this->budget_exceeded_status_.total_count_change; 02480 02481 listener->on_budget_exceeded( 02482 this->dr_local_objref_.in(), 02483 this->budget_exceeded_status_); 02484 02485 this->budget_exceeded_status_.total_count_change = 0; 02486 } 02487 }
void OpenDDS::DCPS::DataReaderImpl::notify_liveliness_change | ( | ) |
Definition at line 2878 of file DataReaderImpl.cpp.
References DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DCPS_debug_level, dr_local_objref_, listener_for(), listener_mask_, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::to_dds_string(), and writers_.
Referenced by writer_became_alive(), writer_became_dead(), and writer_removed().
02879 { 02880 // N.B. writers_lock_ should already be acquired when 02881 // this method is called. 02882 02883 DDS::DataReaderListener_var listener 02884 = listener_for(DDS::LIVELINESS_CHANGED_STATUS); 02885 02886 if (!CORBA::is_nil(listener.in())) { 02887 listener->on_liveliness_changed(dr_local_objref_.in(), 02888 liveliness_changed_status_); 02889 02890 liveliness_changed_status_.alive_count_change = 0; 02891 liveliness_changed_status_.not_alive_count_change = 0; 02892 } 02893 notify_status_condition(); 02894 02895 if (DCPS_debug_level > 9) { 02896 OPENDDS_STRING output_str; 02897 output_str += "subscription "; 02898 output_str += OPENDDS_STRING(GuidConverter(subscription_id_)); 02899 output_str += ", listener at: 0x"; 02900 output_str += to_dds_string(this->listener_.in ()); 02901 02902 for (WriterMapType::iterator current = this->writers_.begin(); 02903 current != this->writers_.end(); 02904 ++current) { 02905 RepoId id = current->first; 02906 output_str += "\n\tNOTIFY: writer[ "; 02907 output_str += OPENDDS_STRING(GuidConverter(id)); 02908 output_str += "] == "; 02909 output_str += current->second->get_state_str(); 02910 } 02911 02912 output_str + "\n"; 02913 ACE_DEBUG((LM_DEBUG, 02914 ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ") 02915 ACE_TEXT("listener at 0x%x, mask 0x%x.\n") 02916 ACE_TEXT("\tNOTIFY: %C\n"), 02917 listener.in (), 02918 listener_mask_, 02919 output_str.c_str())); 02920 } 02921 }
void OpenDDS::DCPS::DataReaderImpl::notify_read_conditions | ( | ) | [protected] |
Data has arrived in the cache; unblock waiting ReadConditions.
Definition at line 1799 of file DataReaderImpl.cpp.
References read_conditions_, and reverse_sample_lock_.
Referenced by data_received().
01800 { 01801 //sample lock is already held 01802 ReadConditionSet local_read_conditions = read_conditions_; 01803 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01804 01805 for (ReadConditionSet::iterator it = local_read_conditions.begin(), 01806 end = local_read_conditions.end(); it != end; ++it) { 01807 dynamic_cast<ConditionImpl*>(it->in())->signal_all(); 01808 } 01809 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_disconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2570 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, lookup_instance_handles(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.
02571 { 02572 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6); 02573 02574 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02575 // is given to this DataReader then narrow() fails. 02576 DataReaderListener_var the_listener 02577 = DataReaderListener::_narrow(this->listener_.in()); 02578 02579 if (!CORBA::is_nil(the_listener.in())) { 02580 SubscriptionLostStatus status; 02581 02582 // Since this callback may come after remove_association which removes 02583 // the writer from id_to_handle map, we can ignore this error. 02584 this->lookup_instance_handles(pubids, status.publication_handles); 02585 the_listener->on_subscription_disconnected(this->dr_local_objref_.in(), 02586 status); 02587 } 02588 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
Definition at line 2618 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.
02619 { 02620 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6); 02621 02622 if (!this->is_bit_) { 02623 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02624 // is given to this DataReader then narrow() fails. 02625 DataReaderListener_var the_listener 02626 = DataReaderListener::_narrow(this->listener_.in()); 02627 02628 if (!CORBA::is_nil(the_listener.in())) { 02629 SubscriptionLostStatus status; 02630 02631 CORBA::ULong len = handles.length(); 02632 status.publication_handles.length(len); 02633 02634 for (CORBA::ULong i = 0; i < len; ++ i) { 02635 status.publication_handles[i] = handles[i]; 02636 } 02637 02638 the_listener->on_subscription_lost(this->dr_local_objref_.in(), 02639 status); 02640 } 02641 } 02642 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2645 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, lookup_instance_handles(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.
Referenced by remove_associations_i().
02646 { 02647 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6); 02648 02649 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02650 // is given to this DataReader then narrow() fails. 02651 DataReaderListener_var the_listener 02652 = DataReaderListener::_narrow(this->listener_.in()); 02653 02654 if (!CORBA::is_nil(the_listener.in())) { 02655 SubscriptionLostStatus status; 02656 02657 // Since this callback may come after remove_association which removes 02658 // the writer from id_to_handle map, we can ignore this error. 02659 this->lookup_instance_handles(pubids, status.publication_handles); 02660 the_listener->on_subscription_lost(this->dr_local_objref_.in(), 02661 status); 02662 } 02663 }
void OpenDDS::DCPS::DataReaderImpl::notify_subscription_reconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2591 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.
02592 { 02593 DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6); 02594 02595 if (!this->is_bit_) { 02596 // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener 02597 // is given to this DataReader then narrow() fails. 02598 DataReaderListener_var the_listener 02599 = DataReaderListener::_narrow(this->listener_.in()); 02600 02601 if (!CORBA::is_nil(the_listener.in())) { 02602 SubscriptionLostStatus status; 02603 02604 // If it's reconnected then the writer should be in the id_to_handle map; otherwise 02605 // log an error. 02606 if (this->lookup_instance_handles(pubids, status.publication_handles) == false) { 02607 ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::notify_subscription_reconnected: " 02608 "lookup_instance_handles failed.\n")); 02609 } 02610 02611 the_listener->on_subscription_reconnected(this->dr_local_objref_.in(), 02612 status); 02613 } 02614 } 02615 }
int OpenDDS::DCPS::DataReaderImpl::num_zero_copies | ( | ) | [virtual] |
This method is used for a precondition check of delete_datareader.
Definition at line 2855 of file DataReaderImpl.cpp.
References instances_.
02856 { 02857 int loans = 0; 02858 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 02859 guard, 02860 this->sample_lock_, 02861 1 /* assume we have loans */); 02862 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,1); 02863 02864 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 02865 iter != instances_.end(); 02866 ++iter) { 02867 SubscriptionInstance *ptr = iter->second; 02868 02869 for (OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_; 02870 item != 0; item = item->next_data_sample_) { 02871 loans += item->zero_copy_cnt_.value(); 02872 } 02873 } 02874 02875 return loans; 02876 }
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP | ( | DDS::InstanceHandle_t | , | |
SubscriptionInstance * | ||||
) |
Referenced by deliver_historic(), and resume_sample_processing().
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP | ( | PublicationId | , | |
RcHandle< WriterInfo > | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Publications writing to this reader.
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::InstanceHandle_t | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP | ( | PublicationId | , | |
WriterStats | , | |||
GUID_tKeyLessThan | ||||
) |
Type of collection of statistics for writers to this reader.
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET_CMP | ( | DDS::ReadCondition_var | , | |
RCCompLess | ||||
) | [private] |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR | ( | void * | ) |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR | ( | WriterStatePair | ) |
typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR | ( | DDS::InstanceHandle_t | ) |
Referenced by signal_liveliness().
OwnershipManager* OpenDDS::DCPS::DataReaderImpl::ownership_manager | ( | ) | const [inline] |
Definition at line 458 of file DataReaderImpl.h.
Referenced by OpenDDS::DCPS::InstanceState::~InstanceState().
00458 { return owner_manager_; }
EntityImpl * OpenDDS::DCPS::DataReaderImpl::parent | ( | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 1785 of file DataReaderImpl.cpp.
References subscriber_servant_.
01786 { 01787 return this->subscriber_servant_; 01788 }
void OpenDDS::DCPS::DataReaderImpl::post_read_or_take | ( | ) | [protected] |
Definition at line 2923 of file DataReaderImpl.cpp.
References DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, get_subscriber_servant(), and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().
Referenced by end_access().
02924 { 02925 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false); 02926 get_subscriber_servant()->set_status_changed_flag( 02927 DDS::DATA_ON_READERS_STATUS, false); 02928 }
void OpenDDS::DCPS::DataReaderImpl::prepare_to_delete | ( | ) | [protected] |
Definition at line 2530 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::send_final_acks(), OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().
02531 { 02532 this->set_deleted(true); 02533 this->stop_associating(); 02534 this->send_final_acks(); 02535 }
void OpenDDS::DCPS::DataReaderImpl::process_latency | ( | const ReceivedDataSample & | sample | ) |
Definition at line 2401 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::ReceivedDataSample::header_, notify_latency(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, statistics_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::time_value_to_duration().
Referenced by data_received().
02402 { 02403 StatsMapType::iterator location 02404 = this->statistics_.find(sample.header_.publication_id_); 02405 02406 if (location != this->statistics_.end()) { 02407 // This starts as the current time. 02408 ACE_Time_Value latency = ACE_OS::gettimeofday(); 02409 02410 // The time interval starts at the send end. 02411 DDS::Duration_t then = { 02412 sample.header_.source_timestamp_sec_, 02413 sample.header_.source_timestamp_nanosec_ 02414 }; 02415 02416 // latency delay in ACE_Time_Value format. 02417 latency -= duration_to_time_value(then); 02418 02419 if (this->statistics_enabled()) { 02420 location->second.add_stat(latency); 02421 } 02422 02423 if (DCPS_debug_level > 9) { 02424 ACE_DEBUG((LM_DEBUG, 02425 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ") 02426 ACE_TEXT("measured latency of %dS, %dmS for current sample.\n"), 02427 latency.sec(), 02428 latency.msec())); 02429 } 02430 02431 // Check latency against the budget. 02432 if (time_value_to_duration(latency) 02433 > this->qos_.latency_budget.duration) { 02434 this->notify_latency(sample.header_.publication_id_); 02435 } 02436 02437 } else if (DCPS_debug_level > 0) { 02438 /// NB: This message is generated contemporaneously with a similar 02439 /// message from writer_activity(). That message is not marked 02440 /// as an error, so we follow that lead and leave this as an 02441 /// informational message, guarded by debug level. This seems 02442 /// to be due to late samples (samples delivered after an 02443 /// association has been torn down). We may want to promote this 02444 /// to a warning if other conditions causing this symptom are 02445 /// discovered. 
02446 GuidConverter reader_converter(subscription_id_); 02447 GuidConverter writer_converter(sample.header_.publication_id_); 02448 ACE_DEBUG((LM_DEBUG, 02449 ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ") 02450 ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"), 02451 OPENDDS_STRING(reader_converter).c_str(), 02452 OPENDDS_STRING(writer_converter).c_str())); 02453 } 02454 }
virtual void OpenDDS::DCPS::DataReaderImpl::purge_data | ( | SubscriptionInstance * | instance | ) | [protected, pure virtual] |
ACE_INLINE unsigned int & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size | ( | ) |
Configure the size of the raw data collection buffer.
Definition at line 26 of file DataReaderImpl.inl.
References raw_latency_buffer_size_.
Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00027 { 00028 return this->raw_latency_buffer_size_; 00029 }
ACE_INLINE OpenDDS::DCPS::DataCollector< double >::OnFull & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type | ( | ) |
Configure the type of the raw data collection buffer.
Definition at line 33 of file DataReaderImpl.inl.
References raw_latency_buffer_type_.
Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00034 { 00035 return this->raw_latency_buffer_type_; 00036 }
ACE_INLINE const OpenDDS::DCPS::DataReaderImpl::StatsMapType & OpenDDS::DCPS::DataReaderImpl::raw_latency_statistics | ( | ) | const |
Expose the statistics container.
Definition at line 19 of file DataReaderImpl.inl.
References statistics_.
00020 { 00021 return this->statistics_; 00022 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_generic(GenericBundle & gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count) [pure virtual]
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_instance_generic(void *& data, DDS::SampleInfo & info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states) [pure virtual]
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_next_instance_generic(void *& data, DDS::SampleInfo & info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states) [pure virtual]
void OpenDDS::DCPS::DataReaderImpl::register_for_writer(const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *) [virtual]
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 3367 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::register_for_writer().
03372 { 03373 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener); 03374 }
void OpenDDS::DCPS::DataReaderImpl::reject_coherent(PublicationId & writer_id, RepoId & publisher_id)
Definition at line 3087 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, instances_, OPENDDS_STRING, reset_coherent_info(), and sample_lock_.
Referenced by verify_coherent_changes_completion().
03089 { 03090 if (::OpenDDS::DCPS::DCPS_debug_level > 0) { 03091 GuidConverter reader (this->subscription_id_); 03092 GuidConverter writer (writer_id); 03093 GuidConverter publisher (publisher_id); 03094 ACE_DEBUG((LM_DEBUG, 03095 ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()") 03096 ACE_TEXT(" reader %C writer %C publisher %C \n"), 03097 OPENDDS_STRING(reader).c_str(), 03098 OPENDDS_STRING(writer).c_str(), 03099 OPENDDS_STRING(publisher).c_str())); 03100 } 03101 03102 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 03103 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 03104 03105 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin(); 03106 iter != this->instances_.end(); ++iter) { 03107 iter->second->rcvd_strategy_->reject_coherent( 03108 writer_id, publisher_id); 03109 } 03110 this->reset_coherent_info (writer_id, publisher_id); 03111 }
void OpenDDS::DCPS::DataReaderImpl::release_instance | ( | DDS::InstanceHandle_t | handle | ) |
Release the instance with the handle.
Definition at line 2102 of file DataReaderImpl.cpp.
References get_handle_instance(), instances_, monitor_, owner_manager_, purge_data(), release_instance_i(), OpenDDS::DCPS::OwnershipManager::remove_writers(), and OpenDDS::DCPS::Monitor::report().
Referenced by OpenDDS::DCPS::InstanceState::release().
02103 { 02104 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 02105 if (this->is_exclusive_ownership_) { 02106 this->owner_manager_->remove_writers (handle); 02107 } 02108 #endif 02109 02110 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 02111 SubscriptionInstance* instance = this->get_handle_instance(handle); 02112 02113 if (instance == 0) { 02114 ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance " 02115 "could not find the instance by handle 0x%x\n", handle)); 02116 return; 02117 } 02118 02119 this->purge_data(instance); 02120 02121 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02122 this->instances_.erase(handle); 02123 } 02124 this->release_instance_i(handle); 02125 if (this->monitor_) { 02126 this->monitor_->report(); 02127 } 02128 }
virtual void OpenDDS::DCPS::DataReaderImpl::release_instance_i | ( | DDS::InstanceHandle_t | handle | ) | [protected, pure virtual] |
Referenced by release_instance().
void OpenDDS::DCPS::DataReaderImpl::remove_all_associations | ( | ) |
Definition at line 744 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, publication_handle_lock_, remove_associations(), OpenDDS::DCPS::TransportClient::stop_associating(), and writers_.
00745 { 00746 DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6); 00747 // stop pending associations 00748 this->stop_associating(); 00749 00750 OpenDDS::DCPS::WriterIdSeq writers; 00751 int size; 00752 00753 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00754 00755 { 00756 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00757 00758 size = static_cast<int>(writers_.size()); 00759 writers.length(size); 00760 00761 WriterMapType::iterator curr_writer = writers_.begin(); 00762 WriterMapType::iterator end_writer = writers_.end(); 00763 00764 int i = 0; 00765 00766 while (curr_writer != end_writer) { 00767 writers[i++] = curr_writer->first; 00768 ++curr_writer; 00769 } 00770 } 00771 00772 try { 00773 CORBA::Boolean dont_notify_lost = 0; 00774 00775 if (0 < size) { 00776 remove_associations(writers, dont_notify_lost); 00777 } 00778 00779 } catch (const CORBA::Exception&) { 00780 ACE_DEBUG((LM_WARNING, 00781 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ") 00782 ACE_TEXT("caught exception from remove_associations.\n"))); 00783 } 00784 }
void OpenDDS::DCPS::DataReaderImpl::remove_associations(const WriterIdSeq & writers, bool callback) [virtual]
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 531 of file DataReaderImpl.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, OPENDDS_STRING, OpenDDS::DCPS::push_back(), remove_association_sweeper_, remove_associations_i(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, and writers_.
Referenced by remove_all_associations(), and remove_or_reschedule().
00533 { 00534 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6); 00535 00536 if (writers.length() == 0) { 00537 return; 00538 } 00539 00540 if (DCPS_debug_level >= 1) { 00541 GuidConverter reader_converter(subscription_id_); 00542 GuidConverter writer_converter(writers[0]); 00543 ACE_DEBUG((LM_DEBUG, 00544 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ") 00545 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"), 00546 is_bit_, 00547 OPENDDS_STRING(reader_converter).c_str(), 00548 OPENDDS_STRING(writer_converter).c_str(), 00549 writers.length())); 00550 } 00551 if (!this->entity_deleted_.value()) { 00552 // stop pending associations for these writer ids 00553 this->stop_associating(writers.get_buffer(), writers.length()); 00554 00555 // writers which are considered non-active and can 00556 // be removed immediately 00557 WriterIdSeq non_active_writers; 00558 { 00559 CORBA::ULong wr_len = writers.length(); 00560 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00561 00562 for (CORBA::ULong i = 0; i < wr_len; i++) { 00563 PublicationId writer_id = writers[i]; 00564 00565 WriterMapType::iterator it = this->writers_.find(writer_id); 00566 if (it != this->writers_.end() && 00567 it->second->active(TheServiceParticipant->pending_timeout())) { 00568 remove_association_sweeper_->schedule_timer(it->second, notify_lost); 00569 } else { 00570 push_back(non_active_writers, writer_id); 00571 } 00572 } 00573 } 00574 remove_associations_i(non_active_writers, notify_lost); 00575 } else { 00576 remove_associations_i(writers, notify_lost); 00577 } 00578 }
void OpenDDS::DCPS::DataReaderImpl::remove_associations_i(const WriterIdSeq & writers, bool callback) [protected, virtual]
Definition at line 601 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dr_local_objref_, end_historic_sweeper_, id_to_handle_map_, is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), notify_subscription_lost(), OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, DDS::SubscriptionMatchedStatus::total_count_change, and writers_.
Referenced by remove_associations(), and remove_or_reschedule().
00603 { 00604 DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6); 00605 00606 if (writers.length() == 0) { 00607 return; 00608 } 00609 00610 if (DCPS_debug_level >= 1) { 00611 GuidConverter reader_converter(subscription_id_); 00612 GuidConverter writer_converter(writers[0]); 00613 ACE_DEBUG((LM_DEBUG, 00614 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ") 00615 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"), 00616 is_bit_, 00617 OPENDDS_STRING(reader_converter).c_str(), 00618 OPENDDS_STRING(writer_converter).c_str(), 00619 writers.length())); 00620 } 00621 DDS::InstanceHandleSeq handles; 00622 00623 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00624 00625 // This is used to hold the list of writers which were actually 00626 // removed, which is a proper subset of the writers which were 00627 // requested to be removed. 00628 WriterIdSeq updated_writers; 00629 00630 CORBA::ULong wr_len; 00631 00632 //Remove the writers from writer list. If the supplied writer 00633 //is not in the cached writers list then it is already removed. 00634 //We just need remove the writers in the list that have not been 00635 //removed. 
00636 { 00637 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00638 00639 wr_len = writers.length(); 00640 00641 for (CORBA::ULong i = 0; i < wr_len; i++) { 00642 PublicationId writer_id = writers[i]; 00643 00644 WriterMapType::iterator it = this->writers_.find(writer_id); 00645 00646 if (it != this->writers_.end()) { 00647 it->second->removed(); 00648 end_historic_sweeper_->cancel_timer(it->second); 00649 remove_association_sweeper_->cancel_timer(it->second); 00650 } 00651 00652 if (this->writers_.erase(writer_id) == 0) { 00653 if (DCPS_debug_level >= 1) { 00654 GuidConverter converter(writer_id); 00655 ACE_DEBUG((LM_DEBUG, 00656 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ") 00657 ACE_TEXT("the writer local %C was already removed.\n"), 00658 OPENDDS_STRING(converter).c_str())); 00659 } 00660 00661 } else { 00662 push_back(updated_writers, writer_id); 00663 } 00664 } 00665 } 00666 00667 wr_len = updated_writers.length(); 00668 00669 // Return now if the supplied writers have been removed already. 00670 if (wr_len == 0) { 00671 return; 00672 } 00673 00674 if (!is_bit_) { 00675 // The writer should be in the id_to_handle map at this time. Note 00676 // it if it not there. 00677 if (this->lookup_instance_handles(updated_writers, handles) == false) { 00678 if (DCPS_debug_level > 4) { 00679 ACE_DEBUG((LM_DEBUG, 00680 ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ") 00681 ACE_TEXT("lookup_instance_handles failed.\n"))); 00682 } 00683 } 00684 00685 for (CORBA::ULong i = 0; i < wr_len; ++i) { 00686 id_to_handle_map_.erase(updated_writers[i]); 00687 } 00688 } 00689 00690 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) { 00691 { 00692 this->disassociate(updated_writers[i]); 00693 } 00694 } 00695 00696 // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing. 00697 if (!this->is_bit_) { 00698 // Derive the change in the number of publications writing to this reader. 
00699 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size()); 00700 this->subscription_match_status_.current_count_change 00701 = matchedPublications - this->subscription_match_status_.current_count; 00702 00703 // Only process status if the number of publications has changed. 00704 if (this->subscription_match_status_.current_count_change != 0) { 00705 this->subscription_match_status_.current_count = matchedPublications; 00706 00707 /// Section 7.1.4.1: total_count will not decrement. 00708 00709 /// @TODO: Reconcile this with the verbiage in section 7.1.4.1 00710 this->subscription_match_status_.last_publication_handle 00711 = handles[ wr_len - 1]; 00712 00713 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00714 00715 DDS::DataReaderListener_var listener 00716 = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS); 00717 00718 if (!CORBA::is_nil(listener.in())) { 00719 listener->on_subscription_matched( 00720 dr_local_objref_.in(), 00721 this->subscription_match_status_); 00722 00723 // Client will look at it so next time it looks the change should be 0 00724 this->subscription_match_status_.total_count_change = 0; 00725 this->subscription_match_status_.current_count_change = 0; 00726 } 00727 notify_status_condition(); 00728 } 00729 } 00730 00731 // If this remove_association is invoked when the InfoRepo 00732 // detects a lost writer then make a callback to notify 00733 // subscription lost. 00734 if (notify_lost) { 00735 this->notify_subscription_lost(handles); 00736 } 00737 00738 if (this->monitor_) { 00739 this->monitor_->report(); 00740 } 00741 }
void OpenDDS::DCPS::DataReaderImpl::remove_or_reschedule | ( | const PublicationId & | pub_id | ) | [protected] |
Definition at line 581 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::push_back(), remove_associations(), remove_associations_i(), and writers_.
00582 { 00583 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00584 WriterMapType::iterator where = writers_.find(pub_id); 00585 if (writers_.end() != where) { 00586 WriterInfo& info = *where->second; 00587 WriterIdSeq writers; 00588 push_back(writers, pub_id); 00589 bool notify = info.notify_lost_; 00590 if (info.removal_deadline_ < ACE_OS::gettimeofday()) { 00591 write_guard.release(); 00592 remove_associations_i(writers, notify); 00593 } else { 00594 write_guard.release(); 00595 remove_associations(writers, notify); 00596 } 00597 } 00598 }
void OpenDDS::DCPS::DataReaderImpl::reschedule_deadline | ( | ) |
Definition at line 2930 of file DataReaderImpl.cpp.
References instances_.
Referenced by OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline().
02931 { 02932 if (this->watchdog_) { 02933 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 02934 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin(); 02935 iter != this->instances_.end(); 02936 ++iter) { 02937 if (iter->second->deadline_timer_id_ != -1) { 02938 if (this->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) { 02939 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::reschedule_deadline %p\n"), 02940 ACE_TEXT("reset_timer_interval"))); 02941 } 02942 } 02943 } 02944 } 02945 }
void OpenDDS::DCPS::DataReaderImpl::reset_coherent_info(const PublicationId & writer_id, const RepoId & publisher_id)
Definition at line 3114 of file DataReaderImpl.cpp.
References writers_.
Referenced by reject_coherent().
03116 { 03117 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 03118 03119 WriterMapType::iterator itEnd = this->writers_.end(); 03120 for (WriterMapType::iterator it = this->writers_.begin(); 03121 it != itEnd; ++it) { 03122 if (it->second->writer_id_ == writer_id 03123 && it->second->publisher_id_ == publisher_id) { 03124 it->second->reset_coherent_info(); 03125 } 03126 } 03127 }
void OpenDDS::DCPS::DataReaderImpl::reset_latency_stats | ( | ) | [virtual] |
Definition at line 2507 of file DataReaderImpl.cpp.
References statistics_.
02508 { 02509 for (StatsMapType::iterator current = this->statistics_.begin(); 02510 current != this->statistics_.end(); 02511 ++current) { 02512 current->second.reset_stats(); 02513 } 02514 }
void OpenDDS::DCPS::DataReaderImpl::reset_ownership | ( | ::DDS::InstanceHandle_t | instance | ) |
Definition at line 3282 of file DataReaderImpl.cpp.
References writers_.
Referenced by OpenDDS::DCPS::InstanceState::reset_ownership().
03283 { 03284 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 03285 for (WriterMapType::iterator iter = writers_.begin(); 03286 iter != writers_.end(); 03287 ++iter) { 03288 iter->second->set_owner_evaluated(instance, false); 03289 } 03290 }
void OpenDDS::DCPS::DataReaderImpl::resume_sample_processing | ( | const PublicationId & | pub_id | ) | [private] |
When done handling historic samples, resume sample processing.
Definition at line 3293 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), deliver_historic(), end_historic_sweeper_, OPENDDS_MAP(), and writers_.
Referenced by add_link(), data_received(), and OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::handle_timeout().
03294 { 03295 OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver; 03296 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 03297 WriterMapType::iterator where = writers_.find(pub_id); 03298 if (writers_.end() != where) { 03299 WriterInfo& info = *where->second; 03300 // Stop filtering these 03301 if (info.waiting_for_end_historic_samples_) { 03302 end_historic_sweeper_->cancel_timer(where->second); 03303 if (!info.historic_samples_.empty()) { 03304 info.last_historic_seq_ = info.historic_samples_.rbegin()->first; 03305 } 03306 to_deliver.swap(info.historic_samples_); 03307 write_guard.release(); 03308 deliver_historic(to_deliver); 03309 } 03310 } 03311 }
void OpenDDS::DCPS::DataReaderImpl::sample_info(DDS::SampleInfo & sample_info, const ReceivedDataElement * ptr) [protected]
Definition at line 1934 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, and OpenDDS::DCPS::ReceivedDataElement::sequence_.
01936 { 01937 01938 sample_info.sample_rank = 0; 01939 01940 // generation_rank = 01941 // (MRSIC.disposed_generation_count + 01942 // MRSIC.no_writers_generation_count) 01943 // - (S.disposed_generation_count + 01944 // S.no_writers_generation_count) 01945 // 01946 sample_info.generation_rank = 01947 (sample_info.disposed_generation_count + 01948 sample_info.no_writers_generation_count) - 01949 sample_info.generation_rank; 01950 01951 // absolute_generation_rank = 01952 // (MRS.disposed_generation_count + 01953 // MRS.no_writers_generation_count) 01954 // - (S.disposed_generation_count + 01955 // S.no_writers_generation_count) 01956 // 01957 sample_info.absolute_generation_rank = 01958 (static_cast<CORBA::Long>(ptr->disposed_generation_count_) + 01959 static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) - 01960 sample_info.absolute_generation_rank; 01961 01962 sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue(); 01963 }
virtual void OpenDDS::DCPS::DataReaderImpl::set_instance_state(DDS::InstanceHandle_t instance, DDS::InstanceStateKind state) [pure virtual]
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available().
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_listener(DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask) [virtual]
Definition at line 1018 of file DataReaderImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
01021 { 01022 listener_mask_ = mask; 01023 //note: OK to duplicate a nil object ref 01024 listener_ = DDS::DataReaderListener::_duplicate(a_listener); 01025 return DDS::RETCODE_OK; 01026 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_qos | ( | const DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 941 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::Watchdog::cancel_all(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::ReactorInterceptor::destroy(), domain_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::SubscriberImpl::get_qos(), last_deadline_missed_total_count_, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, participant_servant_, qos_, requested_deadline_missed_status_, RequestedDeadlineWatchdog, OpenDDS::DCPS::Watchdog::reset_interval(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, subscriber_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and watchdog_.
00943 { 00944 00945 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00946 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00947 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00948 00949 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00950 if (qos_ == qos) 00951 return DDS::RETCODE_OK; 00952 00953 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00954 return DDS::RETCODE_IMMUTABLE_POLICY; 00955 00956 } else { 00957 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00958 DDS::SubscriberQos subscriberQos; 00959 this->subscriber_servant_->get_qos(subscriberQos); 00960 const bool status = 00961 disco->update_subscription_qos( 00962 this->participant_servant_->get_domain_id(), 00963 this->participant_servant_->get_id(), 00964 this->subscription_id_, 00965 qos, 00966 subscriberQos); 00967 if (!status) { 00968 ACE_ERROR_RETURN((LM_ERROR, 00969 ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ") 00970 ACE_TEXT("qos not updated. \n")), 00971 DDS::RETCODE_ERROR); 00972 } 00973 } 00974 00975 // Reset the deadline timer if the period has changed. 
00976 if (qos_.deadline.period.sec != qos.deadline.period.sec 00977 || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) { 00978 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00979 && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00980 this->watchdog_ = 00981 new RequestedDeadlineWatchdog( 00982 this->sample_lock_, 00983 qos.deadline, 00984 this, 00985 this->dr_local_objref_.in(), 00986 this->requested_deadline_missed_status_, 00987 this->last_deadline_missed_total_count_); 00988 00989 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00990 && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00991 this->watchdog_->cancel_all(); 00992 this->watchdog_->destroy(); 00993 this->watchdog_ = 0; 00994 00995 } else { 00996 this->watchdog_->reset_interval( 00997 duration_to_time_value(qos.deadline.period)); 00998 } 00999 } 01000 01001 qos_ = qos; 01002 01003 return DDS::RETCODE_OK; 01004 01005 } else { 01006 return DDS::RETCODE_INCONSISTENT_POLICY; 01007 } 01008 }
void OpenDDS::DCPS::DataReaderImpl::set_sample_lost_status | ( | const DDS::SampleLostStatus & | status | ) | [protected] |
Definition at line 2378 of file DataReaderImpl.cpp.
References sample_lost_status_.
02380 { 02381 //!!!caller should have acquired sample_lock_ 02382 sample_lost_status_ = status; 02383 }
void OpenDDS::DCPS::DataReaderImpl::set_sample_rejected_status | ( | const DDS::SampleRejectedStatus & | status | ) | [protected] |
Definition at line 2386 of file DataReaderImpl.cpp.
References sample_rejected_status_.
02388 { 02389 //!!!caller should have acquired sample_lock_ 02390 sample_rejected_status_ = status; 02391 }
void OpenDDS::DCPS::DataReaderImpl::set_subscriber_qos | ( | const DDS::SubscriberQos & | qos | ) |
Definition at line 3246 of file DataReaderImpl.cpp.
References subqos_.
03248 { 03249 this->subqos_ = qos; 03250 }
void OpenDDS::DCPS::DataReaderImpl::signal_liveliness | ( | const RepoId & | remote_participant | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 835 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::GUID_t::guidPrefix, instances_, OPENDDS_VECTOR(), and writers_.
00836 { 00837 RepoId prefix = remote_participant; 00838 prefix.entityId = EntityId_t(); 00839 00840 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00841 00842 typedef std::pair<RepoId, RcHandle<WriterInfo> > RepoWriterPair; 00843 typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet; 00844 WriterSet writers; 00845 00846 { 00847 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00848 for (WriterMapType::iterator pos = writers_.lower_bound(prefix), 00849 limit = writers_.end(); 00850 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix); 00851 ++pos) { 00852 writers.push_back(std::make_pair(pos->first, pos->second)); 00853 } 00854 } 00855 00856 ACE_Time_Value when = ACE_OS::gettimeofday(); 00857 for (WriterSet::iterator pos = writers.begin(), limit = writers.end(); 00858 pos != limit; 00859 ++pos) { 00860 pos->second->received_activity(when); 00861 } 00862 00863 if (!writers.empty()) { 00864 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 00865 for (WriterSet::iterator pos = writers.begin(), limit = writers.end(); 00866 pos != limit; 00867 ++pos) { 00868 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 00869 iter != instances_.end(); 00870 ++iter) { 00871 SubscriptionInstance *ptr = iter->second; 00872 ptr->instance_state_.lively(pos->first); 00873 } 00874 } 00875 } 00876 }
void OpenDDS::DCPS::DataReaderImpl::statistics_enabled | ( | CORBA::Boolean | statistics_enabled | ) | [virtual] |
Definition at line 2523 of file DataReaderImpl.cpp.
References statistics_enabled_.
02525 { 02526 this->statistics_enabled_ = statistics_enabled; 02527 }
CORBA::Boolean OpenDDS::DCPS::DataReaderImpl::statistics_enabled | ( | ) | [virtual] |
Definition at line 2517 of file DataReaderImpl.cpp.
References statistics_enabled_.
02518 { 02519 return this->statistics_enabled_; 02520 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::take(AbstractSamples & samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states) [pure virtual]
CORBA::Long OpenDDS::DCPS::DataReaderImpl::total_samples | ( | ) | const [protected] |
Definition at line 1965 of file DataReaderImpl.cpp.
References instances_.
01966 { 01967 //!!!caller should have acquired sample_lock_ 01968 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0); 01969 01970 CORBA::Long count(0); 01971 01972 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01973 iter != instances_.end(); 01974 ++iter) { 01975 SubscriptionInstance *ptr = iter->second; 01976 01977 count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size_); 01978 } 01979 01980 return count; 01981 }
void OpenDDS::DCPS::DataReaderImpl::transport_assoc_done(int flags, const RepoId & remote_id) [virtual]
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 412 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, OpenDDS::DCPS::TransportClient::ASSOC_OK, OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dr_local_objref_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, liveliness_timer_, monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_handle_lock_, OpenDDS::DCPS::Monitor::report(), sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, and writers_.
00413 { 00414 if (!(flags & ASSOC_OK)) { 00415 if (DCPS_debug_level) { 00416 const GuidConverter conv(remote_id); 00417 ACE_DEBUG((LM_ERROR, 00418 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ") 00419 ACE_TEXT("ERROR: transport layer failed to associate %C\n"), 00420 OPENDDS_STRING(conv).c_str())); 00421 } 00422 return; 00423 } 00424 00425 const bool active = flags & ASSOC_ACTIVE; 00426 { 00427 00428 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00429 00430 // LIVELINESS policy timers are managed here. 00431 if (liveliness_lease_duration_ != ACE_Time_Value::zero) { 00432 if (DCPS_debug_level >= 5) { 00433 GuidConverter converter(subscription_id_); 00434 ACE_DEBUG((LM_DEBUG, 00435 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ") 00436 ACE_TEXT("starting/resetting liveliness timer for reader %C\n"), 00437 OPENDDS_STRING(converter).c_str())); 00438 } 00439 // this call will start the timer if it is not already set 00440 liveliness_timer_->check_liveliness(); 00441 } 00442 } 00443 // We no longer hold the publication_handle_lock_. 00444 00445 if (!is_bit_) { 00446 00447 DDS::InstanceHandle_t handle = participant_servant_->id_to_handle(remote_id); 00448 00449 // We acquire the publication_handle_lock_ for the remainder of our 00450 // processing. 00451 { 00452 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_); 00453 00454 // This insertion is idempotent. 
00455 id_to_handle_map_.insert( 00456 RepoIdToHandleMap::value_type(remote_id, handle)); 00457 00458 if (DCPS_debug_level > 4) { 00459 GuidConverter converter(remote_id); 00460 ACE_DEBUG((LM_DEBUG, 00461 ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ") 00462 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"), 00463 OPENDDS_STRING(converter).c_str(), 00464 handle)); 00465 } 00466 00467 // We need to adjust these after the insertions have all completed 00468 // since insertions are not guaranteed to increase the number of 00469 // currently matched publications. 00470 const int matchedPublications = static_cast<int>(id_to_handle_map_.size()); 00471 subscription_match_status_.current_count_change = 00472 matchedPublications - subscription_match_status_.current_count; 00473 subscription_match_status_.current_count = matchedPublications; 00474 00475 ++subscription_match_status_.total_count; 00476 ++subscription_match_status_.total_count_change; 00477 00478 subscription_match_status_.last_publication_handle = handle; 00479 00480 set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00481 00482 DDS::DataReaderListener_var listener = 00483 listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS); 00484 00485 if (!CORBA::is_nil(listener)) { 00486 listener->on_subscription_matched(dr_local_objref_, 00487 subscription_match_status_); 00488 00489 // TBD - why does the spec say to change this but not change 00490 // the ChangeFlagStatus after a listener call? 
00491 00492 // Client will look at it so next time it looks the change should be 0 00493 subscription_match_status_.total_count_change = 0; 00494 subscription_match_status_.current_count_change = 0; 00495 } 00496 00497 notify_status_condition(); 00498 } 00499 00500 { 00501 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 00502 ACE_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00503 00504 if(!writers_.count(remote_id)){ 00505 return; 00506 } 00507 writers_[remote_id]->handle_ = handle; 00508 } 00509 } 00510 00511 if (!active) { 00512 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00513 00514 disco->association_complete(domain_id_, participant_servant_->get_id(), 00515 subscription_id_, remote_id); 00516 } 00517 00518 if (monitor_) { 00519 monitor_->report(); 00520 } 00521 }
void OpenDDS::DCPS::DataReaderImpl::unregister_for_writer | ( | const RepoId & | , | |
const RepoId & | , | |||
const RepoId & | ||||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 3377 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::TransportClient::unregister_for_writer().
03380 { 03381 TransportClient::unregister_for_writer(participant, readerid, writerid); 03382 }
void OpenDDS::DCPS::DataReaderImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 787 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, dr_local_objref_, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::RequestedIncompatibleQosStatus::total_count, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.
00788 { 00789 DDS::DataReaderListener_var listener = 00790 listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS); 00791 00792 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00793 guard, 00794 this->publication_handle_lock_); 00795 00796 00797 if (this->requested_incompatible_qos_status_.total_count == status.total_count) { 00798 // This test should make the method idempotent. 00799 return; 00800 } 00801 00802 set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, 00803 true); 00804 00805 // copy status and increment change 00806 requested_incompatible_qos_status_.total_count = status.total_count; 00807 requested_incompatible_qos_status_.total_count_change += 00808 status.count_since_last_send; 00809 requested_incompatible_qos_status_.last_policy_id = 00810 status.last_policy_id; 00811 requested_incompatible_qos_status_.policies = status.policies; 00812 00813 if (!CORBA::is_nil(listener.in())) { 00814 listener->on_requested_incompatible_qos(dr_local_objref_.in(), 00815 requested_incompatible_qos_status_); 00816 00817 // TBD - why does the spec say to change total_count_change but not 00818 // change the ChangeFlagStatus after a listener call? 00819 00820 // client just looked at it so next time it looks the 00821 // change should be 0 00822 requested_incompatible_qos_status_.total_count_change = 0; 00823 } 00824 00825 notify_status_condition(); 00826 }
void OpenDDS::DCPS::DataReaderImpl::update_ownership_strength | ( | const PublicationId & | pub_id, | |
const CORBA::Long & | ownership_strength | |||
) |
Definition at line 2993 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, and writers_.
02995 { 02996 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 02997 read_guard, 02998 this->writers_lock_); 02999 for (WriterMapType::iterator iter = writers_.begin(); 03000 iter != writers_.end(); 03001 ++iter) { 03002 if (iter->second->writer_id_ == pub_id) { 03003 if (ownership_strength != iter->second->writer_qos_.ownership_strength.value) { 03004 if (DCPS_debug_level >= 1) { 03005 GuidConverter reader_converter(this->subscription_id_); 03006 GuidConverter writer_converter(pub_id); 03007 ACE_DEBUG((LM_DEBUG, 03008 ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ") 03009 ACE_TEXT("local %C update remote %C strength from %d to %d \n"), 03010 OPENDDS_STRING(reader_converter).c_str(), 03011 OPENDDS_STRING(writer_converter).c_str(), 03012 iter->second->writer_qos_.ownership_strength, ownership_strength)); 03013 } 03014 iter->second->writer_qos_.ownership_strength.value = ownership_strength; 03015 iter->second->clear_owner_evaluated (); 03016 } 03017 break; 03018 } 03019 } 03020 }
void OpenDDS::DCPS::DataReaderImpl::update_subscription_params | ( | const DDS::StringSeq & | params | ) | const |
Definition at line 3271 of file DataReaderImpl.cpp.
References domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), participant_servant_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and TheServiceParticipant.
03272 { 03273 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 03274 disco->update_subscription_params(participant_servant_->get_domain_id(), 03275 participant_servant_->get_id(), 03276 subscription_id_, 03277 params); 03278 }
bool OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion | ( | WriterInfo * | writer | ) | [private] |
Definition at line 3024 of file DataReaderImpl.cpp.
References accept_coherent(), OpenDDS::DCPS::SubscriberImpl::coherent_change_received(), OpenDDS::DCPS::WriterInfo::coherent_change_received(), coherent_changes_completed(), OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::WriterInfo::group_coherent_, DDS::INSTANCE_PRESENTATION_QOS, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::WriterInfo::publisher_id_, reject_coherent(), OpenDDS::DCPS::REJECTED, OpenDDS::DCPS::WriterInfo::reset_coherent_info(), state, subscriber_servant_, and OpenDDS::DCPS::WriterInfo::writer_id_.
Referenced by data_received().
03025 { 03026 if (this->subqos_.presentation.access_scope == ::DDS::INSTANCE_PRESENTATION_QOS 03027 || ! this->subqos_.presentation.coherent_access) { 03028 this->accept_coherent (writer->writer_id_, writer->publisher_id_); 03029 this->coherent_changes_completed (this); 03030 return true; 03031 } 03032 03033 // verify current coherent changes from single writer 03034 Coherent_State state = writer->coherent_change_received(); 03035 if (writer->group_coherent_) { // GROUP coherent 03036 if (state != NOT_COMPLETED_YET) { 03037 // verify if all readers received complete coherent changes in a group. 03038 this->subscriber_servant_->coherent_change_received ( 03039 writer->publisher_id_, this, state); 03040 } 03041 } 03042 else { // TOPIC coherent 03043 if (state == COMPLETED) { 03044 this->accept_coherent (writer->writer_id_, writer->publisher_id_); 03045 } 03046 else if (state == REJECTED) { 03047 this->reject_coherent (writer->writer_id_, writer->publisher_id_); 03048 } 03049 else {// NOT_COMPLETED 03050 return false; 03051 } 03052 03053 // decision made: either COMPLETED or REJECTED 03054 writer->reset_coherent_info (); 03055 } 03056 03057 return state == COMPLETED; 03058 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::wait_for_historical_data | ( | const DDS::Duration_t & | max_wait | ) | [virtual] |
void OpenDDS::DCPS::DataReaderImpl::writer_activity | ( | const DataSampleHeader & | header | ) |
Update the liveliness information for the writer that sent this sample header.
Definition at line 1379 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, header, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::WriterInfoListener::subscription_id_, OpenDDS::DCPS::UNREGISTER_INSTANCE, and writers_.
Referenced by data_received().
01380 { 01381 // caller should have the sample_lock_ !!! 01382 01383 RcHandle<WriterInfo> writer; 01384 01385 // The received_activity() has to be called outside the writers_lock_ 01386 // because it probably acquire writers_lock_ read lock recursively 01387 // (in handle_timeout). This could cause deadlock when there are writers 01388 // waiting. 01389 { 01390 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 01391 WriterMapType::iterator iter = writers_.find(header.publication_id_); 01392 01393 if (iter != writers_.end()) { 01394 writer = iter->second; 01395 01396 } else if (DCPS_debug_level > 4) { 01397 // This may not be an error since it could happen that the sample 01398 // is delivered to the datareader after the write is dis-associated 01399 // with this datareader. 01400 GuidConverter reader_converter(subscription_id_); 01401 GuidConverter writer_converter(header.publication_id_); 01402 ACE_DEBUG((LM_DEBUG, 01403 ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ") 01404 ACE_TEXT("reader %C is not associated with writer %C.\n"), 01405 OPENDDS_STRING(reader_converter).c_str(), 01406 OPENDDS_STRING(writer_converter).c_str())); 01407 } 01408 } 01409 01410 if (!writer.is_nil()) { 01411 ACE_Time_Value when = ACE_OS::gettimeofday(); 01412 writer->received_activity(when); 01413 01414 if ((header.message_id_ == SAMPLE_DATA) || 01415 (header.message_id_ == INSTANCE_REGISTRATION) || 01416 (header.message_id_ == UNREGISTER_INSTANCE) || 01417 (header.message_id_ == DISPOSE_INSTANCE) || 01418 (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) { 01419 01420 const SequenceNumber defaultSN; 01421 SequenceRange resetRange(defaultSN, header.sequence_); 01422 01423 if (writer->seen_data_ && !header.sequence_repair_) { 01424 // Data samples should be acknowledged prior to any 01425 // reader-side filtering to ensure discontinuities 01426 // are not unintentionally introduced. 
01427 writer->ack_sequence(header.sequence_); 01428 01429 } else { 01430 // In order to properly track out of order delivery, 01431 // a baseline must be established based on the first 01432 // data sample received. 01433 writer->seen_data_ = true; 01434 writer->ack_sequence_.reset(); 01435 writer->ack_sequence_.insert(resetRange); 01436 } 01437 01438 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01439 if (header.coherent_change_) { 01440 if (writer->coherent_samples_ == 0) { 01441 writer->coherent_sample_sequence_.reset(); 01442 writer->coherent_sample_sequence_.insert(resetRange); 01443 } 01444 else { 01445 writer->coherent_sample_sequence_.insert(header.sequence_); 01446 } 01447 } 01448 #endif 01449 } 01450 } 01451 }
void OpenDDS::DCPS::DataReaderImpl::writer_became_alive | ( | WriterInfo & | info, | |
const ACE_Time_Value & | when | |||
) | [virtual] |
Tell instances when a DataWriter transitions to being alive. The writer state is an inout parameter; it has to be set to ALIVE before handle_timeout is called, since some subroutines use the state.
Reimplemented from OpenDDS::DCPS::WriterInfoListener.
Definition at line 2216 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, liveliness_timer_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, OpenDDS::DCPS::Monitor::report(), reverse_sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.
02218 { 02219 if (DCPS_debug_level >= 5) { 02220 GuidConverter reader_converter(subscription_id_); 02221 GuidConverter writer_converter(info.writer_id_); 02222 ACE_DEBUG((LM_DEBUG, 02223 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ") 02224 ACE_TEXT("reader %C from writer %C previous state %C.\n"), 02225 OPENDDS_STRING(reader_converter).c_str(), 02226 OPENDDS_STRING(writer_converter).c_str(), 02227 info.get_state_str().c_str())); 02228 } 02229 02230 // caller should already have the samples_lock_ !!! 02231 02232 // NOTE: each instance will change to ALIVE_STATE when they receive a sample 02233 02234 bool liveliness_changed = false; 02235 02236 if (info.state_ != WriterInfo::ALIVE) { 02237 liveliness_changed_status_.alive_count++; 02238 liveliness_changed_status_.alive_count_change++; 02239 liveliness_changed = true; 02240 } 02241 02242 if (info.state_ == WriterInfo::DEAD) { 02243 liveliness_changed_status_.not_alive_count--; 02244 liveliness_changed_status_.not_alive_count_change--; 02245 liveliness_changed = true; 02246 } 02247 02248 liveliness_changed_status_.last_publication_handle = info.handle_; 02249 02250 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true); 02251 02252 if (liveliness_changed_status_.alive_count < 0) { 02253 ACE_ERROR((LM_ERROR, 02254 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ") 02255 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"), 02256 liveliness_changed_status_.alive_count)); 02257 return; 02258 } 02259 02260 if (liveliness_changed_status_.not_alive_count < 0) { 02261 ACE_ERROR((LM_ERROR, 02262 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ") 02263 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d .\n"), 02264 liveliness_changed_status_.not_alive_count)); 02265 return; 02266 } 02267 02268 // Change the state to ALIVE since handle_timeout may call writer_became_dead 02269 // which need the current state info. 
02270 info.state_ = WriterInfo::ALIVE; 02271 02272 if (this->monitor_) { 02273 this->monitor_->report(); 02274 } 02275 02276 // Call listener only when there are liveliness status changes. 02277 if (liveliness_changed) { 02278 // Avoid possible deadlock by releasing sample_lock_. 02279 // See comments in <Topic>DataDataReaderImpl::notify_status_condition_no_sample_lock() 02280 // for information about the locks involved. 02281 ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 02282 this->notify_liveliness_change(); 02283 } 02284 02285 // this call will start the liveliness timer if it is not already set 02286 liveliness_timer_->check_liveliness(); 02287 }
void OpenDDS::DCPS::DataReaderImpl::writer_became_dead | ( | WriterInfo & | info, | |
const ACE_Time_Value & | when | |||
) | [virtual] |
Tell instances when a DataWriter transitions to DEAD. The writer state is an inout parameter; the state is set to DEAD when this method returns.
Reimplemented from OpenDDS::DCPS::WriterInfoListener.
Definition at line 2290 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, OpenDDS::DCPS::WriterInfo::NOT_SET, notify_liveliness_change(), OPENDDS_STRING, owner_manager_, OpenDDS::DCPS::OwnershipManager::remove_writer(), OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::WriterInfo::seen_data_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.
02292 { 02293 if (DCPS_debug_level >= 5) { 02294 GuidConverter reader_converter(subscription_id_); 02295 GuidConverter writer_converter(info.writer_id_); 02296 ACE_DEBUG((LM_DEBUG, 02297 ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ") 02298 ACE_TEXT("reader %C from writer %C previous state %C.\n"), 02299 02300 OPENDDS_STRING(reader_converter).c_str(), 02301 OPENDDS_STRING(writer_converter).c_str(), 02302 info.get_state_str().c_str())); 02303 } 02304 02305 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 02306 if (this->is_exclusive_ownership_) { 02307 this->owner_manager_->remove_writer (info.writer_id_); 02308 info.clear_owner_evaluated (); 02309 } 02310 #endif 02311 02312 // caller should already have the samples_lock_ !!! 02313 bool liveliness_changed = false; 02314 02315 if (info.state_ == OpenDDS::DCPS::WriterInfo::NOT_SET) { 02316 liveliness_changed_status_.not_alive_count++; 02317 liveliness_changed_status_.not_alive_count_change++; 02318 liveliness_changed = true; 02319 } 02320 02321 if (info.state_ == WriterInfo::ALIVE) { 02322 liveliness_changed_status_.alive_count--; 02323 liveliness_changed_status_.alive_count_change--; 02324 liveliness_changed_status_.not_alive_count++; 02325 liveliness_changed_status_.not_alive_count_change++; 02326 liveliness_changed = true; 02327 } 02328 02329 liveliness_changed_status_.last_publication_handle = info.handle_; 02330 02331 //update the state to DEAD. 
02332 info.state_ = WriterInfo::DEAD; 02333 info.seen_data_ = false; 02334 02335 if (this->monitor_) { 02336 this->monitor_->report(); 02337 } 02338 02339 if (liveliness_changed_status_.alive_count < 0) { 02340 ACE_ERROR((LM_ERROR, 02341 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ") 02342 ACE_TEXT(" invalid liveliness_changed_status alive count - %d.\n"), 02343 liveliness_changed_status_.alive_count)); 02344 return; 02345 } 02346 02347 if (liveliness_changed_status_.not_alive_count < 0) { 02348 ACE_ERROR((LM_ERROR, 02349 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ") 02350 ACE_TEXT(" invalid liveliness_changed_status not alive count - %d.\n"), 02351 liveliness_changed_status_.not_alive_count)); 02352 return; 02353 } 02354 02355 instances_liveliness_update(info, when); 02356 02357 // Call listener only when there are liveliness status changes. 02358 if (liveliness_changed) { 02359 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true); 02360 this->notify_liveliness_change(); 02361 } 02362 }
void OpenDDS::DCPS::DataReaderImpl::writer_removed | ( | WriterInfo & | info | ) | [virtual] |
Tell instances when a DataWriter is removed. The liveliness status needs to be updated.
Reimplemented from OpenDDS::DCPS::WriterInfoListener.
Definition at line 2173 of file DataReaderImpl.cpp.
References OpenDDS::DCPS::WriterInfo::ALIVE, DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::handle_, instances_liveliness_update(), DDS::LivelinessChangedStatus::last_publication_handle, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, notify_liveliness_change(), OPENDDS_STRING, owner_manager_, OpenDDS::DCPS::OwnershipManager::remove_writer(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::WriterInfo::state_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::WriterInfo::writer_id_.
02174 { 02175 if (DCPS_debug_level >= 5) { 02176 GuidConverter reader_converter(subscription_id_); 02177 GuidConverter writer_converter(info.writer_id_); 02178 ACE_DEBUG((LM_DEBUG, 02179 ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ") 02180 ACE_TEXT("reader %C from writer %C.\n"), 02181 OPENDDS_STRING(reader_converter).c_str(), 02182 OPENDDS_STRING(writer_converter).c_str())); 02183 } 02184 02185 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 02186 if (this->is_exclusive_ownership_) { 02187 this->owner_manager_->remove_writer (info.writer_id_); 02188 info.clear_owner_evaluated (); 02189 } 02190 #endif 02191 02192 bool liveliness_changed = false; 02193 02194 if (info.state_ == WriterInfo::ALIVE) { 02195 -- liveliness_changed_status_.alive_count; 02196 -- liveliness_changed_status_.alive_count_change; 02197 liveliness_changed = true; 02198 } 02199 02200 if (info.state_ == WriterInfo::DEAD) { 02201 -- liveliness_changed_status_.not_alive_count; 02202 -- liveliness_changed_status_.not_alive_count_change; 02203 liveliness_changed = true; 02204 } 02205 02206 liveliness_changed_status_.last_publication_handle = info.handle_; 02207 instances_liveliness_update(info, ACE_OS::gettimeofday()); 02208 02209 if (liveliness_changed) { 02210 set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true); 02211 this->notify_liveliness_change(); 02212 } 02213 }
friend class ::DDS_TEST [friend] |
friend class EndHistoricSamplesMissedSweeper [friend] |
Definition at line 684 of file DataReaderImpl.h.
friend class InstanceState [friend] |
Definition at line 683 of file DataReaderImpl.h.
friend class QueryConditionImpl [friend] |
friend class RemoveAssociationSweeper< DataReaderImpl > [friend] |
Definition at line 685 of file DataReaderImpl.h.
friend class RequestedDeadlineWatchdog [friend] |
friend class SubscriberImpl [friend] |
Definition at line 193 of file DataReaderImpl.h.
bool OpenDDS::DCPS::DataReaderImpl::always_get_history_ [private] |
Definition at line 715 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), and notify_latency().
bool OpenDDS::DCPS::DataReaderImpl::coherent_ [protected] |
Indicates whether group coherent changes are currently being accessed.
Definition at line 637 of file DataReaderImpl.h.
Referenced by begin_access(), and end_access().
DDS::ContentFilteredTopic_var OpenDDS::DCPS::DataReaderImpl::content_filtered_topic_ [protected] |
Definition at line 632 of file DataReaderImpl.h.
Referenced by cleanup(), enable_filtering(), and get_cf_topic().
CORBA::Long OpenDDS::DCPS::DataReaderImpl::depth_ [private] |
Definition at line 692 of file DataReaderImpl.h.
Referenced by enable(), get_next_handle(), init(), set_qos(), transport_assoc_done(), and update_subscription_params().
DDS::DataReader_var OpenDDS::DCPS::DataReaderImpl::dr_local_objref_ [private] |
Definition at line 694 of file DataReaderImpl.h.
Referenced by cleanup(), coherent_changes_completed(), get_dr_obj_ref(), init(), notify_liveliness_change(), remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().
Definition at line 695 of file DataReaderImpl.h.
Referenced by add_link(), cleanup(), remove_associations_i(), resume_sample_processing(), and ~DataReaderImpl().
Ordered group samples.
Definition at line 640 of file DataReaderImpl.h.
Referenced by end_access(), and get_ordered_data().
RepoIdToHandleMap OpenDDS::DCPS::DataReaderImpl::id_to_handle_map_ [private] |
Definition at line 706 of file DataReaderImpl.h.
Referenced by get_matched_publications(), remove_associations_i(), and transport_assoc_done().
bool OpenDDS::DCPS::DataReaderImpl::initialized_ [private] |
Flag indicating that init() has been called.
Definition at line 817 of file DataReaderImpl.h.
Referenced by init(), and ~DataReaderImpl().
SubscriptionInstanceMapType OpenDDS::DCPS::DataReaderImpl::instances_ [mutable, protected] |
Todo: document why the instances_ container is mutable.
Definition at line 588 of file DataReaderImpl.h.
Referenced by accept_coherent(), cleanup(), contains_sample(), data_received(), get_handle_instance(), get_instance_handles(), get_ordered_data(), have_instance_states(), have_sample_states(), have_view_states(), instances_liveliness_update(), num_zero_copies(), reject_coherent(), release_instance(), reschedule_deadline(), signal_liveliness(), and total_samples().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::instances_lock_ [mutable, protected] |
Assume that, since the container is mutable(?!!?), the lock may need to be used in const methods. Todo: remove the recursive nature of instances_lock_ if it is not needed.
Definition at line 593 of file DataReaderImpl.h.
bool OpenDDS::DCPS::DataReaderImpl::is_bit_ [private] |
Flag indicating that this datareader is a built-in topic datareader.
Definition at line 814 of file DataReaderImpl.h.
Referenced by add_association(), init(), is_bit(), remove_associations(), remove_associations_i(), and transport_assoc_done().
bool OpenDDS::DCPS::DataReaderImpl::is_exclusive_ownership_ [protected] |
CORBA::Long OpenDDS::DCPS::DataReaderImpl::last_deadline_missed_total_count_ [private] |
Definition at line 807 of file DataReaderImpl.h.
Referenced by enable(), get_requested_deadline_missed_status(), and set_qos().
DDS::DataReaderListener_var OpenDDS::DCPS::DataReaderImpl::listener_ [private] |
Definition at line 691 of file DataReaderImpl.h.
Referenced by get_listener(), init(), listener_for(), and set_listener().
Definition at line 690 of file DataReaderImpl.h.
Referenced by init(), listener_for(), notify_liveliness_change(), and set_listener().
Definition at line 709 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_liveliness_changed_status(), instances_liveliness_update(), notify_liveliness_change(), writer_became_alive(), writer_became_dead(), and writer_removed().
Definition at line 805 of file DataReaderImpl.h.
Referenced by cleanup(), transport_assoc_done(), writer_became_alive(), and ~DataReaderImpl().
Monitor* OpenDDS::DCPS::DataReaderImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 846 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), enable(), release_instance(), remove_associations_i(), transport_assoc_done(), writer_became_alive(), and writer_became_dead().
size_t OpenDDS::DCPS::DataReaderImpl::n_chunks_ [private] |
Definition at line 628 of file DataReaderImpl.h.
Referenced by cleanup(), OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(), init(), release_instance(), writer_became_dead(), and writer_removed().
Definition at line 622 of file DataReaderImpl.h.
Referenced by get_dp_id(), get_instance_handle(), get_matched_publication_data(), get_next_handle(), init(), OpenDDS::DCPS::InstanceState::sample_info(), set_qos(), transport_assoc_done(), and update_subscription_params().
Periodic Monitor object for this entity.
Definition at line 849 of file DataReaderImpl.h.
Referenced by DataReaderImpl().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::publication_handle_lock_ [private] |
Definition at line 702 of file DataReaderImpl.h.
Referenced by add_association(), remove_all_associations(), remove_associations_i(), and transport_assoc_done().
Definition at line 610 of file DataReaderImpl.h.
Referenced by enable(), filter_instance(), filter_sample(), get_qos(), init(), and set_qos().
unsigned int OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size_ [private] |
Bound (or initial reservation) of raw latency buffer.
Definition at line 836 of file DataReaderImpl.h.
Referenced by add_association() and raw_latency_buffer_size().
DataCollector<double>::OnFull OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type_ [private] |
Type of raw latency data buffer.
Definition at line 839 of file DataReaderImpl.h.
Referenced by add_association() and raw_latency_buffer_type().
ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataReaderImpl::reactor_ [private] |
The ORB's reactor to be used to register the liveliness timer.
Definition at line 731 of file DataReaderImpl.h.
Referenced by DataReaderImpl() and get_reactor().
ReadConditionSet OpenDDS::DCPS::DataReaderImpl::read_conditions_ [private] |
Definition at line 843 of file DataReaderImpl.h.
Referenced by create_querycondition(), create_readcondition(), delete_contained_entities(), delete_readcondition(), has_readcondition(), and notify_read_conditions().
RemoveAssociationSweeper<DataReaderImpl>* OpenDDS::DCPS::DataReaderImpl::remove_association_sweeper_ [private] |
Definition at line 696 of file DataReaderImpl.h.
Referenced by cleanup(), remove_associations(), remove_associations_i(), and ~DataReaderImpl().
DDS::RequestedDeadlineMissedStatus OpenDDS::DCPS::DataReaderImpl::requested_deadline_missed_status_ [private] |
Definition at line 710 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), enable(), get_requested_deadline_missed_status(), and set_qos().
DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::DataReaderImpl::requested_incompatible_qos_status_ [private] |
Definition at line 711 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_requested_incompatible_qos_status(), and update_incompatible_qos().
Definition at line 703 of file DataReaderImpl.h.
Definition at line 620 of file DataReaderImpl.h.
Referenced by coherent_changes_completed(), data_received(), notify_read_conditions(), and writer_became_alive().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::sample_lock_ [protected] |
Lock protecting the sample container as well as the statuses.
Definition at line 617 of file DataReaderImpl.h.
Referenced by accept_coherent(), begin_access(), contains_sample(), end_access(), get_instance_handles(), get_ordered_data(), OpenDDS::DCPS::QueryConditionImpl::get_trigger_value(), reject_coherent(), and transport_assoc_done().
Definition at line 614 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_sample_lost_status(), and set_sample_lost_status().
Definition at line 613 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_sample_rejected_status(), and set_sample_rejected_status().
StatsMapType OpenDDS::DCPS::DataReaderImpl::statistics_ [private] |
Statistics for this reader, collected for each writer.
Definition at line 833 of file DataReaderImpl.h.
Referenced by add_association(), get_latency_stats(), process_latency(), raw_latency_statistics(), and reset_latency_stats().
bool OpenDDS::DCPS::DataReaderImpl::statistics_enabled_ [private] |
Flag indicating status of statistics gathering.
Definition at line 821 of file DataReaderImpl.h.
Referenced by statistics_enabled().
Definition at line 693 of file DataReaderImpl.h.
Referenced by coherent_changes_completed(), data_received(), enable(), get_subscriber(), get_subscriber_servant(), init(), listener_for(), parent(), set_qos(), and verify_coherent_changes_completion().
Definition at line 724 of file DataReaderImpl.h.
Definition at line 712 of file DataReaderImpl.h.
Referenced by DataReaderImpl(), get_subscription_matched_status(), remove_associations_i(), and transport_assoc_done().
DDS::TopicDescription_var OpenDDS::DCPS::DataReaderImpl::topic_desc_ [private] |
TopicImpl* OpenDDS::DCPS::DataReaderImpl::topic_servant_ [protected] |
Definition at line 623 of file DataReaderImpl.h.
Referenced by cleanup(), enable(), get_topic_id(), get_topic_name(), inconsistent_topic(), and init().
bool OpenDDS::DCPS::DataReaderImpl::transport_disabled_ [private] |
Watchdog responsible for reporting missed requested deadlines.
Definition at line 810 of file DataReaderImpl.h.
Referenced by cleanup(), data_received(), enable(), and set_qos().
WriterMapType OpenDDS::DCPS::DataReaderImpl::writers_ [private] |
Definition at line 827 of file DataReaderImpl.h.
Referenced by add_association(), add_link(), check_historic(), OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i(), cleanup(), coherent_change_received(), data_received(), filter_instance(), get_writer_states(), notify_liveliness_change(), remove_all_associations(), remove_associations(), remove_associations_i(), remove_or_reschedule(), reset_coherent_info(), reset_ownership(), resume_sample_processing(), signal_liveliness(), transport_assoc_done(), update_ownership_strength(), writer_activity(), and ~DataReaderImpl().
ACE_RW_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::writers_lock_ [private] |
RW lock for reading/writing publications.
Definition at line 830 of file DataReaderImpl.h.
Referenced by add_association(), add_link(), check_historic(), and OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i().