OpenDDS::DCPS::DataReaderImpl_T< MessageType > Class Template Reference

#include <DataReaderImpl_T.h>

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:
Collaboration graph
[legend]

List of all members.

Classes

class  FilterDelayedHandler
struct  MessageTypeMemoryBlock
class  MessageTypeWithAllocator
class  SharedInstanceMap

Public Types

typedef DDSTraits< MessageType > TraitsType
typedef
TraitsType::MessageSequenceType 
MessageSequenceType
typedef RcHandle
< SharedInstanceMap
SharedInstanceMap_rch
typedef
OpenDDS::DCPS::Cached_Allocator_With_Overflow
< MessageTypeMemoryBlock,
ACE_Null_Mutex
DataAllocator
typedef TraitsType::DataReaderType Interface

Public Member Functions

typedef OPENDDS_MAP_CMP_T (MessageType, DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap
 DataReaderImpl_T (void)
virtual ~DataReaderImpl_T (void)
virtual DDS::ReturnCode_t enable_specific ()
virtual DDS::ReturnCode_t read (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t take (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t read_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info,::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t take_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info,::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t read_next_sample (MessageType &received_data, DDS::SampleInfo &sample_info)
virtual DDS::ReturnCode_t take_next_sample (MessageType &received_data, DDS::SampleInfo &sample_info)
virtual DDS::ReturnCode_t read_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t take_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t read_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t take_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t read_next_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t take_next_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t read_next_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t take_next_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t return_loan (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq)
virtual DDS::ReturnCode_t get_key_value (MessageType &key_holder, DDS::InstanceHandle_t handle)
virtual DDS::InstanceHandle_t lookup_instance (const MessageType &instance_data)
virtual DDS::ReturnCode_t auto_return_loan (void *seq)
void release_loan (MessageSequenceType &received_data)
bool contains_sample_filtered (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const OpenDDS::DCPS::FilterEvaluator &evaluator, const DDS::StringSeq &params)
DDS::ReturnCode_t read_generic (OpenDDS::DCPS::DataReaderImpl::GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count=false)
DDS::InstanceHandle_t lookup_instance_generic (const void *data)
virtual DDS::ReturnCode_t take (OpenDDS::DCPS::AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_next_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::InstanceHandle_t store_synthetic_data (const MessageType &sample, DDS::ViewStateKind view)
void set_instance_state (DDS::InstanceHandle_t instance, DDS::InstanceStateKind state)
virtual void lookup_instance (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
virtual void qos_change (const DDS::DataReaderQos &qos)

Protected Member Functions

virtual void dds_demarshal (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type)
virtual void dispose_unregister (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
virtual void purge_data (OpenDDS::DCPS::SubscriptionInstance_rch instance)
virtual void release_instance_i (DDS::InstanceHandle_t handle)

Private Member Functions

DDS::ReturnCode_t read_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDS::ReturnCode_t take_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDS::ReturnCode_t read_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDS::ReturnCode_t take_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDS::ReturnCode_t read_next_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDS::ReturnCode_t take_next_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
void store_instance_data (unique_ptr< MessageTypeWithAllocator > instance_data, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
void finish_store_instance_data (unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
void notify_status_condition_no_sample_lock ()
DDS::ReturnCode_t check_inputs (const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples)
 Common input read* & take* input processing and precondition checks.
unique_ptr< DataAllocator > & data_allocator ()

Private Attributes

RcHandle< FilterDelayedHandlerfilter_delayed_handler_
InstanceMap instance_map_

Detailed Description

template<typename MessageType>
class OpenDDS::DCPS::DataReaderImpl_T< MessageType >

Servant for DataReader interface of Traits::MessageType data type.

See the DDS specification, OMG formal/04-12-02, for a description of this interface.

Definition at line 28 of file DataReaderImpl_T.h.


Member Typedef Documentation

Definition at line 72 of file DataReaderImpl_T.h.

template<typename MessageType>
typedef TraitsType::DataReaderType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::Interface

Definition at line 74 of file DataReaderImpl_T.h.

template<typename MessageType>
typedef TraitsType::MessageSequenceType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MessageSequenceType

Definition at line 38 of file DataReaderImpl_T.h.

template<typename MessageType>
typedef RcHandle<SharedInstanceMap> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::SharedInstanceMap_rch

Definition at line 49 of file DataReaderImpl_T.h.

template<typename MessageType>
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::TraitsType

Definition at line 37 of file DataReaderImpl_T.h.


Constructor & Destructor Documentation

template<typename MessageType>
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataReaderImpl_T ( void   )  [inline]

Definition at line 76 of file DataReaderImpl_T.h.

00077     : filter_delayed_handler_(make_rch<FilterDelayedHandler>(ref(*this)))
00078     {
00079     }

template<typename MessageType>
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::~DataReaderImpl_T ( void   )  [inline, virtual]

Definition at line 81 of file DataReaderImpl_T.h.

00082     {
00083       for (typename InstanceMap::iterator it = instance_map_.begin();
00084            it != instance_map_.end(); ++it)
00085         {
00086           OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(it->second);
00087           this->purge_data(ptr);
00088         }
00089       //X SHH release the data samples in the instance_map_.
00090     }


Member Function Documentation

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::auto_return_loan ( void *  seq  )  [inline, virtual]

Definition at line 709 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK.

00710   {
00711     MessageSequenceType& received_data =
00712       *static_cast< MessageSequenceType*> (seq);
00713 
00714     if (!received_data.release())
00715       {
00716         // this->release_loan(received_data);
00717         received_data.length(0);
00718       }
00719     return DDS::RETCODE_OK;
00720   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::check_inputs ( const char *  method_name,
MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples 
) [inline, private]

Common input read* & take* input processing and precondition checks.

Definition at line 2067 of file DataReaderImpl_T.h.

References ACE_TEXT(), DDS::LENGTH_UNLIMITED, LM_DEBUG, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

02072 {
02073   typename MessageSequenceType::PrivateMemberAccess received_data_p (received_data);
02074 
02075   // ---- start of preconditions common to read and take -----
02076   // SPEC ref v1.2 7.1.2.5.3.8 #1
02077   // NOTE: We can't check maximum() or release() here since those are
02078   //       implementation details of the sequences.  In general, the
02079   //       info_seq will have release() == true and maximum() == 0.
02080   //       If we're in zero-copy mode, the received_data will have
02081   //       release() == false and maximum() == 0.  If it's not
02082   //       zero-copy then received_data will have release == true()
02083   //       and maximum() == anything.
02084   if (received_data.length() != info_seq.length())
02085     {
02086       ACE_DEBUG((LM_DEBUG,
02087                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02088                  ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
02089                  ACE_TEXT("sequences do not match.\n"),
02090                  TraitsType::type_name(),
02091                  method_name ));
02092       return DDS::RETCODE_PRECONDITION_NOT_MET;
02093     }
02094 
02095   //SPEC ref v1.2 7.1.2.5.3.8 #4
02096   if ((received_data.maximum() > 0) && (received_data.release() == false))
02097     {
02098       ACE_DEBUG((LM_DEBUG,
02099                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02100                  ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
02101                  ACE_TEXT("maximum %d and owns %d\n"),
02102                  TraitsType::type_name(),
02103                  method_name,
02104                  received_data.maximum(),
02105                  received_data.release() ));
02106 
02107       return DDS::RETCODE_PRECONDITION_NOT_MET;
02108     }
02109 
02110   if (received_data.maximum() == 0)
02111     {
02112       // not in SPEC but needed.
02113       if (max_samples == DDS::LENGTH_UNLIMITED)
02114         {
02115           max_samples =
02116             static_cast< ::CORBA::Long> (received_data_p.max_slots());
02117         }
02118     }
02119   else
02120     {
02121       if (max_samples == DDS::LENGTH_UNLIMITED)
02122         {
02123           //SPEC ref v1.2 7.1.2.5.3.8 #5a
02124           max_samples = received_data.maximum();
02125         }
02126       else if (
02127                max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
02128         {
02129           //SPEC ref v1.2 7.1.2.5.3.8 #5c
02130           ACE_DEBUG((LM_DEBUG,
02131                      ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02132                      ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
02133                      TraitsType::type_name(),
02134                      method_name,
02135                      max_samples,
02136                      received_data.maximum()));
02137           return DDS::RETCODE_PRECONDITION_NOT_MET;
02138         }
02139       //else
02140       //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
02141     }
02142 
02143   // The spec does not say what to do in this case but it appears to be a good thing.
02144   // Note: max_slots is the greater of the sequence's maximum and init_size.
02145   if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
02146     {
02147       max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
02148     }
02149   //---- end of preconditions common to read and take -----
02150 
02151   return DDS::RETCODE_OK;
02152 }

Here is the call graph for this function:

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::contains_sample_filtered ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const OpenDDS::DCPS::FilterEvaluator evaluator,
const DDS::StringSeq params 
) [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 728 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::FilterEvaluator::eval(), OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, item(), OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, and OpenDDS::DCPS::InstanceState::view_state().

00733   {
00734     using namespace OpenDDS::DCPS;
00735     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
00736     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
00737 
00738     for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
00739            end = instances_.end(); iter != end; ++iter) {
00740       SubscriptionInstance& inst = *iter->second;
00741 
00742       if ((inst.instance_state_.view_state() & view_states) &&
00743           (inst.instance_state_.instance_state() & instance_states)) {
00744         for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
00745              item = item->next_data_sample_) {
00746           if (item->sample_state_ & sample_states
00747 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00748               && !item->coherent_change_
00749 #endif
00750               && item->registered_data_) {
00751             if (evaluator.eval(*static_cast< MessageType* >(item->registered_data_), params)) {
00752               return true;
00753             }
00754           }
00755         }
00756       }
00757     }
00758 
00759     return false;
00760   }

Here is the call graph for this function:

template<typename MessageType>
unique_ptr<DataAllocator>& OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator (  )  [inline, private]

Definition at line 2381 of file DataReaderImpl_T.h.

02381 { return filter_delayed_handler_->data_allocator_; }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance_rch instance,
bool &  just_registered,
bool &  filtered,
OpenDDS::DCPS::MarshalingType  marshaling_type 
) [inline, protected, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1010 of file DataReaderImpl_T.h.

References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::content_filter_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_ERROR, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, and OpenDDS::DCPS::Serializer::use_rti_serialization().

01015   {
01016     unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
01017     const bool cdr = sample.header_.cdr_encapsulation_;
01018 
01019     OpenDDS::DCPS::Serializer ser(
01020                                   sample.sample_.get(),
01021                                   sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
01022                                   cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR : OpenDDS::DCPS::Serializer::ALIGN_NONE);
01023 
01024     if (cdr) {
01025       ACE_CDR::ULong header;
01026       if (!(ser >> header)) {
01027         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
01028                   ACE_TEXT("deserialization header failed, dropping sample.\n"),
01029                   TraitsType::type_name()));
01030         return;
01031       }
01032 
01033       if (Serializer::use_rti_serialization()) {
01034         // Start counting byte-offset AFTER header
01035         ser.reset_alignment();
01036       }
01037     }
01038 
01039     if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
01040       ser >> OpenDDS::DCPS::KeyOnly< MessageType>(*data);
01041     } else {
01042       ser >> *data;
01043     }
01044 
01045     if (!ser.good_bit()) {
01046       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
01047                  ACE_TEXT("deserialization failed, dropping sample.\n"),
01048                  TraitsType::type_name()));
01049       return;
01050     }
01051 
01052 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01053     if (!sample.header_.content_filter_) { // if this is true, the writer has already filtered
01054       using OpenDDS::DCPS::ContentFilteredTopicImpl;
01055       if (content_filtered_topic_) {
01056         if (sample.header_.message_id_ == OpenDDS::DCPS::SAMPLE_DATA
01057             && !content_filtered_topic_->filter(static_cast<MessageType&>(*data))) {
01058           filtered = true;
01059           return;
01060         }
01061       }
01062     }
01063 #endif
01064 
01065     store_instance_data(move(data), sample.header_, instance, just_registered, filtered);
01066   }

Here is the call graph for this function:

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance_rch instance 
) [inline, protected, virtual]

!! caller should already have the sample_lock_

Reimplemented from OpenDDS::DCPS::DataReaderImpl.

Definition at line 1068 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, and OpenDDS::DCPS::KEY_ONLY_MARSHALING.

01070   {
01071     //!!! caller should already have the sample_lock_
01072 
01073     // The data sample in this dispose message does not contain any valid data.
01074     // What it needs here is the key value to identify the instance to dispose.
01075     // The demarshal push this "sample" to received sample list so the user
01076     // can be notified the dispose event.
01077     bool just_registered = false;
01078     bool filtered = false;
01079     OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING;
01080     if (sample.header_.key_fields_only_) {
01081       marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING;
01082     }
01083     this->dds_demarshal(sample, instance, just_registered, filtered, marshaling);
01084   }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::enable_specific (  )  [inline, virtual]

Do parts of enable specific to the datatype. Called by DataReaderImpl::enable().

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 96 of file DataReaderImpl_T.h.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and DDS::RETCODE_OK.

00097     {
00098       data_allocator().reset(new DataAllocator(get_n_chunks ()));
00099       if (OpenDDS::DCPS::DCPS_debug_level >= 2)
00100         ACE_DEBUG((LM_DEBUG,
00101                    ACE_TEXT("(%P|%t) %CDataReaderImpl::")
00102                    ACE_TEXT("enable_specific-data")
00103                    ACE_TEXT(" Cached_Allocator_With_Overflow ")
00104                    ACE_TEXT("%x with %d chunks\n"),
00105                    TraitsType::type_name(),
00106                    data_allocator().get(),
00107                    this->get_n_chunks ()));
00108 
00109       return DDS::RETCODE_OK;
00110     }

Here is the call graph for this function:

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::finish_store_instance_data ( unique_ptr< MessageTypeWithAllocator instance_data,
const DataSampleHeader header,
SubscriptionInstance_rch  instance_ptr,
bool  is_dispose_msg,
bool  is_unregister_msg 
) [inline, private]

Definition at line 1815 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::ReceivedDataElement::coherent_change_, DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, OpenDDS::DCPS::ReceivedDataElement::dec_ref(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, OpenDDS::DCPS::RcHandle< T >::in(), CORBA::is_nil(), item(), DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, DDS::NOT_READ_SAMPLE_STATE, OpenDDS::DCPS::DataSampleHeader::publication_id_, DDS::REJECTED_BY_SAMPLES_LIMIT, DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::SAMPLE_LOST_STATUS, DDS::SAMPLE_REJECTED_STATUS, OpenDDS::DCPS::ReceivedDataElement::sample_state_, and OpenDDS::DCPS::DataSampleHeader::sequence_.

01817 {
01818   if ((this->qos_.resource_limits.max_samples_per_instance !=
01819         DDS::LENGTH_UNLIMITED) &&
01820       (instance_ptr->rcvd_samples_.size_ >=
01821         this->qos_.resource_limits.max_samples_per_instance)) {
01822 
01823     // According to spec 1.2, Samples that contain no data do not
01824     // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
01825     // so do not remove the oldest sample when unregister/dispose
01826     // message arrives.
01827 
01828     if (!is_dispose_msg && !is_unregister_msg
01829       && instance_ptr->rcvd_samples_.head_->sample_state_
01830       == DDS::NOT_READ_SAMPLE_STATE)
01831     {
01832       // for now the implemented QoS means that if the head sample
01833       // is NOT_READ then none are read.
01834       // TBD - in future we will reads may not read in order so
01835       //       just looking at the head will not be enough.
01836       DDS::DataReaderListener_var listener
01837         = listener_for(DDS::SAMPLE_REJECTED_STATUS);
01838 
01839       set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
01840 
01841       sample_rejected_status_.last_reason =
01842         DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT;
01843       ++sample_rejected_status_.total_count;
01844       ++sample_rejected_status_.total_count_change;
01845       sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
01846 
01847       if (!CORBA::is_nil(listener.in()))
01848       {
01849         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01850 
01851         listener->on_sample_rejected(this, sample_rejected_status_);
01852         sample_rejected_status_.total_count_change = 0;
01853       }  // do we want to do something if listener is nil???
01854       notify_status_condition_no_sample_lock();
01855       return;
01856     }
01857     else if (!is_dispose_msg && !is_unregister_msg)
01858     {
01859       // Discard the oldest previously-read sample
01860       OpenDDS::DCPS::ReceivedDataElement *item =
01861         instance_ptr->rcvd_samples_.head_;
01862       instance_ptr->rcvd_samples_.remove(item);
01863       item->dec_ref();
01864     }
01865   }
01866   else if (this->qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED)
01867   {
01868     CORBA::Long total_samples = 0;
01869     {
01870       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01871       for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
01872         iter != instances_.end();
01873         ++iter) {
01874         OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
01875 
01876         total_samples += (CORBA::Long) ptr->rcvd_samples_.size_;
01877       }
01878     }
01879 
01880     if (total_samples >= this->qos_.resource_limits.max_samples)
01881     {
01882       // According to spec 1.2, Samples that contain no data do not
01883       // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
01884       // so do not remove the oldest sample when unregister/dispose
01885       // message arrives.
01886 
01887       if (!is_dispose_msg && !is_unregister_msg
01888         && instance_ptr->rcvd_samples_.head_->sample_state_
01889         == DDS::NOT_READ_SAMPLE_STATE)
01890       {
01891         // for now the implemented QoS means that if the head sample
01892         // is NOT_READ then none are read.
01893         // TBD - in future we will reads may not read in order so
01894         //       just looking at the head will not be enough.
01895         DDS::DataReaderListener_var listener
01896           = listener_for(DDS::SAMPLE_REJECTED_STATUS);
01897 
01898         set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
01899 
01900         sample_rejected_status_.last_reason =
01901           DDS::REJECTED_BY_SAMPLES_LIMIT;
01902         ++sample_rejected_status_.total_count;
01903         ++sample_rejected_status_.total_count_change;
01904         sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
01905         if (!CORBA::is_nil(listener.in()))
01906         {
01907           ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01908 
01909           listener->on_sample_rejected(this, sample_rejected_status_);
01910           sample_rejected_status_.total_count_change = 0;
01911         }  // do we want to do something if listener is nil???
01912         notify_status_condition_no_sample_lock();
01913 
01914         return;
01915       }
01916       else if (!is_dispose_msg && !is_unregister_msg)
01917       {
01918         // Discard the oldest previously-read sample
01919         OpenDDS::DCPS::ReceivedDataElement *item =
01920           instance_ptr->rcvd_samples_.head_;
01921         instance_ptr->rcvd_samples_.remove(item);
01922         item->dec_ref();
01923       }
01924     }
01925   }
01926 
01927   if (is_dispose_msg || is_unregister_msg)
01928   {
01929     instance_data.reset();
01930   }
01931 
01932   bool event_notify = false;
01933 
01934   if (is_dispose_msg) {
01935     event_notify = instance_ptr->instance_state_.dispose_was_received(header.publication_id_);
01936   }
01937 
01938   if (is_unregister_msg) {
01939     if (instance_ptr->instance_state_.unregister_was_received(header.publication_id_)) {
01940       event_notify = true;
01941     }
01942   }
01943 
01944   if (!is_dispose_msg && !is_unregister_msg) {
01945     event_notify = true;
01946     instance_ptr->instance_state_.data_was_received(header.publication_id_);
01947   }
01948 
01949   if (!event_notify) {
01950     return;
01951   }
01952 
01953   OpenDDS::DCPS::ReceivedDataElement *ptr =
01954     new (*rd_allocator_.get()) OpenDDS::DCPS::ReceivedDataElementWithType<MessageTypeWithAllocator>(header,instance_data.release(), &this->sample_lock_);
01955 
01956   ptr->disposed_generation_count_ =
01957     instance_ptr->instance_state_.disposed_generation_count();
01958   ptr->no_writers_generation_count_ =
01959     instance_ptr->instance_state_.no_writers_generation_count();
01960 
01961   instance_ptr->last_sequence_ = header.sequence_;
01962 
01963   instance_ptr->rcvd_strategy_->add(ptr);
01964 
01965   if (! is_dispose_msg  && ! is_unregister_msg
01966       && instance_ptr->rcvd_samples_.size_ > get_depth())
01967     {
01968       OpenDDS::DCPS::ReceivedDataElement* head_ptr =
01969         instance_ptr->rcvd_samples_.head_;
01970 
01971       instance_ptr->rcvd_samples_.remove(head_ptr);
01972 
01973       if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
01974         {
01975           DDS::DataReaderListener_var listener
01976             = listener_for (DDS::SAMPLE_LOST_STATUS);
01977 
01978           ++sample_lost_status_.total_count;
01979           ++sample_lost_status_.total_count_change;
01980 
01981           set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, true);
01982 
01983           if (!CORBA::is_nil(listener.in()))
01984             {
01985               ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01986 
01987               listener->on_sample_lost(this, sample_lost_status_);
01988 
01989               sample_lost_status_.total_count_change = 0;
01990             }
01991 
01992           notify_status_condition_no_sample_lock();
01993         }
01994 
01995       head_ptr->dec_ref();
01996     }
01997 
01998 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01999   if (! ptr->coherent_change_) {
02000 #endif
02001     RcHandle<OpenDDS::DCPS::SubscriberImpl> sub = get_subscriber_servant ();
02002     if (!sub)
02003       return;
02004 
02005     sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, true);
02006 
02007     set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, true);
02008 
02009     DDS::SubscriberListener_var sub_listener =
02010         sub->listener_for(DDS::DATA_ON_READERS_STATUS);
02011     if (!CORBA::is_nil(sub_listener.in()) && !this->coherent_)
02012       {
02013         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02014 
02015         sub_listener->on_data_on_readers(sub.in());
02016         sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
02017       }
02018     else
02019       {
02020         sub->notify_status_condition();
02021 
02022         DDS::DataReaderListener_var listener =
02023             listener_for (DDS::DATA_AVAILABLE_STATUS);
02024 
02025         if (!CORBA::is_nil(listener.in()))
02026           {
02027             ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02028 
02029             listener->on_data_available(this);
02030             set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02031             sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
02032           }
02033         else
02034           {
02035             notify_status_condition_no_sample_lock();
02036           }
02037       }
02038 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02039   }
02040 #endif
02041 }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_key_value ( MessageType &  key_holder,
DDS::InstanceHandle_t  handle 
) [inline, virtual]

Definition at line 671 of file DataReaderImpl_T.h.

References DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00674   {
00675     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00676                       guard,
00677                       this->sample_lock_,
00678                       DDS::RETCODE_ERROR);
00679 
00680     typename InstanceMap::iterator const the_end = instance_map_.end ();
00681     for (typename InstanceMap::iterator it = instance_map_.begin ();
00682          it != the_end;
00683          ++it)
00684       {
00685         if (it->second == handle)
00686           {
00687             key_holder = it->first;
00688             return DDS::RETCODE_OK;
00689           }
00690       }
00691 
00692     return DDS::RETCODE_BAD_PARAMETER;
00693   }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance_rch instance 
) [inline, virtual]

!! caller should already have the sample_lock_

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 930 of file DataReaderImpl_T.h.

References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DDS::HANDLE_NIL, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, LM_ERROR, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::ReceivedDataSample::sample_, and OpenDDS::DCPS::Serializer::use_rti_serialization().

00932   {
00933     //!!! caller should already have the sample_lock_
00934 
00935     MessageType data;
00936 
00937     const bool cdr = sample.header_.cdr_encapsulation_;
00938 
00939     OpenDDS::DCPS::Serializer ser(
00940       sample.sample_.get(),
00941       sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
00942       cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00943           : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00944 
00945     if (cdr) {
00946       ACE_CDR::ULong header;
00947       if (!(ser >> header)) {
00948         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
00949                   ACE_TEXT("deserialization header failed.\n"),
00950                   TraitsType::type_name()));
00951         return;
00952       }
00953 
00954       if (Serializer::use_rti_serialization()) {
00955         // Start counting byte-offset AFTER header
00956         ser.reset_alignment();
00957       }
00958     }
00959 
00960     if (sample.header_.key_fields_only_) {
00961       ser >> OpenDDS::DCPS::KeyOnly< MessageType>(data);
00962     } else {
00963       ser >> data;
00964     }
00965 
00966     if (!ser.good_bit()) {
00967       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
00968                  ACE_TEXT("deserialization failed.\n"),
00969                  TraitsType::type_name()));
00970       return;
00971     }
00972 
00973     DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
00974     typename InstanceMap::const_iterator const it = instance_map_.find(data);
00975     if (it != instance_map_.end()) {
00976       handle = it->second;
00977     }
00978 
00979     if (handle == DDS::HANDLE_NIL) {
00980       instance.reset();
00981     } else {
00982       instance = get_handle_instance(handle);
00983     }
00984   }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const MessageType &  instance_data  )  [inline, virtual]

Definition at line 695 of file DataReaderImpl_T.h.

References DDS::HANDLE_NIL.

00696   {
00697     typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00698 
00699     if (it == instance_map_.end())
00700       {
00701         return DDS::HANDLE_NIL;
00702       }
00703     else
00704       {
00705         return it->second;
00706       }
00707   }

template<typename MessageType>
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance_generic ( const void *  data  )  [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 791 of file DataReaderImpl_T.h.

References lookup_instance().

00792   {
00793     return lookup_instance(*static_cast<const MessageType*>(data));
00794   }

Here is the call graph for this function:

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::notify_status_condition_no_sample_lock (  )  [inline, private]

Release sample_lock_ during status notifications in store_instance_data() as the lock is not needed and could cause deadlock condition. See comments in member function implementation for details.

Definition at line 2046 of file DataReaderImpl_T.h.

02047 {
02048   // This member function avoids a deadlock condition which otherwise
02049   // could occur as follows:
02050   // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
02051   // eventually DataReaderImpl::sample_lock_ to lock in call to
02052   // DataReaderImpl::contains_samples().
02053   // Thread2: Call to DataReaderImpl::data_received()
02054   // causes DataReaderImpl::sample_lock_ to lock and eventually
02055   // during notify of status condition a call to WaitSet::signal()
02056   // causes WaitSet::lock_ to lock.
02057   // Because the DataReaderImpl::sample_lock_ is not needed during
02058   // status notification this member function is used in
02059   // store_instance_data() to release sample_lock_ before making
02060   // the notification.
02061   ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02062   notify_status_condition();
02063 }

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP_CMP_T ( MessageType  ,
DDS::InstanceHandle_t  ,
typename TraitsType::LessThanType   
)
template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::purge_data ( OpenDDS::DCPS::SubscriptionInstance_rch  instance  )  [inline, protected, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1086 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::ReceivedDataElement::dec_ref().

01087   {
01088     filter_delayed_handler_->drop_sample(instance->instance_handle_);
01089 
01090 
01091     instance->instance_state_.cancel_release();
01092 
01093     while (instance->rcvd_samples_.size_ > 0)
01094       {
01095         OpenDDS::DCPS::ReceivedDataElement* head =
01096           instance->rcvd_samples_.remove_head();
01097         head->dec_ref();
01098       }
01099   }

Here is the call graph for this function:

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::qos_change ( const DDS::DataReaderQos qos  )  [inline, virtual]

Reimplemented from OpenDDS::DCPS::DataReaderImpl.

Definition at line 986 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::duration_to_time_value(), DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, OpenDDS::DCPS::DataReaderImpl::qos_change(), DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, and DDS::DataReaderQos::time_based_filter.

00987   {
00988     // reliability is not changeable, just time_based_filter
00989     if (qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00990       if (qos.time_based_filter.minimum_separation != qos_.time_based_filter.minimum_separation) {
00991         const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
00992         if (qos_.time_based_filter.minimum_separation != zero) {
00993           if (qos.time_based_filter.minimum_separation != zero) {
00994             const ACE_Time_Value new_interval = duration_to_time_value(qos.time_based_filter.minimum_separation);
00995             filter_delayed_handler_->reset_interval(new_interval);
00996           } else {
00997             filter_delayed_handler_->cancel();
00998           }
00999         }
01000         // else no existing timers to change/cancel
01001       }
01002       // else no qos change so nothing to change
01003     }
01004 
01005     DataReaderImpl::qos_change(qos);
01006   }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Definition at line 112 of file DataReaderImpl_T.h.

References read(), DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00119     {
00120       DDS::ReturnCode_t const precond =
00121         check_inputs("read", received_data, info_seq, max_samples);
00122       if (DDS::RETCODE_OK != precond)
00123         {
00124           return precond;
00125         }
00126 
00127       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00128                         guard,
00129                         this->sample_lock_,
00130                         DDS::RETCODE_ERROR);
00131 
00132       return read_i(received_data, info_seq, max_samples, sample_states,
00133                     view_states, instance_states, 0);
00134     }

Here is the call graph for this function:

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_generic ( OpenDDS::DCPS::DataReaderImpl::GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count = false 
) [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 762 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::DataReaderImpl::GenericBundle::info_, DDS::LENGTH_UNLIMITED, DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataReaderImpl::GenericBundle::samples_.

00767   {
00768 
00769     MessageSequenceType data;
00770     DDS::ReturnCode_t rc;
00771     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00772                       guard,
00773                       this->sample_lock_,
00774                       DDS::RETCODE_ERROR);
00775     {
00776       rc = read_i(data, gen.info_,
00777                   DDS::LENGTH_UNLIMITED,
00778                   sample_states, view_states, instance_states, 0);
00779       if (true == adjust_ref_count ) {
00780         data.increment_references();
00781       }
00782     }
00783     gen.samples_.reserve(data.length());
00784     for (CORBA::ULong i = 0; i < data.length(); ++i) {
00785       gen.samples_.push_back(&data[i]);
00786     }
00787     return rc;
00788 
00789   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1120 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DDS_OPERATION_READ, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), item(), OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.

01132 {
01133 #ifdef OPENDDS_NO_QUERY_CONDITION
01134   ACE_UNUSED_ARG(ignored);
01135 #endif
01136 
01137   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01138 
01139 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01140   if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01141       && ! this->coherent_) {
01142     return DDS::RETCODE_PRECONDITION_NOT_MET;
01143   }
01144 
01145   bool group_coherent_ordered
01146     = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01147     && this->subqos_.presentation.coherent_access
01148     && this->subqos_.presentation.ordered_access;
01149 
01150   if (group_coherent_ordered && this->coherent_) {
01151     max_samples = 1;
01152   }
01153 #endif
01154 
01155   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01156     results(this, received_data, info_seq, max_samples,
01157             this->subqos_.presentation,
01158 #ifndef OPENDDS_NO_QUERY_CONDITION
01159             a_condition,
01160 #endif
01161             OpenDDS::DCPS::DDS_OPERATION_READ);
01162 
01163 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01164   if (! group_coherent_ordered) {
01165 #endif
01166     for (typename InstanceMap::iterator it = instance_map_.begin(),
01167            the_end = instance_map_.end(); it != the_end; ++it)
01168       {
01169         DDS::InstanceHandle_t handle = it->second;
01170 
01171         OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle);
01172 
01173         if ((inst->instance_state_.view_state() & view_states) &&
01174             (inst->instance_state_.instance_state() & instance_states))
01175           {
01176             size_t i(0);
01177             for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01178                  item != 0; item = item->next_data_sample_)
01179               {
01180                 if (item->sample_state_ & sample_states
01181 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01182                     && !item->coherent_change_
01183 #endif
01184                     )
01185                   {
01186                     results.insert_sample(item, inst, ++i);
01187                   }
01188               }
01189           }
01190       }
01191 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01192   }
01193   else {
01194     OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01195     results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01196   }
01197 #endif
01198 
01199   results.copy_to_user();
01200 
01201   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01202   if (received_data.length())
01203     {
01204       ret = DDS::RETCODE_OK;
01205       if (received_data.maximum() == 0) //using ZeroCopy
01206         {
01207           received_data_p.set_loaner(this);
01208         }
01209     }
01210 
01211   post_read_or_take();
01212 
01213   return ret;
01214 }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Definition at line 399 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00407   {
00408     DDS::ReturnCode_t const precond =
00409       check_inputs("read_instance", received_data, info_seq, max_samples);
00410     if (DDS::RETCODE_OK != precond)
00411       {
00412         return precond;
00413       }
00414 
00415     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00416                       guard,
00417                       this->sample_lock_,
00418                       DDS::RETCODE_ERROR);
00419     return read_instance_i(received_data, info_seq, max_samples, a_handle,
00420                            sample_states, view_states, instance_states, 0);
00421   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 821 of file DataReaderImpl_T.h.

References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.

00825   {
00826     MessageSequenceType dataseq;
00827     DDS::SampleInfoSeq infoseq;
00828     const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
00829                                                  DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
00830                                                  instance_states, 0);
00831     if (rc != DDS::RETCODE_NO_DATA)
00832       {
00833         const CORBA::ULong last = dataseq.length() - 1;
00834         data = new MessageType(dataseq[last]);
00835         info = infoseq[last];
00836       }
00837     return rc;
00838   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1312 of file DataReaderImpl_T.h.

References ACE_TEXT(), OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DDS_OPERATION_READ, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::InstanceState::instance_state_string(), item(), LM_DEBUG, OPENDDS_STRING, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::InstanceState::view_state().

01325 {
01326 #ifdef OPENDDS_NO_QUERY_CONDITION
01327   ACE_UNUSED_ARG(ignored);
01328 #endif
01329 
01330   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01331 
01332   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01333     results(this, received_data, info_seq, max_samples,
01334             this->subqos_.presentation,
01335 #ifndef OPENDDS_NO_QUERY_CONDITION
01336             a_condition,
01337 #endif
01338             OpenDDS::DCPS::DDS_OPERATION_READ);
01339 
01340   OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle);
01341   if (!inst) return DDS::RETCODE_BAD_PARAMETER;
01342 
01343   InstanceState& state_obj = inst->instance_state_;
01344   bool valid_view_state = state_obj.view_state() & view_states;
01345   bool valid_instance_state = state_obj.instance_state() & instance_states;
01346   if (valid_view_state && valid_instance_state)
01347     {
01348       size_t i(0);
01349       for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01350            item; item = item->next_data_sample_)
01351         {
01352           if (item->sample_state_ & sample_states
01353 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01354               && !item->coherent_change_
01355 #endif
01356               )
01357             {
01358               results.insert_sample(item, inst, ++i);
01359             }
01360         }
01361       }
01362     else
01363       {
01364         if (OpenDDS::DCPS::DCPS_debug_level >= 8) {
01365           OPENDDS_STRING msg;
01366           if (!valid_view_state) {
01367             msg += "view state is not valid";
01368             if (!valid_instance_state) {
01369               msg += " and ";
01370             }
01371           }
01372           if (!valid_instance_state) {
01373             msg = msg
01374               + "instance state is "
01375               + state_obj.instance_state_string()
01376               + " while the validity mask is "
01377               + InstanceState::instance_state_string(instance_states);
01378           }
01379           GuidConverter conv(get_subscription_id());
01380           ACE_DEBUG((LM_DEBUG,
01381             ACE_TEXT(
01382               "(%P|%t) DataReaderImpl_T::read_instance_i: "
01383               "will return no data reading sub %C because:\n  %C\n"
01384             ),
01385             OPENDDS_STRING(conv).c_str(), msg.c_str()
01386           ));
01387         }
01388       }
01389 
01390   results.copy_to_user();
01391 
01392   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01393   if (received_data.length())
01394     {
01395       ret = DDS::RETCODE_OK;
01396       if (received_data.maximum() == 0) //using ZeroCopy
01397         {
01398           received_data_p.set_loaner(this);
01399         }
01400     }
01401 
01402   post_read_or_take();
01403   return ret;
01404 }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
) [inline, virtual]

Definition at line 447 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00453   {
00454     DDS::ReturnCode_t const precond =
00455       check_inputs("read_instance_w_condition", received_data, info_seq,
00456                    max_samples);
00457     if (DDS::RETCODE_OK != precond)
00458       {
00459         return precond;
00460       }
00461 
00462     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00463                       DDS::RETCODE_ERROR);
00464 
00465     if (!has_readcondition(a_condition))
00466       {
00467         return DDS::RETCODE_PRECONDITION_NOT_MET;
00468       }
00469 
00470 #ifndef OPENDDS_NO_QUERY_CONDITION
00471     DDS::QueryCondition_ptr query_condition =
00472         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00473 #endif
00474 
00475     return read_instance_i(received_data, info_seq, max_samples, a_handle,
00476                            a_condition->get_sample_state_mask(),
00477                            a_condition->get_view_state_mask(),
00478                            a_condition->get_instance_state_mask(),
00479 #ifndef OPENDDS_NO_QUERY_CONDITION
00480                            query_condition
00481 #else
00482                            0
00483 #endif
00484                            );
00485   }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Definition at line 527 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK.

00535   {
00536     DDS::ReturnCode_t const precond =
00537       check_inputs("read_next_instance", received_data, info_seq, max_samples);
00538     if (DDS::RETCODE_OK != precond)
00539       {
00540         return precond;
00541       }
00542 
00543     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00544                                 sample_states, view_states, instance_states, 0);
00545   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  previous_instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 840 of file DataReaderImpl_T.h.

References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.

00844   {
00845     MessageSequenceType dataseq;
00846     DDS::SampleInfoSeq infoseq;
00847     const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
00848                                                       DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
00849                                                       instance_states, 0);
00850     if (rc != DDS::RETCODE_NO_DATA)
00851       {
00852         const CORBA::ULong last = dataseq.length() - 1;
00853         data = new MessageType(dataseq[last]);
00854         info = infoseq[last];
00855       }
00856     return rc;
00857   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1471 of file DataReaderImpl_T.h.

References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and status.

01484 {
01485 #ifdef OPENDDS_NO_QUERY_CONDITION
01486   ACE_UNUSED_ARG(ignored);
01487 #endif
01488 
01489   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01490 
01491   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01492                     guard,
01493                     this->sample_lock_,
01494                     DDS::RETCODE_ERROR);
01495 
01496   typename InstanceMap::iterator it;
01497   typename InstanceMap::iterator const the_end = instance_map_.end ();
01498 
01499   if (a_handle == DDS::HANDLE_NIL)
01500     {
01501       it = instance_map_.begin ();
01502     }
01503   else
01504     {
01505       for (it = instance_map_.begin ();
01506            it != the_end;
01507            ++it)
01508         {
01509           if (a_handle == it->second)
01510             {
01511               ++it;
01512               break;
01513             }
01514         }
01515     }
01516 
01517   for (; it != the_end; ++it)
01518     {
01519       handle = it->second;
01520       DDS::ReturnCode_t const status =
01521           read_instance_i(received_data, info_seq, max_samples, handle,
01522                           sample_states, view_states, instance_states,
01523 #ifndef OPENDDS_NO_QUERY_CONDITION
01524                           a_condition);
01525 #else
01526       0);
01527 #endif
01528   if (status != DDS::RETCODE_NO_DATA)
01529     {
01530       post_read_or_take();
01531       return status;
01532     }
01533 }
01534 
01535 post_read_or_take();
01536 return DDS::RETCODE_NO_DATA;
01537 }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
) [inline, virtual]

Definition at line 567 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00573   {
00574     DDS::ReturnCode_t const precond =
00575       check_inputs("read_next_instance_w_condition", received_data, info_seq,
00576                    max_samples);
00577     if (DDS::RETCODE_OK != precond)
00578       {
00579         return precond;
00580       }
00581 
00582     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00583                       DDS::RETCODE_ERROR);
00584 
00585     if (!has_readcondition(a_condition))
00586       {
00587         return DDS::RETCODE_PRECONDITION_NOT_MET;
00588       }
00589 
00590 #ifndef OPENDDS_NO_QUERY_CONDITION
00591     DDS::QueryCondition_ptr query_condition =
00592         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00593 #endif
00594 
00595     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00596                                 a_condition->get_sample_state_mask(),
00597                                 a_condition->get_view_state_mask(),
00598                                 a_condition->get_instance_state_mask(),
00599 #ifndef OPENDDS_NO_QUERY_CONDITION
00600                                 query_condition
00601 #else
00602                                 0
00603 #endif
00604                                 );
00605   }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_sample ( MessageType &  received_data,
DDS::SampleInfo sample_info 
) [inline, virtual]

Definition at line 225 of file DataReaderImpl_T.h.

References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, item(), DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and DDS::RETCODE_OK.

00228   {
00229 
00230     bool found_data = false;
00231 
00232     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00233                       guard,
00234                       this->sample_lock_,
00235                       DDS::RETCODE_ERROR);
00236 
00237     typename InstanceMap::iterator const the_end = instance_map_.end ();
00238     for (typename InstanceMap::iterator it = instance_map_.begin ();
00239          it != the_end;
00240          ++it)
00241       {
00242         DDS::InstanceHandle_t handle = it->second;
00243         OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle);
00244 
00245         bool mrg = false; //most_recent_generation
00246 
00247         if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) &&
00248             (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE))
00249           {
00250             for (OpenDDS::DCPS::ReceivedDataElement* item = ptr->rcvd_samples_.head_;
00251                  item != 0;
00252                  item = item->next_data_sample_)
00253               {
00254 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00255                 if (item->coherent_change_) continue;
00256 #endif
00257 
00258                 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE)
00259                   {
00260                     if (item->registered_data_ != 0)
00261                       {
00262                         received_data =
00263                           *static_cast< MessageType *> (item->registered_data_);
00264                       }
00265                     ptr->instance_state_.sample_info(sample_info, item);
00266 
00267                     item->sample_state_ = DDS::READ_SAMPLE_STATE;
00268 
00269 
00270                     if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00271 
00272                     found_data = true;
00273                   }
00274                 if (found_data)
00275                   {
00276                     break;
00277                   }
00278               }
00279           }
00280 
00281         if (found_data)
00282           {
00283             if (mrg) ptr->instance_state_.accessed();
00284 
00285             // Get the sample_ranks, generation_ranks, and
00286             // absolute_generation_ranks for this info_seq
00287             this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00288 
00289             break;
00290           }
00291       }
00292     post_read_or_take();
00293     return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
00294   }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
DDS::ReadCondition_ptr  a_condition 
) [inline, virtual]

Definition at line 160 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00165     {
00166       DDS::ReturnCode_t const precond =
00167         check_inputs("read_w_condition", received_data, sample_info, max_samples);
00168       if (DDS::RETCODE_OK != precond)
00169         {
00170           return precond;
00171         }
00172 
00173       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00174                         DDS::RETCODE_ERROR);
00175 
00176       if (!has_readcondition(a_condition))
00177         {
00178           return DDS::RETCODE_PRECONDITION_NOT_MET;
00179         }
00180 
00181       return read_i(received_data, sample_info, max_samples,
00182                     a_condition->get_sample_state_mask(),
00183                     a_condition->get_view_state_mask(),
00184                     a_condition->get_instance_state_mask(),
00185 #ifndef OPENDDS_NO_QUERY_CONDITION
00186                     dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
00187 #else
00188       0);
00189 #endif
00190   }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_instance_i ( DDS::InstanceHandle_t  handle  )  [inline, protected, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1101 of file DataReaderImpl_T.h.

01102   {
01103     typename InstanceMap::iterator const the_end = instance_map_.end ();
01104     typename InstanceMap::iterator it = instance_map_.begin ();
01105     while (it != the_end)
01106       {
01107         if (it->second == handle)
01108           {
01109             typename InstanceMap::iterator curIt = it;
01110             ++ it;
01111             instance_map_.erase (curIt);
01112           }
01113         else
01114           ++ it;
01115       }
01116   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_loan ( MessageSequenceType received_data  )  [inline]

Definition at line 722 of file DataReaderImpl_T.h.

00723   {
00724     received_data.length(0);
00725   }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::return_loan ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq 
) [inline, virtual]

Definition at line 647 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00650   {
00651     // Some incomplete tests to see that the data and info are from the
00652     // same read.
00653     if (received_data.length() != info_seq.length())
00654       {
00655         return DDS::RETCODE_PRECONDITION_NOT_MET;
00656       }
00657 
00658     if (received_data.release())
00659       {
00660         // nothing to do because this is not zero-copy data
00661         return DDS::RETCODE_OK;
00662       }
00663     else
00664       {
00665         info_seq.length(0);
00666         received_data.length(0);
00667       }
00668     return DDS::RETCODE_OK;
00669   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state ( DDS::InstanceHandle_t  instance,
DDS::InstanceStateKind  state 
) [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 907 of file DataReaderImpl_T.h.

References DDS::ALIVE_INSTANCE_STATE, OpenDDS::DCPS::DISPOSE_INSTANCE, get_key_value(), header, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, and OpenDDS::DCPS::UNREGISTER_INSTANCE.

Referenced by OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant(), and OpenDDS::RTPS::Sedp::Task::svc_i().

00909   {
00910     using namespace OpenDDS::DCPS;
00911     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00912 
00913     SubscriptionInstance_rch si = get_handle_instance(instance);
00914     if (si && state != DDS::ALIVE_INSTANCE_STATE) {
00915       DataSampleHeader header;
00916       const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
00917         ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE;
00918       header.message_id_ = static_cast<char>(msg);
00919       bool just_registered, filtered;
00920       unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
00921       get_key_value(*data, instance);
00922       store_instance_data(move(data), header, si, just_registered, filtered);
00923       if (!filtered)
00924       {
00925         notify_read_conditions();
00926       }
00927     }
00928   }

Here is the call graph for this function:

Here is the caller graph for this function:

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data ( unique_ptr< MessageTypeWithAllocator instance_data,
const OpenDDS::DCPS::DataSampleHeader header,
OpenDDS::DCPS::SubscriptionInstance_rch instance_ptr,
bool &  just_registered,
bool &  filtered 
) [inline, private]

!! caller should already have the sample_lock_

Definition at line 1605 of file DataReaderImpl_T.h.

References ACE_TEXT(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::dynamic_rchandle_cast(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DDS::HANDLE_NIL, OpenDDS::DCPS::INSTANCE_REGISTRATION, CORBA::is_nil(), OpenDDS::DCPS::keyFromSample(), DDS::LENGTH_UNLIMITED, LM_ERROR, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ref(), DDS::REJECTED_BY_INSTANCES_LIMIT, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::SAMPLE_DATA, DDS::SAMPLE_REJECTED_STATUS, and OpenDDS::DCPS::UNREGISTER_INSTANCE.

01611 {
01612   const bool is_dispose_msg =
01613     header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
01614     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01615   const bool is_unregister_msg =
01616     header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE ||
01617     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01618 
01619   // not filtering any data, except what is specifically identified as filtered below
01620   filtered = false;
01621 
01622   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01623 
01624   //!!! caller should already have the sample_lock_
01625   //We will unlock it before calling into listeners
01626 
01627   typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
01628 
01629   if ((is_dispose_msg || is_unregister_msg) && it == instance_map_.end())
01630   {
01631      return;
01632   }
01633 
01634 
01635   if (it == instance_map_.end())
01636   {
01637     std::size_t instances_size = 0;
01638     {
01639       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01640       instances_size = instances_.size();
01641     }
01642     if ((this->qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) &&
01643       ((::CORBA::Long) instances_size >= this->qos_.resource_limits.max_instances))
01644     {
01645       DDS::DataReaderListener_var listener
01646         = listener_for (DDS::SAMPLE_REJECTED_STATUS);
01647 
01648       set_status_changed_flag (DDS::SAMPLE_REJECTED_STATUS, true);
01649 
01650       sample_rejected_status_.last_reason = DDS::REJECTED_BY_INSTANCES_LIMIT;
01651       ++sample_rejected_status_.total_count;
01652       ++sample_rejected_status_.total_count_change;
01653       sample_rejected_status_.last_instance_handle = handle;
01654 
01655       if (!CORBA::is_nil(listener.in()))
01656       {
01657         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01658 
01659         listener->on_sample_rejected(this, sample_rejected_status_);
01660         sample_rejected_status_.total_count_change = 0;
01661       }  // do we want to do something if listener is nil???
01662       notify_status_condition_no_sample_lock();
01663 
01664       return;
01665     }
01666 
01667 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01668     SharedInstanceMap_rch inst;
01669     bool new_handle = true;
01670     if (this->is_exclusive_ownership_) {
01671       OwnershipManagerPtr owner_manager = this->ownership_manager();
01672 
01673       if (!owner_manager || owner_manager->instance_lock_acquire () != 0) {
01674         ACE_ERROR ((LM_ERROR,
01675                     ACE_TEXT("(%P|%t) ")
01676                     ACE_TEXT("%CDataReaderImpl::")
01677                     ACE_TEXT("store_instance_data, ")
01678                     ACE_TEXT("acquire instance_lock failed. \n"), TraitsType::type_name()));
01679         return;
01680       }
01681 
01682       inst = dynamic_rchandle_cast<SharedInstanceMap>(
01683         owner_manager->get_instance_map(this->topic_servant_->type_name(), this));
01684       if (inst != 0) {
01685         typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
01686         if (iter != inst->end ()) {
01687           handle = iter->second;
01688           new_handle = false;
01689         }
01690       }
01691     }
01692 #endif
01693 
01694     just_registered = true;
01695     DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
01696     handle = handle == DDS::HANDLE_NIL ? this->get_next_handle( key) : handle;
01697     OpenDDS::DCPS::SubscriptionInstance_rch instance =
01698       OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
01699         this,
01700         this->qos_,
01701         ref(this->instances_lock_),
01702         handle);
01703 
01704     instance->instance_handle_ = handle;
01705 
01706     {
01707       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01708       int ret = OpenDDS::DCPS::bind(instances_, handle, instance);
01709 
01710       if (ret != 0)
01711       {
01712         ACE_ERROR ((LM_ERROR,
01713                     ACE_TEXT("(%P|%t) ")
01714                     ACE_TEXT("%CDataReaderImpl::")
01715                     ACE_TEXT("store_instance_data, ")
01716                     ACE_TEXT("insert handle failed. \n"), TraitsType::type_name()));
01717         return;
01718       }
01719     }
01720 
01721 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01722     OwnershipManagerPtr owner_manager = this->ownership_manager();
01723 
01724     if (owner_manager) {
01725       if (!inst) {
01726         inst = make_rch<SharedInstanceMap>();
01727         owner_manager->set_instance_map(
01728           this->topic_servant_->type_name(),
01729           inst,
01730           this);
01731       }
01732 
01733       if (new_handle) {
01734         std::pair<typename InstanceMap::iterator, bool> bpair =
01735           inst->insert(typename InstanceMap::value_type(*instance_data,
01736             handle));
01737         if (bpair.second == false)
01738         {
01739           ACE_ERROR ((LM_ERROR,
01740                       ACE_TEXT("(%P|%t) ")
01741                       ACE_TEXT("%CDataReaderImpl::")
01742                       ACE_TEXT("store_instance_data, ")
01743                       ACE_TEXT("insert to participant scope %C failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01744           return;
01745         }
01746       }
01747 
01748       if (owner_manager->instance_lock_release () != 0) {
01749         ACE_ERROR ((LM_ERROR,
01750                     ACE_TEXT("(%P|%t) ")
01751                     ACE_TEXT("%CDataReaderImpl::")
01752                     ACE_TEXT("store_instance_data, ")
01753                     ACE_TEXT("release instance_lock failed. \n"), TraitsType::type_name()));
01754         return;
01755       }
01756     }
01757 #endif
01758 
01759     std::pair<typename InstanceMap::iterator, bool> bpair =
01760       instance_map_.insert(typename InstanceMap::value_type(*instance_data,
01761         handle));
01762     if (bpair.second == false)
01763     {
01764       ACE_ERROR ((LM_ERROR,
01765                   ACE_TEXT("(%P|%t) ")
01766                   ACE_TEXT("%CDataReaderImpl::")
01767                   ACE_TEXT("store_instance_data, ")
01768                   ACE_TEXT("insert %C failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01769       return;
01770     }
01771   }
01772   else
01773   {
01774     just_registered = false;
01775     handle = it->second;
01776   }
01777 
01778   if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION)
01779   {
01780     instance_ptr = get_handle_instance(handle);
01781 
01782     if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA)
01783     {
01784       filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
01785 
01786       ACE_Time_Value filter_time_expired;
01787       if (!filtered &&
01788           time_based_filter_instance(instance_ptr, filter_time_expired)) {
01789         filtered = true;
01790         if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01791           filter_delayed_handler_->delay_sample(handle, move(instance_data), header, just_registered, filter_time_expired);
01792 
01793         }
01794       } else {
01795         // nothing time based filtered now
01796         filter_delayed_handler_->clear_sample(handle);
01797 
01798       }
01799 
01800       if (filtered)
01801       {
01802         return;
01803       }
01804     }
01805 
01806     finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
01807   }
01808   else
01809   {
01810     instance_ptr = this->get_handle_instance(handle);
01811     instance_ptr->instance_state_.lively(header.publication_id_);
01812   }
01813 }

Here is the call graph for this function:

template<typename MessageType>
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data ( const MessageType &  sample,
DDS::ViewStateKind  view 
) [inline]

Definition at line 861 of file DataReaderImpl_T.h.

References DDS::HANDLE_NIL, header, OpenDDS::DCPS::INSTANCE_REGISTRATION, lookup_instance(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), DDS::NOT_NEW_VIEW_STATE, and OpenDDS::DCPS::SAMPLE_DATA.

Referenced by OpenDDS::RTPS::Spdp::handle_participant_data(), OpenDDS::DCPS::StaticEndpointManager::init_bit(), and OpenDDS::RTPS::Spdp::match_unauthenticated().

00863   {
00864     using namespace OpenDDS::DCPS;
00865     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
00866                      DDS::HANDLE_NIL);
00867 
00868 #ifndef OPENDDS_NO_MULTI_TOPIC
00869     DDS::TopicDescription_var descr = get_topicdescription();
00870     if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
00871       if (!mt->filter(sample)) {
00872         return DDS::HANDLE_NIL;
00873       }
00874     }
00875 #endif
00876 
00877     get_subscriber_servant()->data_received(this);
00878 
00879     DDS::InstanceHandle_t inst = lookup_instance(sample);
00880     bool filtered = false;
00881     SubscriptionInstance_rch instance;
00882 
00883     // Call store_instance_data() once or twice, depending on if we need to
00884     // process the INSTANCE_REGISTRATION.  In either case, store_instance_data()
00885     // owns the memory for the sample and it must come from the correct allocator.
00886     for (int i = 0; i < 2; ++i) {
00887       if (i == 0 && inst != DDS::HANDLE_NIL) continue;
00888 
00889       DataSampleHeader header;
00890       const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
00891       header.message_id_ = static_cast<char>(msg);
00892       bool just_registered;
00893       unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
00894       store_instance_data(move(data), header, instance, just_registered, filtered);
00895       if (instance) inst = instance->instance_handle_;
00896     }
00897 
00898     if (!filtered) {
00899       if (view == DDS::NOT_NEW_VIEW_STATE) {
00900         if (instance) instance->instance_state_.accessed();
00901       }
00902       notify_read_conditions();
00903     }
00904     return inst;
00905   }

Here is the call graph for this function:

Here is the caller graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( OpenDDS::DCPS::AbstractSamples samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 796 of file DataReaderImpl_T.h.

References DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::AbstractSamples::push_back(), OpenDDS::DCPS::AbstractSamples::reserve(), and DDS::RETCODE_ERROR.

00800   {
00801 
00802     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00803                       guard,
00804                       this->sample_lock_,
00805                       DDS::RETCODE_ERROR);
00806 
00807     MessageSequenceType data;
00808     DDS::SampleInfoSeq infos;
00809     DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
00810                                   sample_states, view_states, instance_states, 0);
00811 
00812     samples.reserve(data.length());
00813 
00814     for (CORBA::ULong i = 0; i < data.length(); ++i) {
00815       samples.push_back(infos[i], &data[i]);
00816     }
00817 
00818     return rc;
00819   }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Definition at line 136 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00143     {
00144       DDS::ReturnCode_t const precond =
00145         check_inputs("take", received_data, info_seq, max_samples);
00146       if (DDS::RETCODE_OK != precond)
00147         {
00148           return precond;
00149         }
00150 
00151       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00152                         guard,
00153                         this->sample_lock_,
00154                         DDS::RETCODE_ERROR);
00155 
00156       return take_i(received_data, info_seq, max_samples, sample_states,
00157                     view_states, instance_states, 0);
00158     }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1216 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DDS_OPERATION_TAKE, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), item(), OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.

01228 {
01229 #ifdef OPENDDS_NO_QUERY_CONDITION
01230   ACE_UNUSED_ARG(ignored);
01231 #endif
01232 
01233   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01234 
01235 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01236   if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01237       && ! this->coherent_) {
01238     return DDS::RETCODE_PRECONDITION_NOT_MET;
01239   }
01240 
01241   bool group_coherent_ordered
01242     = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01243     && this->subqos_.presentation.coherent_access
01244     && this->subqos_.presentation.ordered_access;
01245 
01246   if (group_coherent_ordered && this->coherent_) {
01247     max_samples = 1;
01248   }
01249 #endif
01250 
01251   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01252     results(this, received_data, info_seq, max_samples,
01253             this->subqos_.presentation,
01254 #ifndef OPENDDS_NO_QUERY_CONDITION
01255             a_condition,
01256 #endif
01257             OpenDDS::DCPS::DDS_OPERATION_TAKE);
01258 
01259 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01260   if (! group_coherent_ordered) {
01261 #endif
01262 
01263     for (typename InstanceMap::iterator it = instance_map_.begin(),
01264            the_end = instance_map_.end(); it != the_end; ++it)
01265       {
01266         DDS::InstanceHandle_t handle = it->second;
01267 
01268         OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle);
01269 
01270         if ((inst->instance_state_.view_state() & view_states) &&
01271             (inst->instance_state_.instance_state() & instance_states))
01272           {
01273             size_t i(0);
01274             for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01275                  item != 0; item = item->next_data_sample_)
01276               {
01277                 if (item->sample_state_ & sample_states
01278 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01279                     && !item->coherent_change_
01280 #endif
01281                     )
01282                   {
01283                     results.insert_sample(item, inst, ++i);
01284                   }
01285               }
01286           }
01287       }
01288 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01289   }
01290   else {
01291     OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01292     results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01293   }
01294 #endif
01295 
01296   results.copy_to_user();
01297 
01298   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01299   if (received_data.length())
01300     {
01301       ret = DDS::RETCODE_OK;
01302       if (received_data.maximum() == 0) //using ZeroCopy
01303         {
01304           received_data_p.set_loaner(this);
01305         }
01306     }
01307 
01308   post_read_or_take();
01309   return ret;
01310 }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Definition at line 423 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00431   {
00432     DDS::ReturnCode_t const precond =
00433       check_inputs("take_instance", received_data, info_seq, max_samples);
00434     if (DDS::RETCODE_OK != precond)
00435       {
00436         return precond;
00437       }
00438 
00439     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00440                       guard,
00441                       this->sample_lock_,
00442                       DDS::RETCODE_ERROR);
00443     return take_instance_i(received_data, info_seq, max_samples, a_handle,
00444                            sample_states, view_states, instance_states, 0);
00445   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1406 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), item(), DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, and DDS::RETCODE_OK.

01419 {
01420 #ifdef OPENDDS_NO_QUERY_CONDITION
01421   ACE_UNUSED_ARG(ignored);
01422 #endif
01423 
01424   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01425 
01426   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01427     results(this, received_data, info_seq, max_samples,
01428             this->subqos_.presentation,
01429 #ifndef OPENDDS_NO_QUERY_CONDITION
01430             a_condition,
01431 #endif
01432             OpenDDS::DCPS::DDS_OPERATION_TAKE);
01433 
01434   OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle);
01435   if (!inst) return DDS::RETCODE_BAD_PARAMETER;
01436 
01437   if ((inst->instance_state_.view_state() & view_states) &&
01438       (inst->instance_state_.instance_state() & instance_states))
01439     {
01440       size_t i(0);
01441       for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01442            item; item = item->next_data_sample_)
01443         {
01444           if (item->sample_state_ & sample_states
01445 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01446               && !item->coherent_change_
01447 #endif
01448               )
01449             {
01450               results.insert_sample(item, inst, ++i);
01451             }
01452         }
01453     }
01454 
01455   results.copy_to_user();
01456 
01457   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01458   if (received_data.length())
01459     {
01460       ret = DDS::RETCODE_OK;
01461       if (received_data.maximum() == 0) //using ZeroCopy
01462         {
01463           received_data_p.set_loaner(this);
01464         }
01465     }
01466 
01467   post_read_or_take();
01468   return ret;
01469 }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
) [inline, virtual]

Definition at line 487 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00493   {
00494     DDS::ReturnCode_t const precond =
00495       check_inputs("take_instance_w_condition", received_data, info_seq,
00496                    max_samples);
00497     if (DDS::RETCODE_OK != precond)
00498       {
00499         return precond;
00500       }
00501 
00502     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00503                       DDS::RETCODE_ERROR);
00504 
00505     if (!has_readcondition(a_condition))
00506       {
00507         return DDS::RETCODE_PRECONDITION_NOT_MET;
00508       }
00509 
00510 #ifndef OPENDDS_NO_QUERY_CONDITION
00511     DDS::QueryCondition_ptr query_condition =
00512         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00513 #endif
00514 
00515     return take_instance_i(received_data, info_seq, max_samples, a_handle,
00516                            a_condition->get_sample_state_mask(),
00517                            a_condition->get_view_state_mask(),
00518                            a_condition->get_instance_state_mask(),
00519 #ifndef OPENDDS_NO_QUERY_CONDITION
00520                            query_condition
00521 #else
00522                            0
00523 #endif
00524                            );
00525   }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [inline, virtual]

Definition at line 547 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK.

00555   {
00556     DDS::ReturnCode_t const precond =
00557       check_inputs("take_next_instance", received_data, info_seq, max_samples);
00558     if (DDS::RETCODE_OK != precond)
00559       {
00560         return precond;
00561       }
00562 
00563     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00564                                 sample_states, view_states, instance_states, 0);
00565   }

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1539 of file DataReaderImpl_T.h.

References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and status.

01552 {
01553 #ifdef OPENDDS_NO_QUERY_CONDITION
01554   ACE_UNUSED_ARG(ignored);
01555 #endif
01556 
01557   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01558 
01559   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01560                     guard,
01561                     this->sample_lock_,
01562                     DDS::RETCODE_ERROR);
01563 
01564   typename InstanceMap::iterator it;
01565   typename InstanceMap::iterator const the_end = instance_map_.end ();
01566 
01567   if (a_handle == DDS::HANDLE_NIL)
01568     {
01569       it = instance_map_.begin ();
01570     }
01571   else
01572     {
01573       for (it = instance_map_.begin (); it != the_end; ++it)
01574         {
01575           if (a_handle == it->second)
01576             {
01577               ++it;
01578               break;
01579             }
01580         }
01581     }
01582 
01583   for (; it != the_end; ++it)
01584     {
01585       handle = it->second;
01586       DDS::ReturnCode_t const status =
01587           take_instance_i(received_data, info_seq, max_samples, handle,
01588                           sample_states, view_states, instance_states,
01589 #ifndef OPENDDS_NO_QUERY_CONDITION
01590                           a_condition);
01591 #else
01592       0);
01593 #endif
01594     if (status != DDS::RETCODE_NO_DATA)
01595       {
01596         total_samples();  // see if we are empty
01597         post_read_or_take();
01598         return status;
01599       }
01600   }
01601   post_read_or_take();
01602   return DDS::RETCODE_NO_DATA;
01603 }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
) [inline, virtual]

Definition at line 607 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00613   {
00614     DDS::ReturnCode_t const precond =
00615       check_inputs("take_next_instance_w_condition", received_data, info_seq,
00616                    max_samples);
00617     if (DDS::RETCODE_OK != precond)
00618       {
00619         return precond;
00620       }
00621 
00622     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00623                       DDS::RETCODE_ERROR);
00624 
00625     if (!has_readcondition(a_condition))
00626       {
00627         return DDS::RETCODE_PRECONDITION_NOT_MET;
00628       }
00629 
00630 #ifndef OPENDDS_NO_QUERY_CONDITION
00631     DDS::QueryCondition_ptr query_condition =
00632         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00633 #endif
00634 
00635     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00636                                 a_condition->get_sample_state_mask(),
00637                                 a_condition->get_view_state_mask(),
00638                                 a_condition->get_instance_state_mask(),
00639 #ifndef OPENDDS_NO_QUERY_CONDITION
00640                                 query_condition
00641 #else
00642                                 0
00643 #endif
00644                                 );
00645   }

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_sample ( MessageType &  received_data,
DDS::SampleInfo sample_info 
) [inline, virtual]

Definition at line 296 of file DataReaderImpl_T.h.

References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::ReceivedDataElement::coherent_change_, OpenDDS::DCPS::ReceivedDataElement::dec_ref(), item(), OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::ReceivedDataElement::registered_data_, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::ReceivedDataElement::sample_state_.

00299   {
00300     bool found_data = false;
00301 
00302 
00303     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00304                       guard,
00305                       this->sample_lock_,
00306                       DDS::RETCODE_ERROR);
00307 
00308     typename InstanceMap::iterator const the_end = instance_map_.end ();
00309     for (typename InstanceMap::iterator it = instance_map_.begin ();
00310          it != the_end;
00311          ++it)
00312       {
00313         DDS::InstanceHandle_t handle = it->second;
00314         OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle);
00315 
00316         bool mrg = false; //most_recent_generation
00317 
00318         OpenDDS::DCPS::ReceivedDataElement *tail = 0;
00319         if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) &&
00320             (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE))
00321           {
00322 
00323             OpenDDS::DCPS::ReceivedDataElement *next;
00324             tail = 0;
00325             OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
00326             while (item)
00327               {
00328 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00329                 if (item->coherent_change_)
00330                   {
00331                     item = item->next_data_sample_;
00332                     continue;
00333                   }
00334 #endif
00335                 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE)
00336                   {
00337                     if (item->registered_data_ != 0)
00338                       {
00339                         received_data =
00340                           *static_cast< MessageType *> (item->registered_data_);
00341                       }
00342                     ptr->instance_state_.sample_info(sample_info, item);
00343 
00344                     item->sample_state_ = DDS::READ_SAMPLE_STATE;
00345 
00346                     if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00347 
00348                     if (item == ptr->rcvd_samples_.tail_)
00349                       {
00350                         tail = ptr->rcvd_samples_.tail_;
00351                         item = item->next_data_sample_;
00352                       }
00353                     else
00354                       {
00355                         next = item->next_data_sample_;
00356 
00357                         ptr->rcvd_samples_.remove(item);
00358                         item->dec_ref();
00359 
00360                         item = next;
00361                       }
00362 
00363                     found_data = true;
00364                   }
00365                 if (found_data)
00366                   {
00367                     break;
00368                   }
00369               }
00370           }
00371 
00372         if (found_data)
00373           {
00374             if (mrg) ptr->instance_state_.accessed();
00375 
00376             //
00377             // Get the sample_ranks, generation_ranks, and
00378             // absolute_generation_ranks for this info_seq
00379             //
00380             if (tail)
00381               {
00382                 this->sample_info(sample_info, tail);
00383 
00384                 ptr->rcvd_samples_.remove(tail);
00385                 tail->dec_ref();
00386               }
00387             else
00388               {
00389                 this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00390               }
00391 
00392             break;
00393           }
00394       }
00395     post_read_or_take();
00396     return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
00397   }

Here is the call graph for this function:

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
DDS::ReadCondition_ptr  a_condition 
) [inline, virtual]

Definition at line 192 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00197     {
00198       DDS::ReturnCode_t const precond =
00199         check_inputs("take_w_condition", received_data, sample_info, max_samples);
00200       if (DDS::RETCODE_OK != precond)
00201         {
00202           return precond;
00203         }
00204 
00205       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00206                         DDS::RETCODE_ERROR);
00207 
00208       if (!has_readcondition(a_condition))
00209         {
00210           return DDS::RETCODE_PRECONDITION_NOT_MET;
00211         }
00212 
00213       return take_i(received_data, sample_info, max_samples,
00214                     a_condition->get_sample_state_mask(),
00215                     a_condition->get_view_state_mask(),
00216                     a_condition->get_instance_state_mask(),
00217 #ifndef OPENDDS_NO_QUERY_CONDITION
00218                     dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
00219 #else
00220                     0
00221 #endif
00222                     );
00223     }


Member Data Documentation

template<typename MessageType>
RcHandle<FilterDelayedHandler> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_handler_ [private]

Definition at line 2383 of file DataReaderImpl_T.h.

template<typename MessageType>
InstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::instance_map_ [private]

Definition at line 2385 of file DataReaderImpl_T.h.


The documentation for this class was generated from the following file:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1