#include <DataReaderImpl_T.h>
Servant for DataReader interface of Traits::MessageType data type.
See the DDS specification, OMG formal/04-12-02, for a description of this interface.
Definition at line 28 of file DataReaderImpl_T.h.
typedef OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageTypeMemoryBlock, ACE_Null_Mutex> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataAllocator |
Definition at line 72 of file DataReaderImpl_T.h.
typedef TraitsType::DataReaderType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::Interface |
Definition at line 74 of file DataReaderImpl_T.h.
typedef TraitsType::MessageSequenceType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MessageSequenceType |
Definition at line 38 of file DataReaderImpl_T.h.
typedef RcHandle<SharedInstanceMap> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::SharedInstanceMap_rch |
Definition at line 49 of file DataReaderImpl_T.h.
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::TraitsType |
Definition at line 37 of file DataReaderImpl_T.h.
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataReaderImpl_T | ( | void | ) | [inline] |
Definition at line 76 of file DataReaderImpl_T.h.
00077 : filter_delayed_handler_(make_rch<FilterDelayedHandler>(ref(*this))) 00078 { 00079 }
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::~DataReaderImpl_T | ( | void | ) | [inline, virtual] |
Definition at line 81 of file DataReaderImpl_T.h.
00082 { 00083 for (typename InstanceMap::iterator it = instance_map_.begin(); 00084 it != instance_map_.end(); ++it) 00085 { 00086 OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(it->second); 00087 this->purge_data(ptr); 00088 } 00089 //X SHH release the data samples in the instance_map_. 00090 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::auto_return_loan | ( | void * | seq | ) | [inline, virtual] |
Definition at line 709 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK.
00710 { 00711 MessageSequenceType& received_data = 00712 *static_cast< MessageSequenceType*> (seq); 00713 00714 if (!received_data.release()) 00715 { 00716 // this->release_loan(received_data); 00717 received_data.length(0); 00718 } 00719 return DDS::RETCODE_OK; 00720 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::check_inputs | ( | const char * | method_name, | |
MessageSequenceType & | received_data, | |||
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples | |||
) | [inline, private] |
Common input processing and precondition checks shared by the read* and take* operations.
Definition at line 2067 of file DataReaderImpl_T.h.
References ACE_TEXT(), DDS::LENGTH_UNLIMITED, LM_DEBUG, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
02072 { 02073 typename MessageSequenceType::PrivateMemberAccess received_data_p (received_data); 02074 02075 // ---- start of preconditions common to read and take ----- 02076 // SPEC ref v1.2 7.1.2.5.3.8 #1 02077 // NOTE: We can't check maximum() or release() here since those are 02078 // implementation details of the sequences. In general, the 02079 // info_seq will have release() == true and maximum() == 0. 02080 // If we're in zero-copy mode, the received_data will have 02081 // release() == false and maximum() == 0. If it's not 02082 // zero-copy then received_data will have release == true() 02083 // and maximum() == anything. 02084 if (received_data.length() != info_seq.length()) 02085 { 02086 ACE_DEBUG((LM_DEBUG, 02087 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ") 02088 ACE_TEXT("PRECONDITION_NOT_MET sample and info input ") 02089 ACE_TEXT("sequences do not match.\n"), 02090 TraitsType::type_name(), 02091 method_name )); 02092 return DDS::RETCODE_PRECONDITION_NOT_MET; 02093 } 02094 02095 //SPEC ref v1.2 7.1.2.5.3.8 #4 02096 if ((received_data.maximum() > 0) && (received_data.release() == false)) 02097 { 02098 ACE_DEBUG((LM_DEBUG, 02099 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ") 02100 ACE_TEXT("PRECONDITION_NOT_MET mismatch of ") 02101 ACE_TEXT("maximum %d and owns %d\n"), 02102 TraitsType::type_name(), 02103 method_name, 02104 received_data.maximum(), 02105 received_data.release() )); 02106 02107 return DDS::RETCODE_PRECONDITION_NOT_MET; 02108 } 02109 02110 if (received_data.maximum() == 0) 02111 { 02112 // not in SPEC but needed. 
02113 if (max_samples == DDS::LENGTH_UNLIMITED) 02114 { 02115 max_samples = 02116 static_cast< ::CORBA::Long> (received_data_p.max_slots()); 02117 } 02118 } 02119 else 02120 { 02121 if (max_samples == DDS::LENGTH_UNLIMITED) 02122 { 02123 //SPEC ref v1.2 7.1.2.5.3.8 #5a 02124 max_samples = received_data.maximum(); 02125 } 02126 else if ( 02127 max_samples > static_cast< ::CORBA::Long> (received_data.maximum())) 02128 { 02129 //SPEC ref v1.2 7.1.2.5.3.8 #5c 02130 ACE_DEBUG((LM_DEBUG, 02131 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ") 02132 ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"), 02133 TraitsType::type_name(), 02134 method_name, 02135 max_samples, 02136 received_data.maximum())); 02137 return DDS::RETCODE_PRECONDITION_NOT_MET; 02138 } 02139 //else 02140 //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below. 02141 } 02142 02143 // The spec does not say what to do in this case but it appears to be a good thing. 02144 // Note: max_slots is the greater of the sequence's maximum and init_size. 02145 if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples) 02146 { 02147 max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots()); 02148 } 02149 //---- end of preconditions common to read and take ----- 02150 02151 return DDS::RETCODE_OK; 02152 }
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::contains_sample_filtered | ( | DDS::SampleStateMask | sample_states, | |
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
const OpenDDS::DCPS::FilterEvaluator & | evaluator, | |||
const DDS::StringSeq & | params | |||
) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 728 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::FilterEvaluator::eval(), OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, item(), OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, and OpenDDS::DCPS::InstanceState::view_state().
00733 { 00734 using namespace OpenDDS::DCPS; 00735 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false); 00736 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false); 00737 00738 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 00739 end = instances_.end(); iter != end; ++iter) { 00740 SubscriptionInstance& inst = *iter->second; 00741 00742 if ((inst.instance_state_.view_state() & view_states) && 00743 (inst.instance_state_.instance_state() & instance_states)) { 00744 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0; 00745 item = item->next_data_sample_) { 00746 if (item->sample_state_ & sample_states 00747 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00748 && !item->coherent_change_ 00749 #endif 00750 && item->registered_data_) { 00751 if (evaluator.eval(*static_cast< MessageType* >(item->registered_data_), params)) { 00752 return true; 00753 } 00754 } 00755 } 00756 } 00757 } 00758 00759 return false; 00760 }
unique_ptr<DataAllocator>& OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator | ( | ) | [inline, private] |
Definition at line 2381 of file DataReaderImpl_T.h.
02381 { return filter_delayed_handler_->data_allocator_; }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance_rch & | instance, | |||
bool & | just_registered, | |||
bool & | filtered, | |||
OpenDDS::DCPS::MarshalingType | marshaling_type | |||
) | [inline, protected, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 1010 of file DataReaderImpl_T.h.
References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::content_filter_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_ERROR, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, and OpenDDS::DCPS::Serializer::use_rti_serialization().
01015 { 01016 unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator); 01017 const bool cdr = sample.header_.cdr_encapsulation_; 01018 01019 OpenDDS::DCPS::Serializer ser( 01020 sample.sample_.get(), 01021 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER, 01022 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR : OpenDDS::DCPS::Serializer::ALIGN_NONE); 01023 01024 if (cdr) { 01025 ACE_CDR::ULong header; 01026 if (!(ser >> header)) { 01027 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ") 01028 ACE_TEXT("deserialization header failed, dropping sample.\n"), 01029 TraitsType::type_name())); 01030 return; 01031 } 01032 01033 if (Serializer::use_rti_serialization()) { 01034 // Start counting byte-offset AFTER header 01035 ser.reset_alignment(); 01036 } 01037 } 01038 01039 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) { 01040 ser >> OpenDDS::DCPS::KeyOnly< MessageType>(*data); 01041 } else { 01042 ser >> *data; 01043 } 01044 01045 if (!ser.good_bit()) { 01046 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ") 01047 ACE_TEXT("deserialization failed, dropping sample.\n"), 01048 TraitsType::type_name())); 01049 return; 01050 } 01051 01052 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 01053 if (!sample.header_.content_filter_) { // if this is true, the writer has already filtered 01054 using OpenDDS::DCPS::ContentFilteredTopicImpl; 01055 if (content_filtered_topic_) { 01056 if (sample.header_.message_id_ == OpenDDS::DCPS::SAMPLE_DATA 01057 && !content_filtered_topic_->filter(static_cast<MessageType&>(*data))) { 01058 filtered = true; 01059 return; 01060 } 01061 } 01062 } 01063 #endif 01064 01065 store_instance_data(move(data), sample.header_, instance, just_registered, filtered); 01066 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance_rch & | instance | |||
) | [inline, protected, virtual] |
Precondition: the caller must already hold sample_lock_.
Reimplemented from OpenDDS::DCPS::DataReaderImpl.
Definition at line 1068 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, and OpenDDS::DCPS::KEY_ONLY_MARSHALING.
01070 { 01071 //!!! caller should already have the sample_lock_ 01072 01073 // The data sample in this dispose message does not contain any valid data. 01074 // What it needs here is the key value to identify the instance to dispose. 01075 // The demarshal push this "sample" to received sample list so the user 01076 // can be notified the dispose event. 01077 bool just_registered = false; 01078 bool filtered = false; 01079 OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING; 01080 if (sample.header_.key_fields_only_) { 01081 marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING; 01082 } 01083 this->dds_demarshal(sample, instance, just_registered, filtered, marshaling); 01084 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::enable_specific | ( | ) | [inline, virtual] |
Do parts of enable specific to the datatype. Called by DataReaderImpl::enable().
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 96 of file DataReaderImpl_T.h.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and DDS::RETCODE_OK.
00097 { 00098 data_allocator().reset(new DataAllocator(get_n_chunks ())); 00099 if (OpenDDS::DCPS::DCPS_debug_level >= 2) 00100 ACE_DEBUG((LM_DEBUG, 00101 ACE_TEXT("(%P|%t) %CDataReaderImpl::") 00102 ACE_TEXT("enable_specific-data") 00103 ACE_TEXT(" Cached_Allocator_With_Overflow ") 00104 ACE_TEXT("%x with %d chunks\n"), 00105 TraitsType::type_name(), 00106 data_allocator().get(), 00107 this->get_n_chunks ())); 00108 00109 return DDS::RETCODE_OK; 00110 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::finish_store_instance_data | ( | unique_ptr< MessageTypeWithAllocator > | instance_data, | |
const DataSampleHeader & | header, | |||
SubscriptionInstance_rch | instance_ptr, | |||
bool | is_dispose_msg, | |||
bool | is_unregister_msg | |||
) | [inline, private] |
Definition at line 1815 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::ReceivedDataElement::coherent_change_, DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, OpenDDS::DCPS::ReceivedDataElement::dec_ref(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, OpenDDS::DCPS::RcHandle< T >::in(), CORBA::is_nil(), item(), DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, DDS::NOT_READ_SAMPLE_STATE, OpenDDS::DCPS::DataSampleHeader::publication_id_, DDS::REJECTED_BY_SAMPLES_LIMIT, DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::SAMPLE_LOST_STATUS, DDS::SAMPLE_REJECTED_STATUS, OpenDDS::DCPS::ReceivedDataElement::sample_state_, and OpenDDS::DCPS::DataSampleHeader::sequence_.
01817 { 01818 if ((this->qos_.resource_limits.max_samples_per_instance != 01819 DDS::LENGTH_UNLIMITED) && 01820 (instance_ptr->rcvd_samples_.size_ >= 01821 this->qos_.resource_limits.max_samples_per_instance)) { 01822 01823 // According to spec 1.2, Samples that contain no data do not 01824 // count towards the limits imposed by the RESOURCE_LIMITS QoS policy 01825 // so do not remove the oldest sample when unregister/dispose 01826 // message arrives. 01827 01828 if (!is_dispose_msg && !is_unregister_msg 01829 && instance_ptr->rcvd_samples_.head_->sample_state_ 01830 == DDS::NOT_READ_SAMPLE_STATE) 01831 { 01832 // for now the implemented QoS means that if the head sample 01833 // is NOT_READ then none are read. 01834 // TBD - in future we will reads may not read in order so 01835 // just looking at the head will not be enough. 01836 DDS::DataReaderListener_var listener 01837 = listener_for(DDS::SAMPLE_REJECTED_STATUS); 01838 01839 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true); 01840 01841 sample_rejected_status_.last_reason = 01842 DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT; 01843 ++sample_rejected_status_.total_count; 01844 ++sample_rejected_status_.total_count_change; 01845 sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_; 01846 01847 if (!CORBA::is_nil(listener.in())) 01848 { 01849 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01850 01851 listener->on_sample_rejected(this, sample_rejected_status_); 01852 sample_rejected_status_.total_count_change = 0; 01853 } // do we want to do something if listener is nil??? 
01854 notify_status_condition_no_sample_lock(); 01855 return; 01856 } 01857 else if (!is_dispose_msg && !is_unregister_msg) 01858 { 01859 // Discard the oldest previously-read sample 01860 OpenDDS::DCPS::ReceivedDataElement *item = 01861 instance_ptr->rcvd_samples_.head_; 01862 instance_ptr->rcvd_samples_.remove(item); 01863 item->dec_ref(); 01864 } 01865 } 01866 else if (this->qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) 01867 { 01868 CORBA::Long total_samples = 0; 01869 { 01870 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 01871 for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01872 iter != instances_.end(); 01873 ++iter) { 01874 OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second; 01875 01876 total_samples += (CORBA::Long) ptr->rcvd_samples_.size_; 01877 } 01878 } 01879 01880 if (total_samples >= this->qos_.resource_limits.max_samples) 01881 { 01882 // According to spec 1.2, Samples that contain no data do not 01883 // count towards the limits imposed by the RESOURCE_LIMITS QoS policy 01884 // so do not remove the oldest sample when unregister/dispose 01885 // message arrives. 01886 01887 if (!is_dispose_msg && !is_unregister_msg 01888 && instance_ptr->rcvd_samples_.head_->sample_state_ 01889 == DDS::NOT_READ_SAMPLE_STATE) 01890 { 01891 // for now the implemented QoS means that if the head sample 01892 // is NOT_READ then none are read. 01893 // TBD - in future we will reads may not read in order so 01894 // just looking at the head will not be enough. 
01895 DDS::DataReaderListener_var listener 01896 = listener_for(DDS::SAMPLE_REJECTED_STATUS); 01897 01898 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true); 01899 01900 sample_rejected_status_.last_reason = 01901 DDS::REJECTED_BY_SAMPLES_LIMIT; 01902 ++sample_rejected_status_.total_count; 01903 ++sample_rejected_status_.total_count_change; 01904 sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_; 01905 if (!CORBA::is_nil(listener.in())) 01906 { 01907 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01908 01909 listener->on_sample_rejected(this, sample_rejected_status_); 01910 sample_rejected_status_.total_count_change = 0; 01911 } // do we want to do something if listener is nil??? 01912 notify_status_condition_no_sample_lock(); 01913 01914 return; 01915 } 01916 else if (!is_dispose_msg && !is_unregister_msg) 01917 { 01918 // Discard the oldest previously-read sample 01919 OpenDDS::DCPS::ReceivedDataElement *item = 01920 instance_ptr->rcvd_samples_.head_; 01921 instance_ptr->rcvd_samples_.remove(item); 01922 item->dec_ref(); 01923 } 01924 } 01925 } 01926 01927 if (is_dispose_msg || is_unregister_msg) 01928 { 01929 instance_data.reset(); 01930 } 01931 01932 bool event_notify = false; 01933 01934 if (is_dispose_msg) { 01935 event_notify = instance_ptr->instance_state_.dispose_was_received(header.publication_id_); 01936 } 01937 01938 if (is_unregister_msg) { 01939 if (instance_ptr->instance_state_.unregister_was_received(header.publication_id_)) { 01940 event_notify = true; 01941 } 01942 } 01943 01944 if (!is_dispose_msg && !is_unregister_msg) { 01945 event_notify = true; 01946 instance_ptr->instance_state_.data_was_received(header.publication_id_); 01947 } 01948 01949 if (!event_notify) { 01950 return; 01951 } 01952 01953 OpenDDS::DCPS::ReceivedDataElement *ptr = 01954 new (*rd_allocator_.get()) 
OpenDDS::DCPS::ReceivedDataElementWithType<MessageTypeWithAllocator>(header,instance_data.release(), &this->sample_lock_); 01955 01956 ptr->disposed_generation_count_ = 01957 instance_ptr->instance_state_.disposed_generation_count(); 01958 ptr->no_writers_generation_count_ = 01959 instance_ptr->instance_state_.no_writers_generation_count(); 01960 01961 instance_ptr->last_sequence_ = header.sequence_; 01962 01963 instance_ptr->rcvd_strategy_->add(ptr); 01964 01965 if (! is_dispose_msg && ! is_unregister_msg 01966 && instance_ptr->rcvd_samples_.size_ > get_depth()) 01967 { 01968 OpenDDS::DCPS::ReceivedDataElement* head_ptr = 01969 instance_ptr->rcvd_samples_.head_; 01970 01971 instance_ptr->rcvd_samples_.remove(head_ptr); 01972 01973 if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE) 01974 { 01975 DDS::DataReaderListener_var listener 01976 = listener_for (DDS::SAMPLE_LOST_STATUS); 01977 01978 ++sample_lost_status_.total_count; 01979 ++sample_lost_status_.total_count_change; 01980 01981 set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, true); 01982 01983 if (!CORBA::is_nil(listener.in())) 01984 { 01985 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01986 01987 listener->on_sample_lost(this, sample_lost_status_); 01988 01989 sample_lost_status_.total_count_change = 0; 01990 } 01991 01992 notify_status_condition_no_sample_lock(); 01993 } 01994 01995 head_ptr->dec_ref(); 01996 } 01997 01998 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01999 if (! 
ptr->coherent_change_) { 02000 #endif 02001 RcHandle<OpenDDS::DCPS::SubscriberImpl> sub = get_subscriber_servant (); 02002 if (!sub) 02003 return; 02004 02005 sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, true); 02006 02007 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, true); 02008 02009 DDS::SubscriberListener_var sub_listener = 02010 sub->listener_for(DDS::DATA_ON_READERS_STATUS); 02011 if (!CORBA::is_nil(sub_listener.in()) && !this->coherent_) 02012 { 02013 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 02014 02015 sub_listener->on_data_on_readers(sub.in()); 02016 sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false); 02017 } 02018 else 02019 { 02020 sub->notify_status_condition(); 02021 02022 DDS::DataReaderListener_var listener = 02023 listener_for (DDS::DATA_AVAILABLE_STATUS); 02024 02025 if (!CORBA::is_nil(listener.in())) 02026 { 02027 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 02028 02029 listener->on_data_available(this); 02030 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false); 02031 sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false); 02032 } 02033 else 02034 { 02035 notify_status_condition_no_sample_lock(); 02036 } 02037 } 02038 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 02039 } 02040 #endif 02041 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_key_value | ( | MessageType & | key_holder, | |
DDS::InstanceHandle_t | handle | |||
) | [inline, virtual] |
Definition at line 671 of file DataReaderImpl_T.h.
References DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00674 { 00675 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00676 guard, 00677 this->sample_lock_, 00678 DDS::RETCODE_ERROR); 00679 00680 typename InstanceMap::iterator const the_end = instance_map_.end (); 00681 for (typename InstanceMap::iterator it = instance_map_.begin (); 00682 it != the_end; 00683 ++it) 00684 { 00685 if (it->second == handle) 00686 { 00687 key_holder = it->first; 00688 return DDS::RETCODE_OK; 00689 } 00690 } 00691 00692 return DDS::RETCODE_BAD_PARAMETER; 00693 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance_rch & | instance | |||
) | [inline, virtual] |
Precondition: the caller must already hold sample_lock_.
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 930 of file DataReaderImpl_T.h.
References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DDS::HANDLE_NIL, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, LM_ERROR, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::ReceivedDataSample::sample_, and OpenDDS::DCPS::Serializer::use_rti_serialization().
00932 { 00933 //!!! caller should already have the sample_lock_ 00934 00935 MessageType data; 00936 00937 const bool cdr = sample.header_.cdr_encapsulation_; 00938 00939 OpenDDS::DCPS::Serializer ser( 00940 sample.sample_.get(), 00941 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER, 00942 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR 00943 : OpenDDS::DCPS::Serializer::ALIGN_NONE); 00944 00945 if (cdr) { 00946 ACE_CDR::ULong header; 00947 if (!(ser >> header)) { 00948 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ") 00949 ACE_TEXT("deserialization header failed.\n"), 00950 TraitsType::type_name())); 00951 return; 00952 } 00953 00954 if (Serializer::use_rti_serialization()) { 00955 // Start counting byte-offset AFTER header 00956 ser.reset_alignment(); 00957 } 00958 } 00959 00960 if (sample.header_.key_fields_only_) { 00961 ser >> OpenDDS::DCPS::KeyOnly< MessageType>(data); 00962 } else { 00963 ser >> data; 00964 } 00965 00966 if (!ser.good_bit()) { 00967 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ") 00968 ACE_TEXT("deserialization failed.\n"), 00969 TraitsType::type_name())); 00970 return; 00971 } 00972 00973 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL); 00974 typename InstanceMap::const_iterator const it = instance_map_.find(data); 00975 if (it != instance_map_.end()) { 00976 handle = it->second; 00977 } 00978 00979 if (handle == DDS::HANDLE_NIL) { 00980 instance.reset(); 00981 } else { 00982 instance = get_handle_instance(handle); 00983 } 00984 }
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance | ( | const MessageType & | instance_data | ) | [inline, virtual] |
Definition at line 695 of file DataReaderImpl_T.h.
References DDS::HANDLE_NIL.
00696 { 00697 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data); 00698 00699 if (it == instance_map_.end()) 00700 { 00701 return DDS::HANDLE_NIL; 00702 } 00703 else 00704 { 00705 return it->second; 00706 } 00707 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance_generic | ( | const void * | data | ) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 791 of file DataReaderImpl_T.h.
References lookup_instance().
00792 { 00793 return lookup_instance(*static_cast<const MessageType*>(data)); 00794 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::notify_status_condition_no_sample_lock | ( | ) | [inline, private] |
Releases sample_lock_ during status notifications in store_instance_data(), because the lock is not needed there and holding it could cause a deadlock. See the comments in the member function implementation for details.
Definition at line 2046 of file DataReaderImpl_T.h.
02047 { 02048 // This member function avoids a deadlock condition which otherwise 02049 // could occur as follows: 02050 // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and 02051 // eventually DataReaderImpl::sample_lock_ to lock in call to 02052 // DataReaderImpl::contains_samples(). 02053 // Thread2: Call to DataReaderImpl::data_received() 02054 // causes DataReaderImpl::sample_lock_ to lock and eventually 02055 // during notify of status condition a call to WaitSet::signal() 02056 // causes WaitSet::lock_ to lock. 02057 // Because the DataReaderImpl::sample_lock_ is not needed during 02058 // status notification this member function is used in 02059 // store_instance_data() to release sample_lock_ before making 02060 // the notification. 02061 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 02062 notify_status_condition(); 02063 }
typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t, typename TraitsType::LessThanType) OpenDDS::DCPS::DataReaderImpl_T< MessageType >::InstanceMap |
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::purge_data | ( | OpenDDS::DCPS::SubscriptionInstance_rch | instance | ) | [inline, protected, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 1086 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::ReceivedDataElement::dec_ref().
01087 { 01088 filter_delayed_handler_->drop_sample(instance->instance_handle_); 01089 01090 01091 instance->instance_state_.cancel_release(); 01092 01093 while (instance->rcvd_samples_.size_ > 0) 01094 { 01095 OpenDDS::DCPS::ReceivedDataElement* head = 01096 instance->rcvd_samples_.remove_head(); 01097 head->dec_ref(); 01098 } 01099 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::qos_change | ( | const DDS::DataReaderQos & | qos | ) | [inline, virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderImpl.
Definition at line 986 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::duration_to_time_value(), DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, OpenDDS::DCPS::DataReaderImpl::qos_change(), DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, and DDS::DataReaderQos::time_based_filter.
00987 { 00988 // reliability is not changeable, just time_based_filter 00989 if (qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) { 00990 if (qos.time_based_filter.minimum_separation != qos_.time_based_filter.minimum_separation) { 00991 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC }; 00992 if (qos_.time_based_filter.minimum_separation != zero) { 00993 if (qos.time_based_filter.minimum_separation != zero) { 00994 const ACE_Time_Value new_interval = duration_to_time_value(qos.time_based_filter.minimum_separation); 00995 filter_delayed_handler_->reset_interval(new_interval); 00996 } else { 00997 filter_delayed_handler_->cancel(); 00998 } 00999 } 01000 // else no existing timers to change/cancel 01001 } 01002 // else no qos change so nothing to change 01003 } 01004 01005 DataReaderImpl::qos_change(qos); 01006 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [inline, virtual] |
Definition at line 112 of file DataReaderImpl_T.h.
References read(), DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00119 { 00120 DDS::ReturnCode_t const precond = 00121 check_inputs("read", received_data, info_seq, max_samples); 00122 if (DDS::RETCODE_OK != precond) 00123 { 00124 return precond; 00125 } 00126 00127 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00128 guard, 00129 this->sample_lock_, 00130 DDS::RETCODE_ERROR); 00131 00132 return read_i(received_data, info_seq, max_samples, sample_states, 00133 view_states, instance_states, 0); 00134 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_generic | ( | OpenDDS::DCPS::DataReaderImpl::GenericBundle & | gen, | |
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
bool | adjust_ref_count = false | |||
) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 762 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::DataReaderImpl::GenericBundle::info_, DDS::LENGTH_UNLIMITED, DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataReaderImpl::GenericBundle::samples_.
00767 { 00768 00769 MessageSequenceType data; 00770 DDS::ReturnCode_t rc; 00771 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00772 guard, 00773 this->sample_lock_, 00774 DDS::RETCODE_ERROR); 00775 { 00776 rc = read_i(data, gen.info_, 00777 DDS::LENGTH_UNLIMITED, 00778 sample_states, view_states, instance_states, 0); 00779 if (true == adjust_ref_count ) { 00780 data.increment_references(); 00781 } 00782 } 00783 gen.samples_.reserve(data.length()); 00784 for (CORBA::ULong i = 0; i < data.length(); ++i) { 00785 gen.samples_.push_back(&data[i]); 00786 } 00787 return rc; 00788 00789 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1120 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DDS_OPERATION_READ, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), item(), OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.
01132 { 01133 #ifdef OPENDDS_NO_QUERY_CONDITION 01134 ACE_UNUSED_ARG(ignored); 01135 #endif 01136 01137 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01138 01139 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01140 if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS 01141 && ! this->coherent_) { 01142 return DDS::RETCODE_PRECONDITION_NOT_MET; 01143 } 01144 01145 bool group_coherent_ordered 01146 = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS 01147 && this->subqos_.presentation.coherent_access 01148 && this->subqos_.presentation.ordered_access; 01149 01150 if (group_coherent_ordered && this->coherent_) { 01151 max_samples = 1; 01152 } 01153 #endif 01154 01155 OpenDDS::DCPS::RakeResults< MessageSequenceType > 01156 results(this, received_data, info_seq, max_samples, 01157 this->subqos_.presentation, 01158 #ifndef OPENDDS_NO_QUERY_CONDITION 01159 a_condition, 01160 #endif 01161 OpenDDS::DCPS::DDS_OPERATION_READ); 01162 01163 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01164 if (! 
group_coherent_ordered) { 01165 #endif 01166 for (typename InstanceMap::iterator it = instance_map_.begin(), 01167 the_end = instance_map_.end(); it != the_end; ++it) 01168 { 01169 DDS::InstanceHandle_t handle = it->second; 01170 01171 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle); 01172 01173 if ((inst->instance_state_.view_state() & view_states) && 01174 (inst->instance_state_.instance_state() & instance_states)) 01175 { 01176 size_t i(0); 01177 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_; 01178 item != 0; item = item->next_data_sample_) 01179 { 01180 if (item->sample_state_ & sample_states 01181 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01182 && !item->coherent_change_ 01183 #endif 01184 ) 01185 { 01186 results.insert_sample(item, inst, ++i); 01187 } 01188 } 01189 } 01190 } 01191 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01192 } 01193 else { 01194 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data(); 01195 results.insert_sample(item.rde_, item.si_, item.index_in_instance_); 01196 } 01197 #endif 01198 01199 results.copy_to_user(); 01200 01201 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA; 01202 if (received_data.length()) 01203 { 01204 ret = DDS::RETCODE_OK; 01205 if (received_data.maximum() == 0) //using ZeroCopy 01206 { 01207 received_data_p.set_loaner(this); 01208 } 01209 } 01210 01211 post_read_or_take(); 01212 01213 return ret; 01214 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::InstanceHandle_t | a_handle, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [inline, virtual] |
Definition at line 399 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00407 { 00408 DDS::ReturnCode_t const precond = 00409 check_inputs("read_instance", received_data, info_seq, max_samples); 00410 if (DDS::RETCODE_OK != precond) 00411 { 00412 return precond; 00413 } 00414 00415 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00416 guard, 00417 this->sample_lock_, 00418 DDS::RETCODE_ERROR); 00419 return read_instance_i(received_data, info_seq, max_samples, a_handle, 00420 sample_states, view_states, instance_states, 0); 00421 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_generic | ( | void *& | data, | |
DDS::SampleInfo & | info, | |||
DDS::InstanceHandle_t | instance, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 821 of file DataReaderImpl_T.h.
References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.
00825 { 00826 MessageSequenceType dataseq; 00827 DDS::SampleInfoSeq infoseq; 00828 const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq, 00829 DDS::LENGTH_UNLIMITED, instance, sample_states, view_states, 00830 instance_states, 0); 00831 if (rc != DDS::RETCODE_NO_DATA) 00832 { 00833 const CORBA::ULong last = dataseq.length() - 1; 00834 data = new MessageType(dataseq[last]); 00835 info = infoseq[last]; 00836 } 00837 return rc; 00838 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::InstanceHandle_t | a_handle, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1312 of file DataReaderImpl_T.h.
References ACE_TEXT(), OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DDS_OPERATION_READ, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::InstanceState::instance_state_string(), item(), LM_DEBUG, OPENDDS_STRING, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::InstanceState::view_state().
01325 { 01326 #ifdef OPENDDS_NO_QUERY_CONDITION 01327 ACE_UNUSED_ARG(ignored); 01328 #endif 01329 01330 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01331 01332 OpenDDS::DCPS::RakeResults< MessageSequenceType > 01333 results(this, received_data, info_seq, max_samples, 01334 this->subqos_.presentation, 01335 #ifndef OPENDDS_NO_QUERY_CONDITION 01336 a_condition, 01337 #endif 01338 OpenDDS::DCPS::DDS_OPERATION_READ); 01339 01340 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle); 01341 if (!inst) return DDS::RETCODE_BAD_PARAMETER; 01342 01343 InstanceState& state_obj = inst->instance_state_; 01344 bool valid_view_state = state_obj.view_state() & view_states; 01345 bool valid_instance_state = state_obj.instance_state() & instance_states; 01346 if (valid_view_state && valid_instance_state) 01347 { 01348 size_t i(0); 01349 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_; 01350 item; item = item->next_data_sample_) 01351 { 01352 if (item->sample_state_ & sample_states 01353 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01354 && !item->coherent_change_ 01355 #endif 01356 ) 01357 { 01358 results.insert_sample(item, inst, ++i); 01359 } 01360 } 01361 } 01362 else 01363 { 01364 if (OpenDDS::DCPS::DCPS_debug_level >= 8) { 01365 OPENDDS_STRING msg; 01366 if (!valid_view_state) { 01367 msg += "view state is not valid"; 01368 if (!valid_instance_state) { 01369 msg += " and "; 01370 } 01371 } 01372 if (!valid_instance_state) { 01373 msg = msg 01374 + "instance state is " 01375 + state_obj.instance_state_string() 01376 + " while the validity mask is " 01377 + InstanceState::instance_state_string(instance_states); 01378 } 01379 GuidConverter conv(get_subscription_id()); 01380 ACE_DEBUG((LM_DEBUG, 01381 ACE_TEXT( 01382 "(%P|%t) DataReaderImpl_T::read_instance_i: " 01383 "will return no data reading sub %C because:\n %C\n" 01384 ), 01385 OPENDDS_STRING(conv).c_str(), msg.c_str() 01386 )); 01387 } 01388 } 
01389 01390 results.copy_to_user(); 01391 01392 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA; 01393 if (received_data.length()) 01394 { 01395 ret = DDS::RETCODE_OK; 01396 if (received_data.maximum() == 0) //using ZeroCopy 01397 { 01398 received_data_p.set_loaner(this); 01399 } 01400 } 01401 01402 post_read_or_take(); 01403 return ret; 01404 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_w_condition | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::InstanceHandle_t | a_handle, | |||
DDS::ReadCondition_ptr | a_condition | |||
) | [inline, virtual] |
Definition at line 447 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00453 { 00454 DDS::ReturnCode_t const precond = 00455 check_inputs("read_instance_w_condition", received_data, info_seq, 00456 max_samples); 00457 if (DDS::RETCODE_OK != precond) 00458 { 00459 return precond; 00460 } 00461 00462 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00463 DDS::RETCODE_ERROR); 00464 00465 if (!has_readcondition(a_condition)) 00466 { 00467 return DDS::RETCODE_PRECONDITION_NOT_MET; 00468 } 00469 00470 #ifndef OPENDDS_NO_QUERY_CONDITION 00471 DDS::QueryCondition_ptr query_condition = 00472 dynamic_cast< DDS::QueryCondition_ptr >(a_condition); 00473 #endif 00474 00475 return read_instance_i(received_data, info_seq, max_samples, a_handle, 00476 a_condition->get_sample_state_mask(), 00477 a_condition->get_view_state_mask(), 00478 a_condition->get_instance_state_mask(), 00479 #ifndef OPENDDS_NO_QUERY_CONDITION 00480 query_condition 00481 #else 00482 0 00483 #endif 00484 ); 00485 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::InstanceHandle_t | a_handle, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [inline, virtual] |
Definition at line 527 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK.
00535 { 00536 DDS::ReturnCode_t const precond = 00537 check_inputs("read_next_instance", received_data, info_seq, max_samples); 00538 if (DDS::RETCODE_OK != precond) 00539 { 00540 return precond; 00541 } 00542 00543 return read_next_instance_i(received_data, info_seq, max_samples, a_handle, 00544 sample_states, view_states, instance_states, 0); 00545 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_generic | ( | void *& | data, | |
DDS::SampleInfo & | info, | |||
DDS::InstanceHandle_t | previous_instance, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 840 of file DataReaderImpl_T.h.
References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.
00844 { 00845 MessageSequenceType dataseq; 00846 DDS::SampleInfoSeq infoseq; 00847 const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq, 00848 DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states, 00849 instance_states, 0); 00850 if (rc != DDS::RETCODE_NO_DATA) 00851 { 00852 const CORBA::ULong last = dataseq.length() - 1; 00853 data = new MessageType(dataseq[last]); 00854 info = infoseq[last]; 00855 } 00856 return rc; 00857 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_i | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::InstanceHandle_t | a_handle, | |||
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states, | |||
DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1471 of file DataReaderImpl_T.h.
References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and status.
01484 { 01485 #ifdef OPENDDS_NO_QUERY_CONDITION 01486 ACE_UNUSED_ARG(ignored); 01487 #endif 01488 01489 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL); 01490 01491 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 01492 guard, 01493 this->sample_lock_, 01494 DDS::RETCODE_ERROR); 01495 01496 typename InstanceMap::iterator it; 01497 typename InstanceMap::iterator const the_end = instance_map_.end (); 01498 01499 if (a_handle == DDS::HANDLE_NIL) 01500 { 01501 it = instance_map_.begin (); 01502 } 01503 else 01504 { 01505 for (it = instance_map_.begin (); 01506 it != the_end; 01507 ++it) 01508 { 01509 if (a_handle == it->second) 01510 { 01511 ++it; 01512 break; 01513 } 01514 } 01515 } 01516 01517 for (; it != the_end; ++it) 01518 { 01519 handle = it->second; 01520 DDS::ReturnCode_t const status = 01521 read_instance_i(received_data, info_seq, max_samples, handle, 01522 sample_states, view_states, instance_states, 01523 #ifndef OPENDDS_NO_QUERY_CONDITION 01524 a_condition); 01525 #else 01526 0); 01527 #endif 01528 if (status != DDS::RETCODE_NO_DATA) 01529 { 01530 post_read_or_take(); 01531 return status; 01532 } 01533 } 01534 01535 post_read_or_take(); 01536 return DDS::RETCODE_NO_DATA; 01537 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_w_condition | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
DDS::InstanceHandle_t | a_handle, | |||
DDS::ReadCondition_ptr | a_condition | |||
) | [inline, virtual] |
Definition at line 567 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00573 { 00574 DDS::ReturnCode_t const precond = 00575 check_inputs("read_next_instance_w_condition", received_data, info_seq, 00576 max_samples); 00577 if (DDS::RETCODE_OK != precond) 00578 { 00579 return precond; 00580 } 00581 00582 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00583 DDS::RETCODE_ERROR); 00584 00585 if (!has_readcondition(a_condition)) 00586 { 00587 return DDS::RETCODE_PRECONDITION_NOT_MET; 00588 } 00589 00590 #ifndef OPENDDS_NO_QUERY_CONDITION 00591 DDS::QueryCondition_ptr query_condition = 00592 dynamic_cast< DDS::QueryCondition_ptr >(a_condition); 00593 #endif 00594 00595 return read_next_instance_i(received_data, info_seq, max_samples, a_handle, 00596 a_condition->get_sample_state_mask(), 00597 a_condition->get_view_state_mask(), 00598 a_condition->get_instance_state_mask(), 00599 #ifndef OPENDDS_NO_QUERY_CONDITION 00600 query_condition 00601 #else 00602 0 00603 #endif 00604 ); 00605 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_sample | ( | MessageType & | received_data, | |
DDS::SampleInfo & | sample_info | |||
) | [inline, virtual] |
Definition at line 225 of file DataReaderImpl_T.h.
References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, item(), DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and DDS::RETCODE_OK.
00228 { 00229 00230 bool found_data = false; 00231 00232 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00233 guard, 00234 this->sample_lock_, 00235 DDS::RETCODE_ERROR); 00236 00237 typename InstanceMap::iterator const the_end = instance_map_.end (); 00238 for (typename InstanceMap::iterator it = instance_map_.begin (); 00239 it != the_end; 00240 ++it) 00241 { 00242 DDS::InstanceHandle_t handle = it->second; 00243 OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle); 00244 00245 bool mrg = false; //most_recent_generation 00246 00247 if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) && 00248 (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE)) 00249 { 00250 for (OpenDDS::DCPS::ReceivedDataElement* item = ptr->rcvd_samples_.head_; 00251 item != 0; 00252 item = item->next_data_sample_) 00253 { 00254 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00255 if (item->coherent_change_) continue; 00256 #endif 00257 00258 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE) 00259 { 00260 if (item->registered_data_ != 0) 00261 { 00262 received_data = 00263 *static_cast< MessageType *> (item->registered_data_); 00264 } 00265 ptr->instance_state_.sample_info(sample_info, item); 00266 00267 item->sample_state_ = DDS::READ_SAMPLE_STATE; 00268 00269 00270 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item); 00271 00272 found_data = true; 00273 } 00274 if (found_data) 00275 { 00276 break; 00277 } 00278 } 00279 } 00280 00281 if (found_data) 00282 { 00283 if (mrg) ptr->instance_state_.accessed(); 00284 00285 // Get the sample_ranks, generation_ranks, and 00286 // absolute_generation_ranks for this info_seq 00287 this->sample_info(sample_info, ptr->rcvd_samples_.tail_); 00288 00289 break; 00290 } 00291 } 00292 post_read_or_take(); 00293 return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA; 00294 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_w_condition | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | sample_info, | |||
::CORBA::Long | max_samples, | |||
DDS::ReadCondition_ptr | a_condition | |||
) | [inline, virtual] |
Definition at line 160 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00165 { 00166 DDS::ReturnCode_t const precond = 00167 check_inputs("read_w_condition", received_data, sample_info, max_samples); 00168 if (DDS::RETCODE_OK != precond) 00169 { 00170 return precond; 00171 } 00172 00173 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00174 DDS::RETCODE_ERROR); 00175 00176 if (!has_readcondition(a_condition)) 00177 { 00178 return DDS::RETCODE_PRECONDITION_NOT_MET; 00179 } 00180 00181 return read_i(received_data, sample_info, max_samples, 00182 a_condition->get_sample_state_mask(), 00183 a_condition->get_view_state_mask(), 00184 a_condition->get_instance_state_mask(), 00185 #ifndef OPENDDS_NO_QUERY_CONDITION 00186 dynamic_cast< DDS::QueryCondition_ptr >(a_condition)); 00187 #else 00188 0); 00189 #endif 00190 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_instance_i | ( | DDS::InstanceHandle_t | handle | ) | [inline, protected, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 1101 of file DataReaderImpl_T.h.
01102 { 01103 typename InstanceMap::iterator const the_end = instance_map_.end (); 01104 typename InstanceMap::iterator it = instance_map_.begin (); 01105 while (it != the_end) 01106 { 01107 if (it->second == handle) 01108 { 01109 typename InstanceMap::iterator curIt = it; 01110 ++ it; 01111 instance_map_.erase (curIt); 01112 } 01113 else 01114 ++ it; 01115 } 01116 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_loan | ( | MessageSequenceType & | received_data | ) | [inline] |
Definition at line 722 of file DataReaderImpl_T.h.
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::return_loan | ( | MessageSequenceType & | received_data, | |
DDS::SampleInfoSeq & | info_seq | |||
) | [inline, virtual] |
Definition at line 647 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00650 { 00651 // Some incomplete tests to see that the data and info are from the 00652 // same read. 00653 if (received_data.length() != info_seq.length()) 00654 { 00655 return DDS::RETCODE_PRECONDITION_NOT_MET; 00656 } 00657 00658 if (received_data.release()) 00659 { 00660 // nothing to do because this is not zero-copy data 00661 return DDS::RETCODE_OK; 00662 } 00663 else 00664 { 00665 info_seq.length(0); 00666 received_data.length(0); 00667 } 00668 return DDS::RETCODE_OK; 00669 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state | ( | DDS::InstanceHandle_t | instance, | |
DDS::InstanceStateKind | state | |||
) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 907 of file DataReaderImpl_T.h.
References DDS::ALIVE_INSTANCE_STATE, OpenDDS::DCPS::DISPOSE_INSTANCE, get_key_value(), header, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant(), and OpenDDS::RTPS::Sedp::Task::svc_i().
00909 { 00910 using namespace OpenDDS::DCPS; 00911 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 00912 00913 SubscriptionInstance_rch si = get_handle_instance(instance); 00914 if (si && state != DDS::ALIVE_INSTANCE_STATE) { 00915 DataSampleHeader header; 00916 const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) 00917 ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE; 00918 header.message_id_ = static_cast<char>(msg); 00919 bool just_registered, filtered; 00920 unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator); 00921 get_key_value(*data, instance); 00922 store_instance_data(move(data), header, si, just_registered, filtered); 00923 if (!filtered) 00924 { 00925 notify_read_conditions(); 00926 } 00927 } 00928 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data(
    unique_ptr< MessageTypeWithAllocator > instance_data,
    const OpenDDS::DCPS::DataSampleHeader & header,
    OpenDDS::DCPS::SubscriptionInstance_rch & instance_ptr,
    bool & just_registered,
    bool & filtered) [inline, private]
Precondition: the caller must already hold sample_lock_ when calling this function; the lock is released temporarily while listeners are invoked.
Definition at line 1605 of file DataReaderImpl_T.h.
References ACE_TEXT(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::dynamic_rchandle_cast(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DDS::HANDLE_NIL, OpenDDS::DCPS::INSTANCE_REGISTRATION, CORBA::is_nil(), OpenDDS::DCPS::keyFromSample(), DDS::LENGTH_UNLIMITED, LM_ERROR, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ref(), DDS::REJECTED_BY_INSTANCES_LIMIT, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::SAMPLE_DATA, DDS::SAMPLE_REJECTED_STATUS, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
01611 { 01612 const bool is_dispose_msg = 01613 header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE || 01614 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE; 01615 const bool is_unregister_msg = 01616 header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE || 01617 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE; 01618 01619 // not filtering any data, except what is specifically identified as filtered below 01620 filtered = false; 01621 01622 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL); 01623 01624 //!!! caller should already have the sample_lock_ 01625 //We will unlock it before calling into listeners 01626 01627 typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data); 01628 01629 if ((is_dispose_msg || is_unregister_msg) && it == instance_map_.end()) 01630 { 01631 return; 01632 } 01633 01634 01635 if (it == instance_map_.end()) 01636 { 01637 std::size_t instances_size = 0; 01638 { 01639 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_); 01640 instances_size = instances_.size(); 01641 } 01642 if ((this->qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) && 01643 ((::CORBA::Long) instances_size >= this->qos_.resource_limits.max_instances)) 01644 { 01645 DDS::DataReaderListener_var listener 01646 = listener_for (DDS::SAMPLE_REJECTED_STATUS); 01647 01648 set_status_changed_flag (DDS::SAMPLE_REJECTED_STATUS, true); 01649 01650 sample_rejected_status_.last_reason = DDS::REJECTED_BY_INSTANCES_LIMIT; 01651 ++sample_rejected_status_.total_count; 01652 ++sample_rejected_status_.total_count_change; 01653 sample_rejected_status_.last_instance_handle = handle; 01654 01655 if (!CORBA::is_nil(listener.in())) 01656 { 01657 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01658 01659 listener->on_sample_rejected(this, sample_rejected_status_); 01660 sample_rejected_status_.total_count_change = 0; 01661 } // do we want to do something if listener 
is nil??? 01662 notify_status_condition_no_sample_lock(); 01663 01664 return; 01665 } 01666 01667 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01668 SharedInstanceMap_rch inst; 01669 bool new_handle = true; 01670 if (this->is_exclusive_ownership_) { 01671 OwnershipManagerPtr owner_manager = this->ownership_manager(); 01672 01673 if (!owner_manager || owner_manager->instance_lock_acquire () != 0) { 01674 ACE_ERROR ((LM_ERROR, 01675 ACE_TEXT("(%P|%t) ") 01676 ACE_TEXT("%CDataReaderImpl::") 01677 ACE_TEXT("store_instance_data, ") 01678 ACE_TEXT("acquire instance_lock failed. \n"), TraitsType::type_name())); 01679 return; 01680 } 01681 01682 inst = dynamic_rchandle_cast<SharedInstanceMap>( 01683 owner_manager->get_instance_map(this->topic_servant_->type_name(), this)); 01684 if (inst != 0) { 01685 typename InstanceMap::const_iterator const iter = inst->find(*instance_data); 01686 if (iter != inst->end ()) { 01687 handle = iter->second; 01688 new_handle = false; 01689 } 01690 } 01691 } 01692 #endif 01693 01694 just_registered = true; 01695 DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get())); 01696 handle = handle == DDS::HANDLE_NIL ? this->get_next_handle( key) : handle; 01697 OpenDDS::DCPS::SubscriptionInstance_rch instance = 01698 OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>( 01699 this, 01700 this->qos_, 01701 ref(this->instances_lock_), 01702 handle); 01703 01704 instance->instance_handle_ = handle; 01705 01706 { 01707 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_); 01708 int ret = OpenDDS::DCPS::bind(instances_, handle, instance); 01709 01710 if (ret != 0) 01711 { 01712 ACE_ERROR ((LM_ERROR, 01713 ACE_TEXT("(%P|%t) ") 01714 ACE_TEXT("%CDataReaderImpl::") 01715 ACE_TEXT("store_instance_data, ") 01716 ACE_TEXT("insert handle failed. 
\n"), TraitsType::type_name())); 01717 return; 01718 } 01719 } 01720 01721 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01722 OwnershipManagerPtr owner_manager = this->ownership_manager(); 01723 01724 if (owner_manager) { 01725 if (!inst) { 01726 inst = make_rch<SharedInstanceMap>(); 01727 owner_manager->set_instance_map( 01728 this->topic_servant_->type_name(), 01729 inst, 01730 this); 01731 } 01732 01733 if (new_handle) { 01734 std::pair<typename InstanceMap::iterator, bool> bpair = 01735 inst->insert(typename InstanceMap::value_type(*instance_data, 01736 handle)); 01737 if (bpair.second == false) 01738 { 01739 ACE_ERROR ((LM_ERROR, 01740 ACE_TEXT("(%P|%t) ") 01741 ACE_TEXT("%CDataReaderImpl::") 01742 ACE_TEXT("store_instance_data, ") 01743 ACE_TEXT("insert to participant scope %C failed. \n"), TraitsType::type_name(), TraitsType::type_name())); 01744 return; 01745 } 01746 } 01747 01748 if (owner_manager->instance_lock_release () != 0) { 01749 ACE_ERROR ((LM_ERROR, 01750 ACE_TEXT("(%P|%t) ") 01751 ACE_TEXT("%CDataReaderImpl::") 01752 ACE_TEXT("store_instance_data, ") 01753 ACE_TEXT("release instance_lock failed. \n"), TraitsType::type_name())); 01754 return; 01755 } 01756 } 01757 #endif 01758 01759 std::pair<typename InstanceMap::iterator, bool> bpair = 01760 instance_map_.insert(typename InstanceMap::value_type(*instance_data, 01761 handle)); 01762 if (bpair.second == false) 01763 { 01764 ACE_ERROR ((LM_ERROR, 01765 ACE_TEXT("(%P|%t) ") 01766 ACE_TEXT("%CDataReaderImpl::") 01767 ACE_TEXT("store_instance_data, ") 01768 ACE_TEXT("insert %C failed. 
\n"), TraitsType::type_name(), TraitsType::type_name())); 01769 return; 01770 } 01771 } 01772 else 01773 { 01774 just_registered = false; 01775 handle = it->second; 01776 } 01777 01778 if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION) 01779 { 01780 instance_ptr = get_handle_instance(handle); 01781 01782 if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA) 01783 { 01784 filtered = ownership_filter_instance(instance_ptr, header.publication_id_); 01785 01786 ACE_Time_Value filter_time_expired; 01787 if (!filtered && 01788 time_based_filter_instance(instance_ptr, filter_time_expired)) { 01789 filtered = true; 01790 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) { 01791 filter_delayed_handler_->delay_sample(handle, move(instance_data), header, just_registered, filter_time_expired); 01792 01793 } 01794 } else { 01795 // nothing time based filtered now 01796 filter_delayed_handler_->clear_sample(handle); 01797 01798 } 01799 01800 if (filtered) 01801 { 01802 return; 01803 } 01804 } 01805 01806 finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg); 01807 } 01808 else 01809 { 01810 instance_ptr = this->get_handle_instance(handle); 01811 instance_ptr->instance_state_.lively(header.publication_id_); 01812 } 01813 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data(
    const MessageType & sample,
    DDS::ViewStateKind view) [inline]
Definition at line 861 of file DataReaderImpl_T.h.
References DDS::HANDLE_NIL, header, OpenDDS::DCPS::INSTANCE_REGISTRATION, lookup_instance(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), DDS::NOT_NEW_VIEW_STATE, and OpenDDS::DCPS::SAMPLE_DATA.
Referenced by OpenDDS::RTPS::Spdp::handle_participant_data(), OpenDDS::DCPS::StaticEndpointManager::init_bit(), and OpenDDS::RTPS::Spdp::match_unauthenticated().
00863 { 00864 using namespace OpenDDS::DCPS; 00865 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, 00866 DDS::HANDLE_NIL); 00867 00868 #ifndef OPENDDS_NO_MULTI_TOPIC 00869 DDS::TopicDescription_var descr = get_topicdescription(); 00870 if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) { 00871 if (!mt->filter(sample)) { 00872 return DDS::HANDLE_NIL; 00873 } 00874 } 00875 #endif 00876 00877 get_subscriber_servant()->data_received(this); 00878 00879 DDS::InstanceHandle_t inst = lookup_instance(sample); 00880 bool filtered = false; 00881 SubscriptionInstance_rch instance; 00882 00883 // Call store_instance_data() once or twice, depending on if we need to 00884 // process the INSTANCE_REGISTRATION. In either case, store_instance_data() 00885 // owns the memory for the sample and it must come from the correct allocator. 00886 for (int i = 0; i < 2; ++i) { 00887 if (i == 0 && inst != DDS::HANDLE_NIL) continue; 00888 00889 DataSampleHeader header; 00890 const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION; 00891 header.message_id_ = static_cast<char>(msg); 00892 bool just_registered; 00893 unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample)); 00894 store_instance_data(move(data), header, instance, just_registered, filtered); 00895 if (instance) inst = instance->instance_handle_; 00896 } 00897 00898 if (!filtered) { 00899 if (view == DDS::NOT_NEW_VIEW_STATE) { 00900 if (instance) instance->instance_state_.accessed(); 00901 } 00902 notify_read_conditions(); 00903 } 00904 return inst; 00905 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take(
    OpenDDS::DCPS::AbstractSamples & samples,
    DDS::SampleStateMask sample_states,
    DDS::ViewStateMask view_states,
    DDS::InstanceStateMask instance_states) [inline, virtual]
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 796 of file DataReaderImpl_T.h.
References DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::AbstractSamples::push_back(), OpenDDS::DCPS::AbstractSamples::reserve(), and DDS::RETCODE_ERROR.
00800 { 00801 00802 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00803 guard, 00804 this->sample_lock_, 00805 DDS::RETCODE_ERROR); 00806 00807 MessageSequenceType data; 00808 DDS::SampleInfoSeq infos; 00809 DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED, 00810 sample_states, view_states, instance_states, 0); 00811 00812 samples.reserve(data.length()); 00813 00814 for (CORBA::ULong i = 0; i < data.length(); ++i) { 00815 samples.push_back(infos[i], &data[i]); 00816 } 00817 00818 return rc; 00819 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::SampleStateMask sample_states,
    DDS::ViewStateMask view_states,
    DDS::InstanceStateMask instance_states) [inline, virtual]
Definition at line 136 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00143 { 00144 DDS::ReturnCode_t const precond = 00145 check_inputs("take", received_data, info_seq, max_samples); 00146 if (DDS::RETCODE_OK != precond) 00147 { 00148 return precond; 00149 } 00150 00151 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00152 guard, 00153 this->sample_lock_, 00154 DDS::RETCODE_ERROR); 00155 00156 return take_i(received_data, info_seq, max_samples, sample_states, 00157 view_states, instance_states, 0); 00158 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::SampleStateMask sample_states,
    DDS::ViewStateMask view_states,
    DDS::InstanceStateMask instance_states,
    DDS::QueryCondition_ptr a_condition) [inline, private]
Definition at line 1216 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DDS_OPERATION_TAKE, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), item(), OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.
01228 { 01229 #ifdef OPENDDS_NO_QUERY_CONDITION 01230 ACE_UNUSED_ARG(ignored); 01231 #endif 01232 01233 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01234 01235 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01236 if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS 01237 && ! this->coherent_) { 01238 return DDS::RETCODE_PRECONDITION_NOT_MET; 01239 } 01240 01241 bool group_coherent_ordered 01242 = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS 01243 && this->subqos_.presentation.coherent_access 01244 && this->subqos_.presentation.ordered_access; 01245 01246 if (group_coherent_ordered && this->coherent_) { 01247 max_samples = 1; 01248 } 01249 #endif 01250 01251 OpenDDS::DCPS::RakeResults< MessageSequenceType > 01252 results(this, received_data, info_seq, max_samples, 01253 this->subqos_.presentation, 01254 #ifndef OPENDDS_NO_QUERY_CONDITION 01255 a_condition, 01256 #endif 01257 OpenDDS::DCPS::DDS_OPERATION_TAKE); 01258 01259 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01260 if (! 
group_coherent_ordered) { 01261 #endif 01262 01263 for (typename InstanceMap::iterator it = instance_map_.begin(), 01264 the_end = instance_map_.end(); it != the_end; ++it) 01265 { 01266 DDS::InstanceHandle_t handle = it->second; 01267 01268 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle); 01269 01270 if ((inst->instance_state_.view_state() & view_states) && 01271 (inst->instance_state_.instance_state() & instance_states)) 01272 { 01273 size_t i(0); 01274 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_; 01275 item != 0; item = item->next_data_sample_) 01276 { 01277 if (item->sample_state_ & sample_states 01278 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01279 && !item->coherent_change_ 01280 #endif 01281 ) 01282 { 01283 results.insert_sample(item, inst, ++i); 01284 } 01285 } 01286 } 01287 } 01288 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01289 } 01290 else { 01291 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data(); 01292 results.insert_sample(item.rde_, item.si_, item.index_in_instance_); 01293 } 01294 #endif 01295 01296 results.copy_to_user(); 01297 01298 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA; 01299 if (received_data.length()) 01300 { 01301 ret = DDS::RETCODE_OK; 01302 if (received_data.maximum() == 0) //using ZeroCopy 01303 { 01304 received_data_p.set_loaner(this); 01305 } 01306 } 01307 01308 post_read_or_take(); 01309 return ret; 01310 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::InstanceHandle_t a_handle,
    DDS::SampleStateMask sample_states,
    DDS::ViewStateMask view_states,
    DDS::InstanceStateMask instance_states) [inline, virtual]
Definition at line 423 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00431 { 00432 DDS::ReturnCode_t const precond = 00433 check_inputs("take_instance", received_data, info_seq, max_samples); 00434 if (DDS::RETCODE_OK != precond) 00435 { 00436 return precond; 00437 } 00438 00439 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00440 guard, 00441 this->sample_lock_, 00442 DDS::RETCODE_ERROR); 00443 return take_instance_i(received_data, info_seq, max_samples, a_handle, 00444 sample_states, view_states, instance_states, 0); 00445 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::InstanceHandle_t a_handle,
    DDS::SampleStateMask sample_states,
    DDS::ViewStateMask view_states,
    DDS::InstanceStateMask instance_states,
    DDS::QueryCondition_ptr a_condition) [inline, private]
Definition at line 1406 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), item(), DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, and DDS::RETCODE_OK.
01419 { 01420 #ifdef OPENDDS_NO_QUERY_CONDITION 01421 ACE_UNUSED_ARG(ignored); 01422 #endif 01423 01424 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01425 01426 OpenDDS::DCPS::RakeResults< MessageSequenceType > 01427 results(this, received_data, info_seq, max_samples, 01428 this->subqos_.presentation, 01429 #ifndef OPENDDS_NO_QUERY_CONDITION 01430 a_condition, 01431 #endif 01432 OpenDDS::DCPS::DDS_OPERATION_TAKE); 01433 01434 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle); 01435 if (!inst) return DDS::RETCODE_BAD_PARAMETER; 01436 01437 if ((inst->instance_state_.view_state() & view_states) && 01438 (inst->instance_state_.instance_state() & instance_states)) 01439 { 01440 size_t i(0); 01441 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_; 01442 item; item = item->next_data_sample_) 01443 { 01444 if (item->sample_state_ & sample_states 01445 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01446 && !item->coherent_change_ 01447 #endif 01448 ) 01449 { 01450 results.insert_sample(item, inst, ++i); 01451 } 01452 } 01453 } 01454 01455 results.copy_to_user(); 01456 01457 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA; 01458 if (received_data.length()) 01459 { 01460 ret = DDS::RETCODE_OK; 01461 if (received_data.maximum() == 0) //using ZeroCopy 01462 { 01463 received_data_p.set_loaner(this); 01464 } 01465 } 01466 01467 post_read_or_take(); 01468 return ret; 01469 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_w_condition(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::InstanceHandle_t a_handle,
    DDS::ReadCondition_ptr a_condition) [inline, virtual]
Definition at line 487 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00493 { 00494 DDS::ReturnCode_t const precond = 00495 check_inputs("take_instance_w_condition", received_data, info_seq, 00496 max_samples); 00497 if (DDS::RETCODE_OK != precond) 00498 { 00499 return precond; 00500 } 00501 00502 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00503 DDS::RETCODE_ERROR); 00504 00505 if (!has_readcondition(a_condition)) 00506 { 00507 return DDS::RETCODE_PRECONDITION_NOT_MET; 00508 } 00509 00510 #ifndef OPENDDS_NO_QUERY_CONDITION 00511 DDS::QueryCondition_ptr query_condition = 00512 dynamic_cast< DDS::QueryCondition_ptr >(a_condition); 00513 #endif 00514 00515 return take_instance_i(received_data, info_seq, max_samples, a_handle, 00516 a_condition->get_sample_state_mask(), 00517 a_condition->get_view_state_mask(), 00518 a_condition->get_instance_state_mask(), 00519 #ifndef OPENDDS_NO_QUERY_CONDITION 00520 query_condition 00521 #else 00522 0 00523 #endif 00524 ); 00525 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::InstanceHandle_t a_handle,
    DDS::SampleStateMask sample_states,
    DDS::ViewStateMask view_states,
    DDS::InstanceStateMask instance_states) [inline, virtual]
Definition at line 547 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK.
00555 { 00556 DDS::ReturnCode_t const precond = 00557 check_inputs("take_next_instance", received_data, info_seq, max_samples); 00558 if (DDS::RETCODE_OK != precond) 00559 { 00560 return precond; 00561 } 00562 00563 return take_next_instance_i(received_data, info_seq, max_samples, a_handle, 00564 sample_states, view_states, instance_states, 0); 00565 }
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_i(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::InstanceHandle_t a_handle,
    DDS::SampleStateMask sample_states,
    DDS::ViewStateMask view_states,
    DDS::InstanceStateMask instance_states,
    DDS::QueryCondition_ptr a_condition) [inline, private]
Definition at line 1539 of file DataReaderImpl_T.h.
References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and status.
01552 { 01553 #ifdef OPENDDS_NO_QUERY_CONDITION 01554 ACE_UNUSED_ARG(ignored); 01555 #endif 01556 01557 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL); 01558 01559 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 01560 guard, 01561 this->sample_lock_, 01562 DDS::RETCODE_ERROR); 01563 01564 typename InstanceMap::iterator it; 01565 typename InstanceMap::iterator const the_end = instance_map_.end (); 01566 01567 if (a_handle == DDS::HANDLE_NIL) 01568 { 01569 it = instance_map_.begin (); 01570 } 01571 else 01572 { 01573 for (it = instance_map_.begin (); it != the_end; ++it) 01574 { 01575 if (a_handle == it->second) 01576 { 01577 ++it; 01578 break; 01579 } 01580 } 01581 } 01582 01583 for (; it != the_end; ++it) 01584 { 01585 handle = it->second; 01586 DDS::ReturnCode_t const status = 01587 take_instance_i(received_data, info_seq, max_samples, handle, 01588 sample_states, view_states, instance_states, 01589 #ifndef OPENDDS_NO_QUERY_CONDITION 01590 a_condition); 01591 #else 01592 0); 01593 #endif 01594 if (status != DDS::RETCODE_NO_DATA) 01595 { 01596 total_samples(); // see if we are empty 01597 post_read_or_take(); 01598 return status; 01599 } 01600 } 01601 post_read_or_take(); 01602 return DDS::RETCODE_NO_DATA; 01603 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_w_condition(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & info_seq,
    ::CORBA::Long max_samples,
    DDS::InstanceHandle_t a_handle,
    DDS::ReadCondition_ptr a_condition) [inline, virtual]
Definition at line 607 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00613 { 00614 DDS::ReturnCode_t const precond = 00615 check_inputs("take_next_instance_w_condition", received_data, info_seq, 00616 max_samples); 00617 if (DDS::RETCODE_OK != precond) 00618 { 00619 return precond; 00620 } 00621 00622 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00623 DDS::RETCODE_ERROR); 00624 00625 if (!has_readcondition(a_condition)) 00626 { 00627 return DDS::RETCODE_PRECONDITION_NOT_MET; 00628 } 00629 00630 #ifndef OPENDDS_NO_QUERY_CONDITION 00631 DDS::QueryCondition_ptr query_condition = 00632 dynamic_cast< DDS::QueryCondition_ptr >(a_condition); 00633 #endif 00634 00635 return take_next_instance_i(received_data, info_seq, max_samples, a_handle, 00636 a_condition->get_sample_state_mask(), 00637 a_condition->get_view_state_mask(), 00638 a_condition->get_instance_state_mask(), 00639 #ifndef OPENDDS_NO_QUERY_CONDITION 00640 query_condition 00641 #else 00642 0 00643 #endif 00644 ); 00645 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_sample(
    MessageType & received_data,
    DDS::SampleInfo & sample_info) [inline, virtual]
Definition at line 296 of file DataReaderImpl_T.h.
References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::ReceivedDataElement::coherent_change_, OpenDDS::DCPS::ReceivedDataElement::dec_ref(), item(), OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::ReceivedDataElement::registered_data_, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::ReceivedDataElement::sample_state_.
00299 { 00300 bool found_data = false; 00301 00302 00303 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00304 guard, 00305 this->sample_lock_, 00306 DDS::RETCODE_ERROR); 00307 00308 typename InstanceMap::iterator const the_end = instance_map_.end (); 00309 for (typename InstanceMap::iterator it = instance_map_.begin (); 00310 it != the_end; 00311 ++it) 00312 { 00313 DDS::InstanceHandle_t handle = it->second; 00314 OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle); 00315 00316 bool mrg = false; //most_recent_generation 00317 00318 OpenDDS::DCPS::ReceivedDataElement *tail = 0; 00319 if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) && 00320 (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE)) 00321 { 00322 00323 OpenDDS::DCPS::ReceivedDataElement *next; 00324 tail = 0; 00325 OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_; 00326 while (item) 00327 { 00328 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00329 if (item->coherent_change_) 00330 { 00331 item = item->next_data_sample_; 00332 continue; 00333 } 00334 #endif 00335 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE) 00336 { 00337 if (item->registered_data_ != 0) 00338 { 00339 received_data = 00340 *static_cast< MessageType *> (item->registered_data_); 00341 } 00342 ptr->instance_state_.sample_info(sample_info, item); 00343 00344 item->sample_state_ = DDS::READ_SAMPLE_STATE; 00345 00346 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item); 00347 00348 if (item == ptr->rcvd_samples_.tail_) 00349 { 00350 tail = ptr->rcvd_samples_.tail_; 00351 item = item->next_data_sample_; 00352 } 00353 else 00354 { 00355 next = item->next_data_sample_; 00356 00357 ptr->rcvd_samples_.remove(item); 00358 item->dec_ref(); 00359 00360 item = next; 00361 } 00362 00363 found_data = true; 00364 } 00365 if (found_data) 00366 { 00367 break; 00368 } 00369 } 00370 } 00371 00372 if (found_data) 00373 { 00374 if (mrg) ptr->instance_state_.accessed(); 00375 00376 // 
00377 // Get the sample_ranks, generation_ranks, and 00378 // absolute_generation_ranks for this info_seq 00379 // 00380 if (tail) 00381 { 00382 this->sample_info(sample_info, tail); 00383 00384 ptr->rcvd_samples_.remove(tail); 00385 tail->dec_ref(); 00386 } 00387 else 00388 { 00389 this->sample_info(sample_info, ptr->rcvd_samples_.tail_); 00390 } 00391 00392 break; 00393 } 00394 } 00395 post_read_or_take(); 00396 return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA; 00397 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_w_condition(
    MessageSequenceType & received_data,
    DDS::SampleInfoSeq & sample_info,
    ::CORBA::Long max_samples,
    DDS::ReadCondition_ptr a_condition) [inline, virtual]
Definition at line 192 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00197 { 00198 DDS::ReturnCode_t const precond = 00199 check_inputs("take_w_condition", received_data, sample_info, max_samples); 00200 if (DDS::RETCODE_OK != precond) 00201 { 00202 return precond; 00203 } 00204 00205 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00206 DDS::RETCODE_ERROR); 00207 00208 if (!has_readcondition(a_condition)) 00209 { 00210 return DDS::RETCODE_PRECONDITION_NOT_MET; 00211 } 00212 00213 return take_i(received_data, sample_info, max_samples, 00214 a_condition->get_sample_state_mask(), 00215 a_condition->get_view_state_mask(), 00216 a_condition->get_instance_state_mask(), 00217 #ifndef OPENDDS_NO_QUERY_CONDITION 00218 dynamic_cast< DDS::QueryCondition_ptr >(a_condition) 00219 #else 00220 0 00221 #endif 00222 ); 00223 }
RcHandle<FilterDelayedHandler> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_handler_ [private]
Definition at line 2383 of file DataReaderImpl_T.h.
InstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::instance_map_ [private]
Definition at line 2385 of file DataReaderImpl_T.h.