#include <DataReaderImpl_T.h>
Inheritance diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:
See the DDS specification, OMG formal/04-12-02, for a description of this interface.
Definition at line 22 of file DataReaderImpl_T.h.
typedef ::OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageType, ACE_Null_Mutex> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataAllocator |
Definition at line 36 of file DataReaderImpl_T.h.
typedef TraitsType::DataReaderType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::Interface |
Definition at line 38 of file DataReaderImpl_T.h.
typedef TraitsType::MessageSequenceType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MessageSequenceType |
Definition at line 32 of file DataReaderImpl_T.h.
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::TraitsType |
Definition at line 31 of file DataReaderImpl_T.h.
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataReaderImpl_T | ( | void | ) | [inline] |
Constructor.
Definition at line 41 of file DataReaderImpl_T.h.
00042 : data_allocator_ (0) 00043 { 00044 }
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::~DataReaderImpl_T | ( | void | ) | [inline, virtual] |
Destructor.
Definition at line 47 of file DataReaderImpl_T.h.
00048 { 00049 for (typename InstanceMap::iterator it = instance_map_.begin(); 00050 it != instance_map_.end(); ++it) 00051 { 00052 OpenDDS::DCPS::SubscriptionInstance* ptr = 00053 get_handle_instance(it->second); 00054 this->purge_data(ptr); 00055 } 00056 00057 delete data_allocator_; 00058 //X SHH release the data samples in the instance_map_. 00059 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::auto_return_loan | ( | void * | seq | ) | [inline, virtual] |
This method provides virtual access to type specific code that is used when loans are automatically returned. The destructor of the sequence supporing zero-copy read calls this method on the datareader that provided the loan.
seq | - The sequence of loaned values. |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 599 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK.
00600 { 00601 MessageSequenceType& received_data = 00602 *static_cast< MessageSequenceType*> (seq); 00603 00604 if (!received_data.release()) 00605 { 00606 // this->release_loan(received_data); 00607 received_data.length(0); 00608 } 00609 return ::DDS::RETCODE_OK; 00610 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::check_inputs | ( | const char * | method_name, | |
MessageSequenceType & | received_data, | |||
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples | |||
) | [inline, private] |
Common input read* & take* input processing and precondition checks.
Definition at line 1926 of file DataReaderImpl_T.h.
References DDS::LENGTH_UNLIMITED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
01931 { 01932 typename MessageSequenceType::PrivateMemberAccess received_data_p (received_data); 01933 01934 // ---- start of preconditions common to read and take ----- 01935 // SPEC ref v1.2 7.1.2.5.3.8 #1 01936 // NOTE: We can't check maximum() or release() here since those are 01937 // implementation details of the sequences. In general, the 01938 // info_seq will have release() == true and maximum() == 0. 01939 // If we're in zero-copy mode, the received_data will have 01940 // release() == false and maximum() == 0. If it's not 01941 // zero-copy then received_data will have release == true() 01942 // and maximum() == anything. 01943 if (received_data.length() != info_seq.length()) 01944 { 01945 ACE_DEBUG((LM_DEBUG, 01946 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ") 01947 ACE_TEXT("PRECONDITION_NOT_MET sample and info input ") 01948 ACE_TEXT("sequences do not match.\n"), 01949 TraitsType::type_name(), 01950 method_name )); 01951 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 01952 } 01953 01954 //SPEC ref v1.2 7.1.2.5.3.8 #4 01955 if ((received_data.maximum() > 0) && (received_data.release() == false)) 01956 { 01957 ACE_DEBUG((LM_DEBUG, 01958 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ") 01959 ACE_TEXT("PRECONDITION_NOT_MET mismatch of ") 01960 ACE_TEXT("maximum %d and owns %d\n"), 01961 TraitsType::type_name(), 01962 method_name, 01963 received_data.maximum(), 01964 received_data.release() )); 01965 01966 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 01967 } 01968 01969 if (received_data.maximum() == 0) 01970 { 01971 // not in SPEC but needed. 01972 if (max_samples == ::DDS::LENGTH_UNLIMITED) 01973 { 01974 max_samples = 01975 static_cast< ::CORBA::Long> (received_data_p.max_slots()); 01976 } 01977 } 01978 else 01979 { 01980 if (max_samples == ::DDS::LENGTH_UNLIMITED) 01981 { 01982 //SPEC ref v1.2 7.1.2.5.3.8 #5a 01983 max_samples = received_data.maximum(); 01984 } 01985 else if ( 01986 max_samples > static_cast< ::CORBA::Long> (received_data.maximum())) 01987 { 01988 //SPEC ref v1.2 7.1.2.5.3.8 #5c 01989 ACE_DEBUG((LM_DEBUG, 01990 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ") 01991 ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"), 01992 TraitsType::type_name(), 01993 method_name, 01994 max_samples, 01995 received_data.maximum())); 01996 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 01997 } 01998 //else 01999 //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below. 02000 } 02001 02002 // The spec does not say what to do in this case but it appears to be a good thing. 02003 // Note: max_slots is the greater of the sequence's maximum and init_size. 02004 if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples) 02005 { 02006 max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots()); 02007 } 02008 //---- end of preconditions common to read and take ----- 02009 02010 return ::DDS::RETCODE_OK; 02011 }
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::contains_sample_filtered | ( | ::DDS::SampleStateMask | sample_states, | |
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
const OpenDDS::DCPS::FilterEvaluator & | evaluator, | |||
const ::DDS::StringSeq & | params | |||
) | [inline] |
Definition at line 649 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::FilterEvaluator::eval().
00654 { 00655 using namespace OpenDDS::DCPS; 00656 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false); 00657 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false); 00658 00659 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(), 00660 end = instances_.end(); iter != end; ++iter) { 00661 SubscriptionInstance& inst = *iter->second; 00662 00663 if ((inst.instance_state_.view_state() & view_states) && 00664 (inst.instance_state_.instance_state() & instance_states)) { 00665 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0; 00666 item = item->next_data_sample_) { 00667 if (item->sample_state_ & sample_states 00668 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00669 && !item->coherent_change_ 00670 #endif 00671 ) { 00672 if (evaluator.eval(*static_cast< MessageType* >(item->registered_data_), params)) { 00673 return true; 00674 } 00675 } 00676 } 00677 } 00678 } 00679 00680 return false; 00681 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance *& | instance, | |||
bool & | just_registered, | |||
bool & | filtered, | |||
OpenDDS::DCPS::MarshalingType | marshaling_type | |||
) | [inline, protected, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 895 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::content_filter_, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::Serializer::reset_alignment(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, and OpenDDS::DCPS::Serializer::use_rti_serialization().
00900 { 00901 MessageType* data; 00902 00903 ACE_NEW_MALLOC_NORETURN(data, 00904 static_cast< MessageType *>( 00905 data_allocator_->malloc(sizeof(MessageType))), 00906 MessageType); 00907 00908 const bool cdr = sample.header_.cdr_encapsulation_; 00909 00910 OpenDDS::DCPS::Serializer ser( 00911 sample.sample_, 00912 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER, 00913 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR : OpenDDS::DCPS::Serializer::ALIGN_NONE); 00914 00915 if (cdr) { 00916 ACE_CDR::ULong header; 00917 ser >> header; 00918 } 00919 00920 if (cdr && Serializer::use_rti_serialization()) { 00921 // Start counting byte-offset AFTER header 00922 ser.reset_alignment(); 00923 } 00924 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) { 00925 ser >> ::OpenDDS::DCPS::KeyOnly< MessageType>(*data); 00926 } else { 00927 ser >> *data; 00928 } 00929 00930 00931 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00932 if (!sample.header_.content_filter_) { // if this is true, the writer has already filtered 00933 using OpenDDS::DCPS::ContentFilteredTopicImpl; 00934 if (ContentFilteredTopicImpl* cft = 00935 dynamic_cast<ContentFilteredTopicImpl*>(content_filtered_topic_.in())) { 00936 if (sample.header_.message_id_ == OpenDDS::DCPS::SAMPLE_DATA 00937 && !cft->filter(*data)) { 00938 filtered = true; 00939 return; 00940 } 00941 } 00942 } 00943 #endif 00944 00945 store_instance_data(data, sample.header_, instance, just_registered, filtered); 00946 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dec_ref_data_element | ( | ::OpenDDS::DCPS::ReceivedDataElement * | item | ) | [inline] |
Definition at line 617 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::ReceivedDataElement::dec_ref(), and OpenDDS::DCPS::ReceivedDataElement::registered_data_.
00618 { 00619 using ::OpenDDS::DCPS::ReceivedDataElement; 00620 00621 if (0 == item->dec_ref()) 00622 { 00623 if (item->registered_data_ != 0) 00624 { 00625 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00626 guard, 00627 this->sample_lock_); 00628 00629 MessageType* const ptr 00630 = static_cast< MessageType* >(item->registered_data_); 00631 ACE_DES_FREE (ptr, 00632 data_allocator_->free, 00633 MessageType ); 00634 } 00635 00636 ACE_DES_FREE (item, 00637 rd_allocator_->free, 00638 ReceivedDataElement); 00639 } 00640 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::delete_instance_map | ( | void * | map | ) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 642 of file DataReaderImpl_T.h.
00643 { 00644 InstanceMap* instances = reinterpret_cast <InstanceMap* > (map); 00645 delete instances; 00646 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance *& | instance | |||
) | [inline, protected, virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderImpl.
Definition at line 948 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, and OpenDDS::DCPS::KEY_ONLY_MARSHALING.
00950 { 00951 //!!! caller should already have the sample_lock_ 00952 00953 // The data sample in this dispose message does not contain any valid data. 00954 // What it needs here is the key value to identify the instance to dispose. 00955 // The demarshal push this "sample" to received sample list so the user 00956 // can be notified the dispose event. 00957 bool just_registered = false; 00958 bool filtered = false; 00959 OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING; 00960 if (sample.header_.key_fields_only_) { 00961 marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING; 00962 } 00963 this->dds_demarshal(sample, instance, just_registered, filtered, marshaling); 00964 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::enable_specific | ( | ) | [inline, virtual] |
Do parts of enable specific to the datatype. Called by DataReaderImpl::enable().
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 65 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::DCPS_debug_level, and DDS::RETCODE_OK.
00066 { 00067 data_allocator_ = new DataAllocator(get_n_chunks ()); 00068 if (::OpenDDS::DCPS::DCPS_debug_level >= 2) 00069 ACE_DEBUG((LM_DEBUG, 00070 ACE_TEXT("(%P|%t) %CDataReaderImpl::") 00071 ACE_TEXT("enable_specific-data") 00072 ACE_TEXT(" Cached_Allocator_With_Overflow ") 00073 ACE_TEXT("%x with %d chunks\n"), 00074 TraitsType::type_name(), 00075 data_allocator_, 00076 this->get_n_chunks ())); 00077 00078 return ::DDS::RETCODE_OK; 00079 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_key_value | ( | MessageType & | key_holder, | |
::DDS::InstanceHandle_t | handle | |||
) | [inline] |
Definition at line 560 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00563 { 00564 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00565 guard, 00566 this->sample_lock_, 00567 ::DDS::RETCODE_ERROR); 00568 00569 typename InstanceMap::iterator const the_end = instance_map_.end (); 00570 for (typename InstanceMap::iterator it = instance_map_.begin (); 00571 it != the_end; 00572 ++it) 00573 { 00574 if (it->second == handle) 00575 { 00576 key_holder = it->first; 00577 return ::DDS::RETCODE_OK; 00578 } 00579 } 00580 00581 return ::DDS::RETCODE_ERROR; 00582 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance | ( | const OpenDDS::DCPS::ReceivedDataSample & | sample, | |
OpenDDS::DCPS::SubscriptionInstance *& | instance | |||
) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 849 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, DDS::HANDLE_NIL, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::ReceivedDataSample::sample_, and OpenDDS::DCPS::Serializer::use_rti_serialization().
00851 { 00852 //!!! caller should already have the sample_lock_ 00853 00854 MessageType data; 00855 00856 const bool cdr = sample.header_.cdr_encapsulation_; 00857 00858 ::OpenDDS::DCPS::Serializer ser( 00859 sample.sample_, 00860 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER, 00861 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR 00862 : OpenDDS::DCPS::Serializer::ALIGN_NONE); 00863 00864 if (cdr) { 00865 ACE_CDR::ULong header; 00866 ser >> header; 00867 } 00868 00869 if (cdr && Serializer::use_rti_serialization()) { 00870 // Start counting byte-offset AFTER header 00871 ser.reset_alignment(); 00872 } 00873 if (sample.header_.key_fields_only_) { 00874 ser >> ::OpenDDS::DCPS::KeyOnly< MessageType>(data); 00875 } else { 00876 ser >> data; 00877 } 00878 00879 00880 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL); 00881 typename InstanceMap::const_iterator const it = instance_map_.find(data); 00882 if (it != instance_map_.end()) { 00883 handle = it->second; 00884 } 00885 00886 if (handle == ::DDS::HANDLE_NIL) { 00887 instance = 0; 00888 } else { 00889 instance = get_handle_instance(handle); 00890 } 00891 }
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance | ( | const MessageType & | instance_data | ) | [inline] |
Definition at line 584 of file DataReaderImpl_T.h.
References DDS::HANDLE_NIL.
00586 { 00587 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data); 00588 00589 if (it == instance_map_.end()) 00590 { 00591 return ::DDS::HANDLE_NIL; 00592 } 00593 else 00594 { 00595 return it->second; 00596 } 00597 }
::DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance_generic | ( | const void * | data | ) | [inline, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 712 of file DataReaderImpl_T.h.
References lookup_instance().
00713 { 00714 return lookup_instance(*static_cast<const MessageType*>(data)); 00715 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::notify_status_condition_no_sample_lock | ( | ) | [inline, private] |
Release sample_lock_ during status notifications in store_instance_data() as the lock is not needed and could cause deadlock condition. See comments in member function implementation for details.
Definition at line 1905 of file DataReaderImpl_T.h.
01906 { 01907 // This member function avoids a deadlock condition which otherwise 01908 // could occur as follows: 01909 // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and 01910 // eventually DataReaderImpl::sample_lock_ to lock in call to 01911 // DataReaderImpl::contains_samples(). 01912 // Thread2: Call to DataReaderImpl::data_received() 01913 // causes DataReaderImpl::sample_lock_ to lock and eventually 01914 // during notify of status condition a call to WaitSet::signal() 01915 // causes WaitSet::lock_ to lock. 01916 // Because the DataReaderImpl::sample_lock_ is not needed during 01917 // status notification this member function is used in 01918 // store_instance_data() to release sample_lock_ before making 01919 // the notification. 01920 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01921 notify_status_condition(); 01922 }
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP_CMP | ( | MessageType | , | |
::DDS::InstanceHandle_t | , | |||
typename TraitsType::LessThanType | ||||
) |
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::purge_data | ( | OpenDDS::DCPS::SubscriptionInstance * | instance | ) | [inline, protected, virtual] |
Implements OpenDDS::DCPS::DataReaderImpl.
Definition at line 966 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::InstanceState::cancel_release(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, OpenDDS::DCPS::ReceivedDataElementList::remove_head(), and OpenDDS::DCPS::ReceivedDataElementList::size_.
00967 { 00968 instance->instance_state_.cancel_release(); 00969 00970 while (instance->rcvd_samples_.size_ > 0) 00971 { 00972 OpenDDS::DCPS::ReceivedDataElement* head = 00973 instance->rcvd_samples_.remove_head(); 00974 dec_ref_data_element(head); 00975 } 00976 00977 delete instance; 00978 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 81 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00088 { 00089 ::DDS::ReturnCode_t const precond = 00090 check_inputs("read", received_data, info_seq, max_samples); 00091 if (::DDS::RETCODE_OK != precond) 00092 { 00093 return precond; 00094 } 00095 00096 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00097 guard, 00098 this->sample_lock_, 00099 ::DDS::RETCODE_ERROR); 00100 00101 return read_i(received_data, info_seq, max_samples, sample_states, 00102 view_states, instance_states, 0); 00103 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_generic | ( | OpenDDS::DCPS::DataReaderImpl::GenericBundle & | gen, | |
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
bool | adjust_ref_count = false | |||
) | [inline] |
Definition at line 683 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::DataReaderImpl::GenericBundle::info_, DDS::LENGTH_UNLIMITED, DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataReaderImpl::GenericBundle::samples_.
00688 { 00689 00690 MessageSequenceType data; 00691 ::DDS::ReturnCode_t rc; 00692 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00693 guard, 00694 this->sample_lock_, 00695 ::DDS::RETCODE_ERROR); 00696 { 00697 rc = read_i(data, gen.info_, 00698 ::DDS::LENGTH_UNLIMITED, 00699 sample_states, view_states, instance_states, 0); 00700 if (true == adjust_ref_count ) { 00701 data.increment_references(); 00702 } 00703 } 00704 gen.samples_.reserve(data.length()); 00705 for (CORBA::ULong i = 0; i < data.length(); ++i) { 00706 gen.samples_.push_back(&data[i]); 00707 } 00708 return rc; 00709 00710 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
::DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 999 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::DDS_OPERATION_READ, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.
01011 { 01012 #ifdef OPENDDS_NO_QUERY_CONDITION 01013 ACE_UNUSED_ARG(ignored); 01014 #endif 01015 01016 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01017 01018 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01019 if (this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS 01020 && ! this->coherent_) { 01021 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 01022 } 01023 01024 bool group_coherent_ordered 01025 = this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS 01026 && this->subqos_.presentation.coherent_access 01027 && this->subqos_.presentation.ordered_access; 01028 01029 if (group_coherent_ordered && this->coherent_) { 01030 max_samples = 1; 01031 } 01032 #endif 01033 01034 ::OpenDDS::DCPS::RakeResults< MessageSequenceType > 01035 results(this, received_data, info_seq, max_samples, 01036 this->subqos_.presentation, 01037 #ifndef OPENDDS_NO_QUERY_CONDITION 01038 a_condition, 01039 #endif 01040 ::OpenDDS::DCPS::DDS_OPERATION_READ); 01041 01042 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01043 if (! group_coherent_ordered) { 01044 #endif 01045 for (typename InstanceMap::iterator it = instance_map_.begin(), 01046 the_end = instance_map_.end(); it != the_end; ++it) 01047 { 01048 ::DDS::InstanceHandle_t handle = it->second; 01049 01050 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(handle); 01051 01052 if ((inst->instance_state_.view_state() & view_states) && 01053 (inst->instance_state_.instance_state() & instance_states)) 01054 { 01055 size_t i(0); 01056 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_; 01057 item != 0; item = item->next_data_sample_) 01058 { 01059 if (item->sample_state_ & sample_states 01060 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01061 && !item->coherent_change_ 01062 #endif 01063 ) 01064 { 01065 results.insert_sample(item, inst, ++i); 01066 } 01067 } 01068 } 01069 } 01070 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01071 } 01072 else { 01073 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data(); 01074 results.insert_sample(item.rde_, item.si_, item.index_in_instance_); 01075 } 01076 #endif 01077 01078 results.copy_to_user(); 01079 01080 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA; 01081 if (received_data.length()) 01082 { 01083 ret = ::DDS::RETCODE_OK; 01084 if (received_data.maximum() == 0) //using ZeroCopy 01085 { 01086 received_data_p.set_loaner(this); 01087 } 01088 } 01089 01090 post_read_or_take(); 01091 01092 return ret; 01093 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 368 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00376 { 00377 ::DDS::ReturnCode_t const precond = 00378 check_inputs("read_instance", received_data, info_seq, max_samples); 00379 if (::DDS::RETCODE_OK != precond) 00380 { 00381 return precond; 00382 } 00383 00384 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00385 guard, 00386 this->sample_lock_, 00387 ::DDS::RETCODE_ERROR); 00388 return read_instance_i(received_data, info_seq, max_samples, a_handle, 00389 sample_states, view_states, instance_states, 0); 00390 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_generic | ( | void *& | data, | |
::DDS::SampleInfo & | info, | |||
::DDS::InstanceHandle_t | instance, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 743 of file DataReaderImpl_T.h.
References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.
00747 { 00748 MessageSequenceType dataseq; 00749 ::DDS::SampleInfoSeq infoseq; 00750 ::DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq, 00751 ::DDS::LENGTH_UNLIMITED, instance, sample_states, view_states, 00752 instance_states, 0); 00753 if (rc == ::DDS::RETCODE_NO_DATA) return rc; 00754 const CORBA::ULong last = dataseq.length() - 1; 00755 data = new MessageType(dataseq[last]); 00756 info = infoseq[last]; 00757 return rc; 00758 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
::DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1191 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::DDS_OPERATION_READ, OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::InstanceState::view_state().
01204 { 01205 #ifdef OPENDDS_NO_QUERY_CONDITION 01206 ACE_UNUSED_ARG(ignored); 01207 #endif 01208 01209 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01210 01211 ::OpenDDS::DCPS::RakeResults< MessageSequenceType > 01212 results(this, received_data, info_seq, max_samples, 01213 this->subqos_.presentation, 01214 #ifndef OPENDDS_NO_QUERY_CONDITION 01215 a_condition, 01216 #endif 01217 ::OpenDDS::DCPS::DDS_OPERATION_READ); 01218 01219 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(a_handle); 01220 if (inst == 0) return ::DDS::RETCODE_BAD_PARAMETER; 01221 01222 if ((inst->instance_state_.view_state() & view_states) && 01223 (inst->instance_state_.instance_state() & instance_states)) 01224 { 01225 size_t i(0); 01226 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_; 01227 item; item = item->next_data_sample_) 01228 { 01229 if (item->sample_state_ & sample_states 01230 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01231 && !item->coherent_change_ 01232 #endif 01233 ) 01234 { 01235 results.insert_sample(item, inst, ++i); 01236 } 01237 } 01238 } 01239 01240 results.copy_to_user(); 01241 01242 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA; 01243 if (received_data.length()) 01244 { 01245 ret = ::DDS::RETCODE_OK; 01246 if (received_data.maximum() == 0) //using ZeroCopy 01247 { 01248 received_data_p.set_loaner(this); 01249 } 01250 } 01251 01252 post_read_or_take(); 01253 return ret; 01254 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 416 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK.
00424 { 00425 ::DDS::ReturnCode_t const precond = 00426 check_inputs("read_next_instance", received_data, info_seq, max_samples); 00427 if (::DDS::RETCODE_OK != precond) 00428 { 00429 return precond; 00430 } 00431 00432 return read_next_instance_i(received_data, info_seq, max_samples, a_handle, 00433 sample_states, view_states, instance_states, 0); 00434 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_generic | ( | void *& | data, | |
::DDS::SampleInfo & | info, | |||
::DDS::InstanceHandle_t | previous_instance, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 760 of file DataReaderImpl_T.h.
References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.
00764 { 00765 MessageSequenceType dataseq; 00766 ::DDS::SampleInfoSeq infoseq; 00767 ::DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq, 00768 ::DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states, 00769 instance_states, 0); 00770 if (rc == ::DDS::RETCODE_NO_DATA) return rc; 00771 const CORBA::ULong last = dataseq.length() - 1; 00772 data = new MessageType(dataseq[last]); 00773 info = infoseq[last]; 00774 return rc; 00775 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_i | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
::DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1320 of file DataReaderImpl_T.h.
References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, and DDS::RETCODE_NO_DATA.
01333 { 01334 #ifdef OPENDDS_NO_QUERY_CONDITION 01335 ACE_UNUSED_ARG(ignored); 01336 #endif 01337 01338 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL); 01339 01340 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 01341 guard, 01342 this->sample_lock_, 01343 ::DDS::RETCODE_ERROR); 01344 01345 typename InstanceMap::iterator it; 01346 typename InstanceMap::iterator const the_end = instance_map_.end (); 01347 01348 if (a_handle == ::DDS::HANDLE_NIL) 01349 { 01350 it = instance_map_.begin (); 01351 } 01352 else 01353 { 01354 for (it = instance_map_.begin (); 01355 it != the_end; 01356 ++it) 01357 { 01358 if (a_handle == it->second) 01359 { 01360 ++it; 01361 break; 01362 } 01363 } 01364 } 01365 01366 for (; it != the_end; ++it) 01367 { 01368 handle = it->second; 01369 ::DDS::ReturnCode_t const status = 01370 read_instance_i(received_data, info_seq, max_samples, handle, 01371 sample_states, view_states, instance_states, 01372 #ifndef OPENDDS_NO_QUERY_CONDITION 01373 a_condition); 01374 #else 01375 0); 01376 #endif 01377 if (status != ::DDS::RETCODE_NO_DATA) 01378 { 01379 post_read_or_take(); 01380 return status; 01381 } 01382 } 01383 01384 post_read_or_take(); 01385 return ::DDS::RETCODE_NO_DATA; 01386 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_w_condition | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::ReadCondition_ptr | a_condition | |||
) | [inline] |
Definition at line 456 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00462 { 00463 ::DDS::ReturnCode_t const precond = 00464 check_inputs("read_next_instance_w_condition", received_data, info_seq, 00465 max_samples); 00466 if (::DDS::RETCODE_OK != precond) 00467 { 00468 return precond; 00469 } 00470 00471 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00472 ::DDS::RETCODE_ERROR); 00473 00474 if (!has_readcondition(a_condition)) 00475 { 00476 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 00477 } 00478 00479 #ifndef OPENDDS_NO_QUERY_CONDITION 00480 ::DDS::QueryCondition_ptr query_condition = 00481 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition); 00482 #endif 00483 00484 return read_next_instance_i(received_data, info_seq, max_samples, a_handle, 00485 a_condition->get_sample_state_mask(), 00486 a_condition->get_view_state_mask(), 00487 a_condition->get_instance_state_mask(), 00488 #ifndef OPENDDS_NO_QUERY_CONDITION 00489 query_condition 00490 #else 00491 0 00492 #endif 00493 ); 00494 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_sample | ( | MessageType & | received_data, | |
::DDS::SampleInfo & | sample_info | |||
) | [inline] |
Definition at line 194 of file DataReaderImpl_T.h.
References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and DDS::RETCODE_OK.
00197 { 00198 00199 bool found_data = false; 00200 00201 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00202 guard, 00203 this->sample_lock_, 00204 ::DDS::RETCODE_ERROR); 00205 00206 typename InstanceMap::iterator const the_end = instance_map_.end (); 00207 for (typename InstanceMap::iterator it = instance_map_.begin (); 00208 it != the_end; 00209 ++it) 00210 { 00211 ::DDS::InstanceHandle_t handle = it->second; 00212 OpenDDS::DCPS::SubscriptionInstance* ptr = get_handle_instance(handle); 00213 00214 bool mrg = false; //most_recent_generation 00215 00216 if ((ptr->instance_state_.view_state() & ::DDS::ANY_VIEW_STATE) && 00217 (ptr->instance_state_.instance_state() & ::DDS::ANY_INSTANCE_STATE)) 00218 { 00219 for (OpenDDS::DCPS::ReceivedDataElement* item = ptr->rcvd_samples_.head_; 00220 item != 0; 00221 item = item->next_data_sample_) 00222 { 00223 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00224 if (item->coherent_change_) continue; 00225 #endif 00226 00227 if (item->sample_state_ & ::DDS::NOT_READ_SAMPLE_STATE) 00228 { 00229 if (item->registered_data_ != 0) 00230 { 00231 received_data = 00232 *static_cast< MessageType *> (item->registered_data_); 00233 } 00234 ptr->instance_state_.sample_info(sample_info, item); 00235 00236 item->sample_state_ = ::DDS::READ_SAMPLE_STATE; 00237 00238 00239 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item); 00240 00241 found_data = true; 00242 } 00243 if (found_data) 00244 { 00245 break; 00246 } 00247 } 00248 } 00249 00250 if (found_data) 00251 { 00252 if (mrg) ptr->instance_state_.accessed(); 00253 00254 // Get the sample_ranks, generation_ranks, and 00255 // absolute_generation_ranks for this info_seq 00256 this->sample_info(sample_info, ptr->rcvd_samples_.tail_); 00257 00258 break; 00259 } 00260 } 00261 post_read_or_take(); 00262 return found_data ? ::DDS::RETCODE_OK : ::DDS::RETCODE_NO_DATA; 00263 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_w_condition | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | sample_info, | |||
::CORBA::Long | max_samples, | |||
::DDS::ReadCondition_ptr | a_condition | |||
) | [inline] |
Definition at line 129 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00134 { 00135 ::DDS::ReturnCode_t const precond = 00136 check_inputs("read_w_condition", received_data, sample_info, max_samples); 00137 if (::DDS::RETCODE_OK != precond) 00138 { 00139 return precond; 00140 } 00141 00142 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00143 ::DDS::RETCODE_ERROR); 00144 00145 if (!has_readcondition(a_condition)) 00146 { 00147 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 00148 } 00149 00150 return read_i(received_data, sample_info, max_samples, 00151 a_condition->get_sample_state_mask(), 00152 a_condition->get_view_state_mask(), 00153 a_condition->get_instance_state_mask(), 00154 #ifndef OPENDDS_NO_QUERY_CONDITION 00155 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition)); 00156 #else 00157 0); 00158 #endif 00159 }
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_instance_i | ( | ::DDS::InstanceHandle_t | handle | ) | [inline, protected, virtual] |
Definition at line 980 of file DataReaderImpl_T.h.
00981 { 00982 typename InstanceMap::iterator const the_end = instance_map_.end (); 00983 typename InstanceMap::iterator it = instance_map_.begin (); 00984 while (it != the_end) 00985 { 00986 if (it->second == handle) 00987 { 00988 typename InstanceMap::iterator curIt = it; 00989 ++ it; 00990 instance_map_.erase (curIt); 00991 } 00992 else 00993 ++ it; 00994 } 00995 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_loan | ( | MessageSequenceType & | received_data | ) | [inline] |
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::return_loan | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq | |||
) | [inline] |
Definition at line 536 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00539 { 00540 // Some incomplete tests to see that the data and info are from the 00541 // same read. 00542 if (received_data.length() != info_seq.length()) 00543 { 00544 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 00545 } 00546 00547 if (received_data.release()) 00548 { 00549 // nothing to do because this is not zero-copy data 00550 return ::DDS::RETCODE_OK; 00551 } 00552 else 00553 { 00554 info_seq.length(0); 00555 received_data.length(0); 00556 } 00557 return ::DDS::RETCODE_OK; 00558 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state | ( | ::DDS::InstanceHandle_t | instance, | |
::DDS::InstanceStateKind | state | |||
) | [inline] |
Definition at line 827 of file DataReaderImpl_T.h.
References DDS::ALIVE_INSTANCE_STATE, OpenDDS::DCPS::DISPOSE_INSTANCE, get_key_value(), header, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
00829 { 00830 using namespace OpenDDS::DCPS; 00831 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_); 00832 00833 SubscriptionInstance* si = get_handle_instance(instance); 00834 if (si && state != ::DDS::ALIVE_INSTANCE_STATE) { 00835 DataSampleHeader header; 00836 header.message_id_ = (state == ::DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) 00837 ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE; 00838 bool just_registered, filtered; 00839 MessageType* data; 00840 ACE_NEW_MALLOC_NORETURN(data, 00841 static_cast< MessageType*>(data_allocator_->malloc(sizeof(MessageType))), 00842 MessageType); 00843 get_key_value(*data, instance); 00844 store_instance_data(data, header, si, just_registered, filtered); 00845 notify_read_conditions(); 00846 } 00847 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data | ( | MessageType * | instance_data, | |
const OpenDDS::DCPS::DataSampleHeader & | header, | |||
OpenDDS::DCPS::SubscriptionInstance *& | instance_ptr, | |||
bool & | just_registered, | |||
bool & | filtered | |||
) | [inline, private] |
Definition at line 1454 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::ReceivedDataStrategy::add(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::ReceivedDataElement::coherent_change_, DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, OpenDDS::DCPS::InstanceState::data_was_received(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::InstanceState::dispose_was_received(), OpenDDS::DCPS::InstanceState::disposed_generation_count(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::HANDLE_NIL, OpenDDS::DCPS::ReceivedDataElementList::head_, header, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::keyFromSample(), OpenDDS::DCPS::SubscriptionInstance::last_sequence_, DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::SubscriberImpl::listener_for(), OpenDDS::DCPS::InstanceState::lively(), OpenDDS::DCPS::InstanceState::no_writers_generation_count(), OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, DDS::NOT_READ_SAMPLE_STATE, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, OpenDDS::DCPS::SubscriptionInstance::rcvd_strategy_, DDS::REJECTED_BY_INSTANCES_LIMIT, DDS::REJECTED_BY_SAMPLES_LIMIT, DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT, OpenDDS::DCPS::ReceivedDataElementList::remove(), OpenDDS::DCPS::SAMPLE_DATA, DDS::SAMPLE_LOST_STATUS, DDS::SAMPLE_REJECTED_STATUS, OpenDDS::DCPS::ReceivedDataElement::sample_state_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::ReceivedDataElementList::size_, OpenDDS::DCPS::UNREGISTER_INSTANCE, and OpenDDS::DCPS::InstanceState::unregister_was_received().
01460 { 01461 const bool is_dispose_msg = 01462 header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE || 01463 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE; 01464 const bool is_unregister_msg = 01465 header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE || 01466 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE; 01467 01468 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL); 01469 01470 //!!! caller should already have the sample_lock_ 01471 //We will unlock it before calling into listeners 01472 01473 typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data); 01474 01475 if ((is_dispose_msg || is_unregister_msg) && it == instance_map_.end()) 01476 { 01477 ACE_DES_FREE (instance_data, 01478 data_allocator_->free, 01479 MessageType ); 01480 instance_data = 0; 01481 return; 01482 } 01483 01484 01485 if (it == instance_map_.end()) 01486 { 01487 std::size_t instances_size = 0; 01488 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 01489 instances_size = instances_.size(); 01490 } 01491 if ((this->qos_.resource_limits.max_instances != ::DDS::LENGTH_UNLIMITED) && 01492 ((::CORBA::Long) instances_size >= this->qos_.resource_limits.max_instances)) 01493 { 01494 01495 ::DDS::DataReaderListener_var listener 01496 = listener_for (::DDS::SAMPLE_REJECTED_STATUS); 01497 01498 set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true); 01499 01500 sample_rejected_status_.last_reason = 01501 ::DDS::REJECTED_BY_INSTANCES_LIMIT; 01502 ++sample_rejected_status_.total_count; 01503 ++sample_rejected_status_.total_count_change; 01504 sample_rejected_status_.last_instance_handle = handle; 01505 01506 ::DDS::DataReader_var dr = get_dr_obj_ref(); 01507 if (!CORBA::is_nil(listener.in())) 01508 { 01509 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01510 01511 listener->on_sample_rejected(dr.in (), 01512 sample_rejected_status_); 01513 } // do we want to do something if listener is nil??? 01514 notify_status_condition_no_sample_lock(); 01515 01516 ACE_DES_FREE (instance_data, 01517 data_allocator_->free, 01518 MessageType ); 01519 01520 return; 01521 } 01522 01523 // first find the instance mapin the participant instance map. 01524 // if the instance map for the type is not registered, then 01525 // create the instance map. 01526 // if the instance map for the type exists, then find the 01527 // handle of the instance. If the instance is not registered 01528 // 01529 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01530 InstanceMap* inst = 0; 01531 bool new_handle = true; 01532 if (this->is_exclusive_ownership_) { 01533 if (this->owner_manager_->instance_lock_acquire () != 0) { 01534 ACE_ERROR ((LM_ERROR, 01535 ACE_TEXT("(%P|%t) ") 01536 ACE_TEXT("%CDataReaderImpl::") 01537 ACE_TEXT("store_instance_data, ") 01538 ACE_TEXT("acquire instance_lock failed. \n"), TraitsType::type_name())); 01539 return; 01540 } 01541 01542 inst = (InstanceMap*)( 01543 this->owner_manager_->get_instance_map(this->topic_servant_->type_name(), this)); 01544 if (inst != 0) { 01545 typename InstanceMap::const_iterator const iter = inst->find(*instance_data); 01546 if (iter != inst->end ()) { 01547 handle = iter->second; 01548 new_handle = false; 01549 } 01550 } 01551 } 01552 #endif 01553 01554 just_registered = true; 01555 OpenDDS::DCPS::SubscriptionInstance* instance = 0; 01556 ::DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(instance_data); 01557 handle = handle == ::DDS::HANDLE_NIL ? this->get_next_handle( key) : handle; 01558 ACE_NEW (instance, 01559 OpenDDS::DCPS::SubscriptionInstance(this, 01560 this->qos_, 01561 this->instances_lock_, 01562 handle)); 01563 01564 instance->instance_handle_ = handle; 01565 01566 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 01567 int ret = OpenDDS::DCPS::bind(instances_, handle, instance); 01568 01569 if (ret != 0) 01570 { 01571 ACE_ERROR ((LM_ERROR, 01572 ACE_TEXT("(%P|%t) ") 01573 ACE_TEXT("%CDataReaderImpl::") 01574 ACE_TEXT("store_instance_data, ") 01575 ACE_TEXT("insert handle failed. \n"), TraitsType::type_name())); 01576 return; 01577 } 01578 } 01579 01580 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 01581 if (this->is_exclusive_ownership_) { 01582 if (inst == 0) { 01583 inst = new InstanceMap (); 01584 this->owner_manager_->set_instance_map( 01585 this->topic_servant_->type_name(), reinterpret_cast <void* > (inst), this); 01586 } 01587 01588 if (new_handle) { 01589 std::pair<typename InstanceMap::iterator, bool> bpair = 01590 inst->insert(typename InstanceMap::value_type(*instance_data, 01591 handle)); 01592 if (bpair.second == false) 01593 { 01594 ACE_ERROR ((LM_ERROR, 01595 ACE_TEXT("(%P|%t) ") 01596 ACE_TEXT("%CDataReaderImpl::") 01597 ACE_TEXT("store_instance_data, ") 01598 ACE_TEXT("insert to participant scope %s failed. \n"), TraitsType::type_name(), TraitsType::type_name())); 01599 return; 01600 } 01601 } 01602 01603 if (this->owner_manager_->instance_lock_release () != 0) { 01604 ACE_ERROR ((LM_ERROR, 01605 ACE_TEXT("(%P|%t) ") 01606 ACE_TEXT("%CDataReaderImpl::") 01607 ACE_TEXT("store_instance_data, ") 01608 ACE_TEXT("release instance_lock failed. \n"), TraitsType::type_name())); 01609 return; 01610 } 01611 } 01612 #endif 01613 01614 std::pair<typename InstanceMap::iterator, bool> bpair = 01615 instance_map_.insert(typename InstanceMap::value_type(*instance_data, 01616 handle)); 01617 if (bpair.second == false) 01618 { 01619 ACE_ERROR ((LM_ERROR, 01620 ACE_TEXT("(%P|%t) ") 01621 ACE_TEXT("%CDataReaderImpl::") 01622 ACE_TEXT("store_instance_data, ") 01623 ACE_TEXT("insert %s failed. \n"), TraitsType::type_name(), TraitsType::type_name())); 01624 return; 01625 } 01626 } 01627 else 01628 { 01629 just_registered = false; 01630 handle = it->second; 01631 } 01632 01633 if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION) 01634 { 01635 instance_ptr = get_handle_instance(handle); 01636 01637 if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA) 01638 { 01639 // Check instance based QoS policy filters 01640 // (i.e. OWNERSHIP, TIME_BASED_FILTER) 01641 filtered = this->filter_instance(instance_ptr,header.publication_id_); 01642 01643 if (filtered) 01644 { 01645 ACE_DES_FREE (instance_data, 01646 data_allocator_->free, 01647 MessageType ); 01648 return; 01649 } 01650 } 01651 01652 if ((this->qos_.resource_limits.max_samples_per_instance != 01653 ::DDS::LENGTH_UNLIMITED) && 01654 (instance_ptr->rcvd_samples_.size_ >= 01655 this->qos_.resource_limits.max_samples_per_instance)) 01656 { 01657 01658 // According to spec 1.2, Samples that contain no data do not 01659 // count towards the limits imposed by the RESOURCE_LIMITS QoS policy 01660 // so do not remove the oldest sample when unregister/dispose 01661 // message arrives. 01662 01663 if (! is_dispose_msg && ! is_unregister_msg 01664 && instance_ptr->rcvd_samples_.head_->sample_state_ 01665 == ::DDS::NOT_READ_SAMPLE_STATE) 01666 { 01667 // for now the implemented QoS means that if the head sample 01668 // is NOT_READ then none are read. 01669 // TBD - in future we will reads may not read in order so 01670 // just looking at the head will not be enough. 01671 ::DDS::DataReaderListener_var listener 01672 = listener_for (::DDS::SAMPLE_REJECTED_STATUS); 01673 01674 set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true); 01675 01676 sample_rejected_status_.last_reason = 01677 ::DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT; 01678 ++sample_rejected_status_.total_count; 01679 ++sample_rejected_status_.total_count_change; 01680 sample_rejected_status_.last_instance_handle = handle; 01681 01682 ::DDS::DataReader_var dr = get_dr_obj_ref(); 01683 if (!CORBA::is_nil(listener.in())) 01684 { 01685 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01686 01687 listener->on_sample_rejected(dr.in (), 01688 sample_rejected_status_); 01689 } // do we want to do something if listener is nil??? 01690 notify_status_condition_no_sample_lock(); 01691 01692 ACE_DES_FREE (instance_data, 01693 data_allocator_->free, 01694 MessageType ); 01695 01696 return; 01697 } 01698 else if (! is_dispose_msg && ! is_unregister_msg) 01699 { 01700 // Discard the oldest previously-read sample 01701 OpenDDS::DCPS::ReceivedDataElement *item = 01702 instance_ptr->rcvd_samples_.head_; 01703 instance_ptr->rcvd_samples_.remove(item); 01704 dec_ref_data_element(item); 01705 } 01706 } 01707 else if (this->qos_.resource_limits.max_samples != ::DDS::LENGTH_UNLIMITED) 01708 { 01709 CORBA::Long total_samples = 0; 01710 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_); 01711 for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin(); 01712 iter != instances_.end(); 01713 ++iter) { 01714 OpenDDS::DCPS::SubscriptionInstance *ptr = iter->second; 01715 01716 total_samples += (CORBA::Long) ptr->rcvd_samples_.size_; 01717 } 01718 } 01719 01720 if(total_samples >= this->qos_.resource_limits.max_samples) 01721 { 01722 // According to spec 1.2, Samples that contain no data do not 01723 // count towards the limits imposed by the RESOURCE_LIMITS QoS policy 01724 // so do not remove the oldest sample when unregister/dispose 01725 // message arrives. 01726 01727 if (! is_dispose_msg && ! is_unregister_msg 01728 && instance_ptr->rcvd_samples_.head_->sample_state_ 01729 == ::DDS::NOT_READ_SAMPLE_STATE) 01730 { 01731 // for now the implemented QoS means that if the head sample 01732 // is NOT_READ then none are read. 01733 // TBD - in future we will reads may not read in order so 01734 // just looking at the head will not be enough. 01735 ::DDS::DataReaderListener_var listener 01736 = listener_for (::DDS::SAMPLE_REJECTED_STATUS); 01737 01738 set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true); 01739 01740 sample_rejected_status_.last_reason = 01741 ::DDS::REJECTED_BY_SAMPLES_LIMIT; 01742 ++sample_rejected_status_.total_count; 01743 ++sample_rejected_status_.total_count_change; 01744 sample_rejected_status_.last_instance_handle = handle; 01745 ::DDS::DataReader_var dr = get_dr_obj_ref(); 01746 if (!CORBA::is_nil(listener.in())) 01747 { 01748 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01749 01750 listener->on_sample_rejected(dr.in (), 01751 sample_rejected_status_); 01752 } // do we want to do something if listener is nil??? 01753 notify_status_condition_no_sample_lock(); 01754 01755 ACE_DES_FREE (instance_data, 01756 data_allocator_->free, 01757 MessageType ); 01758 01759 return; 01760 } 01761 else if (! is_dispose_msg && ! is_unregister_msg) 01762 { 01763 // Discard the oldest previously-read sample 01764 OpenDDS::DCPS::ReceivedDataElement *item = 01765 instance_ptr->rcvd_samples_.head_; 01766 instance_ptr->rcvd_samples_.remove(item); 01767 dec_ref_data_element(item); 01768 } 01769 } 01770 } 01771 01772 if (is_dispose_msg || is_unregister_msg) 01773 { 01774 ACE_DES_FREE (instance_data, 01775 data_allocator_->free, 01776 MessageType ); 01777 instance_data = 0; 01778 } 01779 01780 bool event_notify = false; 01781 01782 if (is_dispose_msg) { 01783 event_notify = instance_ptr->instance_state_.dispose_was_received(header.publication_id_); 01784 } 01785 01786 if (is_unregister_msg) { 01787 if (instance_ptr->instance_state_.unregister_was_received(header.publication_id_)) { 01788 event_notify = true; 01789 } 01790 } 01791 01792 if (!is_dispose_msg && !is_unregister_msg) { 01793 event_notify = true; 01794 instance_ptr->instance_state_.data_was_received(header.publication_id_); 01795 } 01796 01797 if (!event_notify) { 01798 return; 01799 } 01800 01801 OpenDDS::DCPS::ReceivedDataElement *ptr; 01802 ACE_NEW_MALLOC (ptr, 01803 static_cast<OpenDDS::DCPS::ReceivedDataElement *> ( 01804 rd_allocator_->malloc ( 01805 sizeof (OpenDDS::DCPS::ReceivedDataElement))), 01806 OpenDDS::DCPS::ReceivedDataElement(header, 01807 instance_data)); 01808 01809 ptr->disposed_generation_count_ = 01810 instance_ptr->instance_state_.disposed_generation_count(); 01811 ptr->no_writers_generation_count_ = 01812 instance_ptr->instance_state_.no_writers_generation_count(); 01813 01814 instance_ptr->last_sequence_ = header.sequence_; 01815 01816 instance_ptr->rcvd_strategy_->add(ptr); 01817 01818 if (! is_dispose_msg && ! is_unregister_msg 01819 && instance_ptr->rcvd_samples_.size_ > get_depth()) 01820 { 01821 OpenDDS::DCPS::ReceivedDataElement* head_ptr = 01822 instance_ptr->rcvd_samples_.head_; 01823 01824 instance_ptr->rcvd_samples_.remove(head_ptr); 01825 01826 if (head_ptr->sample_state_ == ::DDS::NOT_READ_SAMPLE_STATE) 01827 { 01828 ::DDS::DataReaderListener_var listener 01829 = listener_for (::DDS::SAMPLE_LOST_STATUS); 01830 01831 ++sample_lost_status_.total_count; 01832 ++sample_lost_status_.total_count_change; 01833 01834 set_status_changed_flag(::DDS::SAMPLE_LOST_STATUS, true); 01835 01836 ::DDS::DataReader_var dr = get_dr_obj_ref(); 01837 if (!CORBA::is_nil(listener.in())) 01838 { 01839 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01840 01841 listener->on_sample_lost(dr.in (), sample_lost_status_); 01842 } 01843 01844 notify_status_condition_no_sample_lock(); 01845 } 01846 01847 dec_ref_data_element(head_ptr); 01848 } 01849 01850 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01851 if (! ptr->coherent_change_) { 01852 #endif 01853 OpenDDS::DCPS::SubscriberImpl* sub = get_subscriber_servant (); 01854 01855 sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true); 01856 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true); 01857 01858 ::DDS::SubscriberListener_var sub_listener = 01859 sub->listener_for(::DDS::DATA_ON_READERS_STATUS); 01860 if (!CORBA::is_nil(sub_listener.in()) && !this->coherent_) 01861 { 01862 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01863 01864 sub_listener->on_data_on_readers(sub); 01865 sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false); 01866 } 01867 else 01868 { 01869 sub->notify_status_condition(); 01870 01871 ::DDS::DataReaderListener_var listener = 01872 listener_for (::DDS::DATA_AVAILABLE_STATUS); 01873 01874 ::DDS::DataReader_var dr = get_dr_obj_ref(); 01875 if (!CORBA::is_nil(listener.in())) 01876 { 01877 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_); 01878 01879 listener->on_data_available(dr.in ()); 01880 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false); 01881 sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false); 01882 } 01883 else 01884 { 01885 notify_status_condition_no_sample_lock(); 01886 } 01887 } 01888 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01889 } 01890 #endif 01891 } 01892 else 01893 { 01894 instance_ptr = this->get_handle_instance (handle); 01895 instance_ptr->instance_state_.lively(header.publication_id_); 01896 ACE_DES_FREE (instance_data, 01897 data_allocator_->free, 01898 MessageType ); 01899 } 01900 }
::DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data | ( | const MessageType & | sample, | |
::DDS::ViewStateKind | view | |||
) | [inline] |
Definition at line 779 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::InstanceState::accessed(), DDS::HANDLE_NIL, header, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::SubscriptionInstance::instance_state_, lookup_instance(), DDS::NOT_NEW_VIEW_STATE, and OpenDDS::DCPS::SAMPLE_DATA.
00781 { 00782 using namespace OpenDDS::DCPS; 00783 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, 00784 ::DDS::HANDLE_NIL); 00785 00786 #ifndef OPENDDS_NO_MULTI_TOPIC 00787 ::DDS::TopicDescription_var descr = get_topicdescription(); 00788 if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) { 00789 if (!mt->filter(sample)) { 00790 return ::DDS::HANDLE_NIL; 00791 } 00792 } 00793 #endif 00794 00795 get_subscriber_servant()->data_received(this); 00796 00797 ::DDS::InstanceHandle_t inst = lookup_instance(sample); 00798 bool filtered; 00799 SubscriptionInstance* instance = 0; 00800 00801 // Call store_instance_data() once or twice, depending on if we need to 00802 // process the INSTANCE_REGISTRATION. In either case, store_instance_data() 00803 // owns the memory for the sample and it must come from the correct allocator. 00804 for (int i = 0; i < 2; ++i) { 00805 if (i == 0 && inst != ::DDS::HANDLE_NIL) continue; 00806 00807 DataSampleHeader header; 00808 header.message_id_ = i ? SAMPLE_DATA : INSTANCE_REGISTRATION; 00809 bool just_registered; 00810 MessageType* data; 00811 ACE_NEW_MALLOC_NORETURN(data, 00812 static_cast< MessageType*>(data_allocator_->malloc(sizeof(MessageType))), 00813 MessageType(sample)); 00814 store_instance_data(data, header, instance, just_registered, filtered); 00815 if (instance) inst = instance->instance_handle_; 00816 } 00817 00818 if (!filtered) { 00819 if (view == ::DDS::NOT_NEW_VIEW_STATE) { 00820 instance->instance_state_.accessed(); 00821 } 00822 notify_read_conditions(); 00823 } 00824 return inst; 00825 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take | ( | OpenDDS::DCPS::AbstractSamples & | samples, | |
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 717 of file DataReaderImpl_T.h.
References DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::AbstractSamples::push_back(), OpenDDS::DCPS::AbstractSamples::reserve(), and DDS::RETCODE_ERROR.
00721 { 00722 00723 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00724 guard, 00725 this->sample_lock_, 00726 ::DDS::RETCODE_ERROR); 00727 00728 00729 MessageSequenceType data; 00730 ::DDS::SampleInfoSeq infos; 00731 ::DDS::ReturnCode_t rc = take_i(data, infos, ::DDS::LENGTH_UNLIMITED, 00732 sample_states, view_states, instance_states, 0); 00733 00734 samples.reserve(data.length()); 00735 00736 for (CORBA::ULong i = 0; i < data.length(); ++i) { 00737 samples.push_back(infos[i], &data[i]); 00738 } 00739 00740 return rc; 00741 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 105 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00112 { 00113 ::DDS::ReturnCode_t const precond = 00114 check_inputs("take", received_data, info_seq, max_samples); 00115 if (::DDS::RETCODE_OK != precond) 00116 { 00117 return precond; 00118 } 00119 00120 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00121 guard, 00122 this->sample_lock_, 00123 ::DDS::RETCODE_ERROR); 00124 00125 return take_i(received_data, info_seq, max_samples, sample_states, 00126 view_states, instance_states, 0); 00127 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
::DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1095 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::DDS_OPERATION_TAKE, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.
01107 { 01108 #ifdef OPENDDS_NO_QUERY_CONDITION 01109 ACE_UNUSED_ARG(ignored); 01110 #endif 01111 01112 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01113 01114 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01115 if (this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS 01116 && ! this->coherent_) { 01117 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 01118 } 01119 01120 bool group_coherent_ordered 01121 = this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS 01122 && this->subqos_.presentation.coherent_access 01123 && this->subqos_.presentation.ordered_access; 01124 01125 if (group_coherent_ordered && this->coherent_) { 01126 max_samples = 1; 01127 } 01128 #endif 01129 01130 ::OpenDDS::DCPS::RakeResults< MessageSequenceType > 01131 results(this, received_data, info_seq, max_samples, 01132 this->subqos_.presentation, 01133 #ifndef OPENDDS_NO_QUERY_CONDITION 01134 a_condition, 01135 #endif 01136 ::OpenDDS::DCPS::DDS_OPERATION_TAKE); 01137 01138 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01139 if (! group_coherent_ordered) { 01140 #endif 01141 01142 for (typename InstanceMap::iterator it = instance_map_.begin(), 01143 the_end = instance_map_.end(); it != the_end; ++it) 01144 { 01145 ::DDS::InstanceHandle_t handle = it->second; 01146 01147 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(handle); 01148 01149 if ((inst->instance_state_.view_state() & view_states) && 01150 (inst->instance_state_.instance_state() & instance_states)) 01151 { 01152 size_t i(0); 01153 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_; 01154 item != 0; item = item->next_data_sample_) 01155 { 01156 if (item->sample_state_ & sample_states 01157 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01158 && !item->coherent_change_ 01159 #endif 01160 ) 01161 { 01162 results.insert_sample(item, inst, ++i); 01163 } 01164 } 01165 } 01166 } 01167 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01168 } 01169 else { 01170 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data(); 01171 results.insert_sample(item.rde_, item.si_, item.index_in_instance_); 01172 } 01173 #endif 01174 01175 results.copy_to_user(); 01176 01177 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA; 01178 if (received_data.length()) 01179 { 01180 ret = ::DDS::RETCODE_OK; 01181 if (received_data.maximum() == 0) //using ZeroCopy 01182 { 01183 received_data_p.set_loaner(this); 01184 } 01185 } 01186 01187 post_read_or_take(); 01188 return ret; 01189 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 392 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00400 { 00401 ::DDS::ReturnCode_t const precond = 00402 check_inputs("take_instance", received_data, info_seq, max_samples); 00403 if (::DDS::RETCODE_OK != precond) 00404 { 00405 return precond; 00406 } 00407 00408 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00409 guard, 00410 this->sample_lock_, 00411 ::DDS::RETCODE_ERROR); 00412 return take_instance_i(received_data, info_seq, max_samples, a_handle, 00413 sample_states, view_states, instance_states, 0); 00414 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
::DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1256 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::InstanceState::view_state().
01269 { 01270 #ifdef OPENDDS_NO_QUERY_CONDITION 01271 ACE_UNUSED_ARG(ignored); 01272 #endif 01273 01274 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data); 01275 01276 ::OpenDDS::DCPS::RakeResults< MessageSequenceType > 01277 results(this, received_data, info_seq, max_samples, 01278 this->subqos_.presentation, 01279 #ifndef OPENDDS_NO_QUERY_CONDITION 01280 a_condition, 01281 #endif 01282 ::OpenDDS::DCPS::DDS_OPERATION_TAKE); 01283 01284 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(a_handle); 01285 01286 if ((inst->instance_state_.view_state() & view_states) && 01287 (inst->instance_state_.instance_state() & instance_states)) 01288 { 01289 size_t i(0); 01290 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_; 01291 item; item = item->next_data_sample_) 01292 { 01293 if (item->sample_state_ & sample_states 01294 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 01295 && !item->coherent_change_ 01296 #endif 01297 ) 01298 { 01299 results.insert_sample(item, inst, ++i); 01300 } 01301 } 01302 } 01303 01304 results.copy_to_user(); 01305 01306 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA; 01307 if (received_data.length()) 01308 { 01309 ret = ::DDS::RETCODE_OK; 01310 if (received_data.maximum() == 0) //using ZeroCopy 01311 { 01312 received_data_p.set_loaner(this); 01313 } 01314 } 01315 01316 post_read_or_take(); 01317 return ret; 01318 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states | |||
) | [inline] |
Definition at line 436 of file DataReaderImpl_T.h.
References DDS::RETCODE_OK.
00444 { 00445 ::DDS::ReturnCode_t const precond = 00446 check_inputs("take_next_instance", received_data, info_seq, max_samples); 00447 if (::DDS::RETCODE_OK != precond) 00448 { 00449 return precond; 00450 } 00451 00452 return take_next_instance_i(received_data, info_seq, max_samples, a_handle, 00453 sample_states, view_states, instance_states, 0); 00454 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_i | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::SampleStateMask | sample_states, | |||
::DDS::ViewStateMask | view_states, | |||
::DDS::InstanceStateMask | instance_states, | |||
::DDS::QueryCondition_ptr | a_condition | |||
) | [inline, private] |
Definition at line 1388 of file DataReaderImpl_T.h.
References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, and DDS::RETCODE_NO_DATA.
01401 { 01402 #ifdef OPENDDS_NO_QUERY_CONDITION 01403 ACE_UNUSED_ARG(ignored); 01404 #endif 01405 01406 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL); 01407 01408 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 01409 guard, 01410 this->sample_lock_, 01411 ::DDS::RETCODE_ERROR); 01412 01413 typename InstanceMap::iterator it; 01414 typename InstanceMap::iterator const the_end = instance_map_.end (); 01415 01416 if (a_handle == ::DDS::HANDLE_NIL) 01417 { 01418 it = instance_map_.begin (); 01419 } 01420 else 01421 { 01422 for (it = instance_map_.begin (); it != the_end; ++it) 01423 { 01424 if (a_handle == it->second) 01425 { 01426 ++it; 01427 break; 01428 } 01429 } 01430 } 01431 01432 for (; it != the_end; ++it) 01433 { 01434 handle = it->second; 01435 ::DDS::ReturnCode_t const status = 01436 take_instance_i(received_data, info_seq, max_samples, handle, 01437 sample_states, view_states, instance_states, 01438 #ifndef OPENDDS_NO_QUERY_CONDITION 01439 a_condition); 01440 #else 01441 0); 01442 #endif 01443 if (status != ::DDS::RETCODE_NO_DATA) 01444 { 01445 total_samples(); // see if we are empty 01446 post_read_or_take(); 01447 return status; 01448 } 01449 } 01450 post_read_or_take(); 01451 return ::DDS::RETCODE_NO_DATA; 01452 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_w_condition | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | info_seq, | |||
::CORBA::Long | max_samples, | |||
::DDS::InstanceHandle_t | a_handle, | |||
::DDS::ReadCondition_ptr | a_condition | |||
) | [inline] |
Definition at line 496 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00502 { 00503 ::DDS::ReturnCode_t const precond = 00504 check_inputs("take_next_instance_w_condition", received_data, info_seq, 00505 max_samples); 00506 if (::DDS::RETCODE_OK != precond) 00507 { 00508 return precond; 00509 } 00510 00511 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00512 ::DDS::RETCODE_ERROR); 00513 00514 if (!has_readcondition(a_condition)) 00515 { 00516 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 00517 } 00518 00519 #ifndef OPENDDS_NO_QUERY_CONDITION 00520 ::DDS::QueryCondition_ptr query_condition = 00521 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition); 00522 #endif 00523 00524 return take_next_instance_i(received_data, info_seq, max_samples, a_handle, 00525 a_condition->get_sample_state_mask(), 00526 a_condition->get_view_state_mask(), 00527 a_condition->get_instance_state_mask(), 00528 #ifndef OPENDDS_NO_QUERY_CONDITION 00529 query_condition 00530 #else 00531 0 00532 #endif 00533 ); 00534 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_sample | ( | MessageType & | received_data, | |
::DDS::SampleInfo & | sample_info | |||
) | [inline] |
Definition at line 265 of file DataReaderImpl_T.h.
References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::ReceivedDataElement::coherent_change_, OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::ReceivedDataElement::registered_data_, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::ReceivedDataElement::sample_state_.
00268 { 00269 bool found_data = false; 00270 00271 00272 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00273 guard, 00274 this->sample_lock_, 00275 ::DDS::RETCODE_ERROR); 00276 00277 typename InstanceMap::iterator const the_end = instance_map_.end (); 00278 for (typename InstanceMap::iterator it = instance_map_.begin (); 00279 it != the_end; 00280 ++it) 00281 { 00282 ::DDS::InstanceHandle_t handle = it->second; 00283 OpenDDS::DCPS::SubscriptionInstance* ptr = get_handle_instance(handle); 00284 00285 bool mrg = false; //most_recent_generation 00286 00287 OpenDDS::DCPS::ReceivedDataElement *tail = 0; 00288 if ((ptr->instance_state_.view_state() & ::DDS::ANY_VIEW_STATE) && 00289 (ptr->instance_state_.instance_state() & ::DDS::ANY_INSTANCE_STATE)) 00290 { 00291 00292 OpenDDS::DCPS::ReceivedDataElement *next; 00293 tail = 0; 00294 OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_; 00295 while (item) 00296 { 00297 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00298 if (item->coherent_change_) 00299 { 00300 item = item->next_data_sample_; 00301 continue; 00302 } 00303 #endif 00304 if (item->sample_state_ & ::DDS::NOT_READ_SAMPLE_STATE) 00305 { 00306 if (item->registered_data_ != 0) 00307 { 00308 received_data = 00309 *static_cast< MessageType *> (item->registered_data_); 00310 } 00311 ptr->instance_state_.sample_info(sample_info, item); 00312 00313 item->sample_state_ = ::DDS::READ_SAMPLE_STATE; 00314 00315 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item); 00316 00317 if (item == ptr->rcvd_samples_.tail_) 00318 { 00319 tail = ptr->rcvd_samples_.tail_; 00320 item = item->next_data_sample_; 00321 } 00322 else 00323 { 00324 next = item->next_data_sample_; 00325 00326 ptr->rcvd_samples_.remove(item); 00327 dec_ref_data_element(item); 00328 00329 item = next; 00330 } 00331 00332 found_data = true; 00333 } 00334 if (found_data) 00335 { 00336 break; 00337 } 00338 } 00339 } 00340 00341 if (found_data) 00342 { 00343 if (mrg) ptr->instance_state_.accessed(); 00344 00345 // 00346 // Get the sample_ranks, generation_ranks, and 00347 // absolute_generation_ranks for this info_seq 00348 // 00349 if (tail) 00350 { 00351 this->sample_info(sample_info, tail); 00352 00353 ptr->rcvd_samples_.remove(tail); 00354 dec_ref_data_element(tail); 00355 } 00356 else 00357 { 00358 this->sample_info(sample_info, ptr->rcvd_samples_.tail_); 00359 } 00360 00361 break; 00362 } 00363 } 00364 post_read_or_take(); 00365 return found_data ? ::DDS::RETCODE_OK : ::DDS::RETCODE_NO_DATA; 00366 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_w_condition | ( | MessageSequenceType & | received_data, | |
::DDS::SampleInfoSeq & | sample_info, | |||
::CORBA::Long | max_samples, | |||
::DDS::ReadCondition_ptr | a_condition | |||
) | [inline] |
Definition at line 161 of file DataReaderImpl_T.h.
References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00166 { 00167 ::DDS::ReturnCode_t const precond = 00168 check_inputs("take_w_condition", received_data, sample_info, max_samples); 00169 if (::DDS::RETCODE_OK != precond) 00170 { 00171 return precond; 00172 } 00173 00174 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 00175 ::DDS::RETCODE_ERROR); 00176 00177 if (!has_readcondition(a_condition)) 00178 { 00179 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 00180 } 00181 00182 return take_i(received_data, sample_info, max_samples, 00183 a_condition->get_sample_state_mask(), 00184 a_condition->get_view_state_mask(), 00185 a_condition->get_instance_state_mask(), 00186 #ifndef OPENDDS_NO_QUERY_CONDITION 00187 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition) 00188 #else 00189 0 00190 #endif 00191 ); 00192 }
DataAllocator* OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator_ [private] |
Definition at line 2015 of file DataReaderImpl_T.h.
InstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::instance_map_ [private] |
Definition at line 2014 of file DataReaderImpl_T.h.