OpenDDS::DCPS::DataReaderImpl_T< MessageType > Class Template Reference

#include <DataReaderImpl_T.h>

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:

Collaboration graph
[legend]
List of all members.

Public Types

typedef DDSTraits< MessageType > TraitsType
typedef TraitsType::MessageSequenceType MessageSequenceType
typedef ::OpenDDS::DCPS::Cached_Allocator_With_Overflow<
MessageType, ACE_Null_Mutex > 
DataAllocator
typedef TraitsType::DataReaderType Interface

Public Member Functions

typedef OPENDDS_MAP_CMP (MessageType,::DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap
 DataReaderImpl_T (void)
 Constructor.
virtual ~DataReaderImpl_T (void)
 Destructor.
virtual ::DDS::ReturnCode_t enable_specific ()
virtual ::DDS::ReturnCode_t read (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
virtual ::DDS::ReturnCode_t take (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
virtual ::DDS::ReturnCode_t read_w_condition (MessageSequenceType &received_data,::DDS::SampleInfoSeq &sample_info,::CORBA::Long max_samples,::DDS::ReadCondition_ptr a_condition)
virtual ::DDS::ReturnCode_t take_w_condition (MessageSequenceType &received_data,::DDS::SampleInfoSeq &sample_info,::CORBA::Long max_samples,::DDS::ReadCondition_ptr a_condition)
virtual ::DDS::ReturnCode_t read_next_sample (MessageType &received_data,::DDS::SampleInfo &sample_info)
virtual ::DDS::ReturnCode_t take_next_sample (MessageType &received_data,::DDS::SampleInfo &sample_info)
virtual ::DDS::ReturnCode_t read_instance (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
virtual ::DDS::ReturnCode_t take_instance (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
virtual ::DDS::ReturnCode_t read_next_instance (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
virtual ::DDS::ReturnCode_t take_next_instance (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
virtual ::DDS::ReturnCode_t read_next_instance_w_condition (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::ReadCondition_ptr a_condition)
virtual ::DDS::ReturnCode_t take_next_instance_w_condition (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::ReadCondition_ptr a_condition)
virtual ::DDS::ReturnCode_t return_loan (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq)
virtual ::DDS::ReturnCode_t get_key_value (MessageType &key_holder,::DDS::InstanceHandle_t handle)
virtual ::DDS::InstanceHandle_t lookup_instance (const MessageType &instance_data)
virtual ::DDS::ReturnCode_t auto_return_loan (void *seq)
void release_loan (MessageSequenceType &received_data)
void dec_ref_data_element (::OpenDDS::DCPS::ReceivedDataElement *item)
virtual void delete_instance_map (void *map)
bool contains_sample_filtered (::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states, const OpenDDS::DCPS::FilterEvaluator &evaluator, const ::DDS::StringSeq &params)
::DDS::ReturnCode_t read_generic (OpenDDS::DCPS::DataReaderImpl::GenericBundle &gen,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states, bool adjust_ref_count=false)
::DDS::InstanceHandle_t lookup_instance_generic (const void *data)
virtual ::DDS::ReturnCode_t take (OpenDDS::DCPS::AbstractSamples &samples,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
::DDS::ReturnCode_t read_instance_generic (void *&data,::DDS::SampleInfo &info,::DDS::InstanceHandle_t instance,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
::DDS::ReturnCode_t read_next_instance_generic (void *&data,::DDS::SampleInfo &info,::DDS::InstanceHandle_t previous_instance,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states)
::DDS::InstanceHandle_t store_synthetic_data (const MessageType &sample,::DDS::ViewStateKind view)
void set_instance_state (::DDS::InstanceHandle_t instance,::DDS::InstanceStateKind state)
virtual void lookup_instance (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance *&instance)

Protected Member Functions

virtual void dds_demarshal (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance *&instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type)
virtual void dispose_unregister (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance *&instance)
virtual void purge_data (OpenDDS::DCPS::SubscriptionInstance *instance)
virtual void release_instance_i (::DDS::InstanceHandle_t handle)

Private Member Functions

::DDS::ReturnCode_t read_i (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states,::DDS::QueryCondition_ptr a_condition)
::DDS::ReturnCode_t take_i (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states,::DDS::QueryCondition_ptr a_condition)
::DDS::ReturnCode_t read_instance_i (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states,::DDS::QueryCondition_ptr a_condition)
::DDS::ReturnCode_t take_instance_i (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states,::DDS::QueryCondition_ptr a_condition)
::DDS::ReturnCode_t read_next_instance_i (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states,::DDS::QueryCondition_ptr a_condition)
::DDS::ReturnCode_t take_next_instance_i (MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples,::DDS::InstanceHandle_t a_handle,::DDS::SampleStateMask sample_states,::DDS::ViewStateMask view_states,::DDS::InstanceStateMask instance_states,::DDS::QueryCondition_ptr a_condition)
void store_instance_data (MessageType *instance_data, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance *&instance_ptr, bool &just_registered, bool &filtered)
void notify_status_condition_no_sample_lock ()
::DDS::ReturnCode_t check_inputs (const char *method_name, MessageSequenceType &received_data,::DDS::SampleInfoSeq &info_seq,::CORBA::Long max_samples)
 Common input read* & take* input processing and precondition checks.

Private Attributes

InstanceMap instance_map_
DataAllocatordata_allocator_

Detailed Description

template<typename MessageType>
class OpenDDS::DCPS::DataReaderImpl_T< MessageType >

Servant for DataReader interface of Traits::MessageType data type.

See the DDS specification, OMG formal/04-12-02, for a description of this interface.

Definition at line 22 of file DataReaderImpl_T.h.


Member Typedef Documentation

template<typename MessageType>
typedef ::OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageType, ACE_Null_Mutex> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataAllocator

Definition at line 36 of file DataReaderImpl_T.h.

template<typename MessageType>
typedef TraitsType::DataReaderType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::Interface

Definition at line 38 of file DataReaderImpl_T.h.

template<typename MessageType>
typedef TraitsType::MessageSequenceType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MessageSequenceType

Definition at line 32 of file DataReaderImpl_T.h.

template<typename MessageType>
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::TraitsType

Definition at line 31 of file DataReaderImpl_T.h.


Constructor & Destructor Documentation

template<typename MessageType>
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataReaderImpl_T ( void   )  [inline]

Constructor.

Definition at line 41 of file DataReaderImpl_T.h.

00042       : data_allocator_ (0)
00043     {
00044     }

template<typename MessageType>
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::~DataReaderImpl_T ( void   )  [inline, virtual]

Destructor.

Definition at line 47 of file DataReaderImpl_T.h.

00048     {
00049       for (typename InstanceMap::iterator it = instance_map_.begin();
00050            it != instance_map_.end(); ++it)
00051         {
00052           OpenDDS::DCPS::SubscriptionInstance* ptr =
00053             get_handle_instance(it->second);
00054           this->purge_data(ptr);
00055         }
00056 
00057       delete data_allocator_;
00058       //X SHH release the data samples in the instance_map_.
00059     }


Member Function Documentation

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::auto_return_loan ( void *  seq  )  [inline, virtual]

This method provides virtual access to type specific code that is used when loans are automatically returned. The destructor of the sequence supporing zero-copy read calls this method on the datareader that provided the loan.

Parameters:
seq - The sequence of loaned values.
Returns:
Always RETCODE_OK.
thows NONE.

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 599 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK.

00600   {
00601     MessageSequenceType& received_data =
00602       *static_cast< MessageSequenceType*> (seq);
00603 
00604     if (!received_data.release())
00605       {
00606         // this->release_loan(received_data);
00607         received_data.length(0);
00608       }
00609     return ::DDS::RETCODE_OK;
00610   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::check_inputs ( const char *  method_name,
MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples 
) [inline, private]

Common input read* & take* input processing and precondition checks.

Definition at line 1926 of file DataReaderImpl_T.h.

References DDS::LENGTH_UNLIMITED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

01931 {
01932   typename MessageSequenceType::PrivateMemberAccess received_data_p (received_data);
01933 
01934   // ---- start of preconditions common to read and take -----
01935   // SPEC ref v1.2 7.1.2.5.3.8 #1
01936   // NOTE: We can't check maximum() or release() here since those are
01937   //       implementation details of the sequences.  In general, the
01938   //       info_seq will have release() == true and maximum() == 0.
01939   //       If we're in zero-copy mode, the received_data will have
01940   //       release() == false and maximum() == 0.  If it's not
01941   //       zero-copy then received_data will have release == true()
01942   //       and maximum() == anything.
01943   if (received_data.length() != info_seq.length())
01944     {
01945       ACE_DEBUG((LM_DEBUG,
01946                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
01947                  ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
01948                  ACE_TEXT("sequences do not match.\n"),
01949                  TraitsType::type_name(),
01950                  method_name ));
01951       return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01952     }
01953 
01954   //SPEC ref v1.2 7.1.2.5.3.8 #4
01955   if ((received_data.maximum() > 0) && (received_data.release() == false))
01956     {
01957       ACE_DEBUG((LM_DEBUG,
01958                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
01959                  ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
01960                  ACE_TEXT("maximum %d and owns %d\n"),
01961                  TraitsType::type_name(),
01962                  method_name,
01963                  received_data.maximum(),
01964                  received_data.release() ));
01965 
01966       return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01967     }
01968 
01969   if (received_data.maximum() == 0)
01970     {
01971       // not in SPEC but needed.
01972       if (max_samples == ::DDS::LENGTH_UNLIMITED)
01973         {
01974           max_samples =
01975             static_cast< ::CORBA::Long> (received_data_p.max_slots());
01976         }
01977     }
01978   else
01979     {
01980       if (max_samples == ::DDS::LENGTH_UNLIMITED)
01981         {
01982           //SPEC ref v1.2 7.1.2.5.3.8 #5a
01983           max_samples = received_data.maximum();
01984         }
01985       else if (
01986                max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
01987         {
01988           //SPEC ref v1.2 7.1.2.5.3.8 #5c
01989           ACE_DEBUG((LM_DEBUG,
01990                      ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
01991                      ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
01992                      TraitsType::type_name(),
01993                      method_name,
01994                      max_samples,
01995                      received_data.maximum()));
01996           return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01997         }
01998       //else
01999       //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
02000     }
02001 
02002   // The spec does not say what to do in this case but it appears to be a good thing.
02003   // Note: max_slots is the greater of the sequence's maximum and init_size.
02004   if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
02005     {
02006       max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
02007     }
02008   //---- end of preconditions common to read and take -----
02009 
02010   return ::DDS::RETCODE_OK;
02011 }

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::contains_sample_filtered ( ::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
const OpenDDS::DCPS::FilterEvaluator evaluator,
const ::DDS::StringSeq params 
) [inline]

Definition at line 649 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::FilterEvaluator::eval().

00654   {
00655     using namespace OpenDDS::DCPS;
00656     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
00657     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
00658 
00659     for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
00660            end = instances_.end(); iter != end; ++iter) {
00661       SubscriptionInstance& inst = *iter->second;
00662 
00663       if ((inst.instance_state_.view_state() & view_states) &&
00664           (inst.instance_state_.instance_state() & instance_states)) {
00665         for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
00666              item = item->next_data_sample_) {
00667           if (item->sample_state_ & sample_states
00668 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00669               && !item->coherent_change_
00670 #endif
00671               ) {
00672             if (evaluator.eval(*static_cast< MessageType* >(item->registered_data_), params)) {
00673               return true;
00674             }
00675           }
00676         }
00677       }
00678     }
00679 
00680     return false;
00681   }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance *&  instance,
bool &  just_registered,
bool &  filtered,
OpenDDS::DCPS::MarshalingType  marshaling_type 
) [inline, protected, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 895 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::content_filter_, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::Serializer::reset_alignment(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, and OpenDDS::DCPS::Serializer::use_rti_serialization().

00900   {
00901     MessageType* data;
00902 
00903     ACE_NEW_MALLOC_NORETURN(data,
00904                             static_cast< MessageType *>(
00905                                                          data_allocator_->malloc(sizeof(MessageType))),
00906                             MessageType);
00907 
00908     const bool cdr = sample.header_.cdr_encapsulation_;
00909 
00910     OpenDDS::DCPS::Serializer ser(
00911                                   sample.sample_,
00912                                   sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
00913                                   cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00914 
00915     if (cdr) {
00916       ACE_CDR::ULong header;
00917       ser >> header;
00918     }
00919 
00920     if (cdr && Serializer::use_rti_serialization()) {
00921       // Start counting byte-offset AFTER header
00922       ser.reset_alignment();
00923     }
00924     if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
00925       ser >> ::OpenDDS::DCPS::KeyOnly< MessageType>(*data);
00926     } else {
00927       ser >> *data;
00928     }
00929 
00930 
00931 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00932     if (!sample.header_.content_filter_) { // if this is true, the writer has already filtered
00933       using OpenDDS::DCPS::ContentFilteredTopicImpl;
00934       if (ContentFilteredTopicImpl* cft =
00935           dynamic_cast<ContentFilteredTopicImpl*>(content_filtered_topic_.in())) {
00936         if (sample.header_.message_id_ == OpenDDS::DCPS::SAMPLE_DATA
00937             && !cft->filter(*data)) {
00938           filtered = true;
00939           return;
00940         }
00941       }
00942     }
00943 #endif
00944 
00945     store_instance_data(data, sample.header_, instance, just_registered, filtered);
00946   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dec_ref_data_element ( ::OpenDDS::DCPS::ReceivedDataElement item  )  [inline]

Definition at line 617 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::ReceivedDataElement::dec_ref(), and OpenDDS::DCPS::ReceivedDataElement::registered_data_.

00618   {
00619     using ::OpenDDS::DCPS::ReceivedDataElement;
00620 
00621     if (0 == item->dec_ref())
00622       {
00623         if (item->registered_data_ != 0)
00624           {
00625             ACE_GUARD(ACE_Recursive_Thread_Mutex,
00626                       guard,
00627                       this->sample_lock_);
00628 
00629             MessageType* const ptr
00630                   = static_cast< MessageType* >(item->registered_data_);
00631             ACE_DES_FREE (ptr,
00632                           data_allocator_->free,
00633                           MessageType );
00634           }
00635 
00636         ACE_DES_FREE (item,
00637                       rd_allocator_->free,
00638                       ReceivedDataElement);
00639       }
00640   }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::delete_instance_map ( void *  map  )  [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 642 of file DataReaderImpl_T.h.

00643   {
00644     InstanceMap* instances = reinterpret_cast <InstanceMap* > (map);
00645     delete instances;
00646   }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance *&  instance 
) [inline, protected, virtual]

Reimplemented from OpenDDS::DCPS::DataReaderImpl.

Definition at line 948 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, and OpenDDS::DCPS::KEY_ONLY_MARSHALING.

00950   {
00951     //!!! caller should already have the sample_lock_
00952 
00953     // The data sample in this dispose message does not contain any valid data.
00954     // What it needs here is the key value to identify the instance to dispose.
00955     // The demarshal push this "sample" to received sample list so the user
00956     // can be notified the dispose event.
00957     bool just_registered = false;
00958     bool filtered = false;
00959     OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING;
00960     if (sample.header_.key_fields_only_) {
00961       marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING;
00962     }
00963     this->dds_demarshal(sample, instance, just_registered, filtered, marshaling);
00964   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::enable_specific (  )  [inline, virtual]

Do parts of enable specific to the datatype. Called by DataReaderImpl::enable().

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 65 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::DCPS_debug_level, and DDS::RETCODE_OK.

00066     {
00067       data_allocator_ = new DataAllocator(get_n_chunks ());
00068       if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00069         ACE_DEBUG((LM_DEBUG,
00070                    ACE_TEXT("(%P|%t) %CDataReaderImpl::")
00071                    ACE_TEXT("enable_specific-data")
00072                    ACE_TEXT(" Cached_Allocator_With_Overflow ")
00073                    ACE_TEXT("%x with %d chunks\n"),
00074                    TraitsType::type_name(),
00075                    data_allocator_,
00076                    this->get_n_chunks ()));
00077 
00078       return ::DDS::RETCODE_OK;
00079     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_key_value ( MessageType &  key_holder,
::DDS::InstanceHandle_t  handle 
) [inline]

Definition at line 560 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00563   {
00564     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00565                       guard,
00566                       this->sample_lock_,
00567                       ::DDS::RETCODE_ERROR);
00568 
00569     typename InstanceMap::iterator const the_end = instance_map_.end ();
00570     for (typename InstanceMap::iterator it = instance_map_.begin ();
00571          it != the_end;
00572          ++it)
00573       {
00574         if (it->second == handle)
00575           {
00576             key_holder = it->first;
00577             return ::DDS::RETCODE_OK;
00578           }
00579       }
00580 
00581     return ::DDS::RETCODE_ERROR;
00582   }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance *&  instance 
) [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 849 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, DDS::HANDLE_NIL, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::ReceivedDataSample::sample_, and OpenDDS::DCPS::Serializer::use_rti_serialization().

00851   {
00852     //!!! caller should already have the sample_lock_
00853 
00854     MessageType data;
00855 
00856     const bool cdr = sample.header_.cdr_encapsulation_;
00857 
00858     ::OpenDDS::DCPS::Serializer ser(
00859       sample.sample_,
00860       sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
00861       cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00862           : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00863 
00864     if (cdr) {
00865       ACE_CDR::ULong header;
00866       ser >> header;
00867     }
00868 
00869     if (cdr && Serializer::use_rti_serialization()) {
00870       // Start counting byte-offset AFTER header
00871       ser.reset_alignment();
00872     }
00873     if (sample.header_.key_fields_only_) {
00874       ser >> ::OpenDDS::DCPS::KeyOnly< MessageType>(data);
00875     } else {
00876       ser >> data;
00877     }
00878 
00879 
00880     ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
00881     typename InstanceMap::const_iterator const it = instance_map_.find(data);
00882     if (it != instance_map_.end()) {
00883       handle = it->second;
00884     }
00885 
00886     if (handle == ::DDS::HANDLE_NIL) {
00887       instance = 0;
00888     } else {
00889       instance = get_handle_instance(handle);
00890     }
00891   }

template<typename MessageType>
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const MessageType &  instance_data  )  [inline]

Definition at line 584 of file DataReaderImpl_T.h.

References DDS::HANDLE_NIL.

00586   {
00587     typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00588 
00589     if (it == instance_map_.end())
00590       {
00591         return ::DDS::HANDLE_NIL;
00592       }
00593     else
00594       {
00595         return it->second;
00596       }
00597   }

template<typename MessageType>
::DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance_generic ( const void *  data  )  [inline, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 712 of file DataReaderImpl_T.h.

References lookup_instance().

00713   {
00714     return lookup_instance(*static_cast<const MessageType*>(data));
00715   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::notify_status_condition_no_sample_lock (  )  [inline, private]

Release sample_lock_ during status notifications in store_instance_data() as the lock is not needed and could cause deadlock condition. See comments in member function implementation for details.

Definition at line 1905 of file DataReaderImpl_T.h.

01906 {
01907   // This member function avoids a deadlock condition which otherwise
01908   // could occur as follows:
01909   // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
01910   // eventually DataReaderImpl::sample_lock_ to lock in call to
01911   // DataReaderImpl::contains_samples().
01912   // Thread2: Call to DataReaderImpl::data_received()
01913   // causes DataReaderImpl::sample_lock_ to lock and eventually
01914   // during notify of status condition a call to WaitSet::signal()
01915   // causes WaitSet::lock_ to lock.
01916   // Because the DataReaderImpl::sample_lock_ is not needed during
01917   // status notification this member function is used in
01918   // store_instance_data() to release sample_lock_ before making
01919   // the notification.
01920   ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01921   notify_status_condition();
01922 }

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP_CMP ( MessageType  ,
::DDS::InstanceHandle_t  ,
typename TraitsType::LessThanType   
)

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::purge_data ( OpenDDS::DCPS::SubscriptionInstance instance  )  [inline, protected, virtual]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 966 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::InstanceState::cancel_release(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, OpenDDS::DCPS::ReceivedDataElementList::remove_head(), and OpenDDS::DCPS::ReceivedDataElementList::size_.

00967   {
00968     instance->instance_state_.cancel_release();
00969 
00970     while (instance->rcvd_samples_.size_ > 0)
00971       {
00972         OpenDDS::DCPS::ReceivedDataElement* head =
00973           instance->rcvd_samples_.remove_head();
00974         dec_ref_data_element(head);
00975       }
00976 
00977     delete instance;
00978   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 81 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00088     {
00089       ::DDS::ReturnCode_t const precond =
00090         check_inputs("read", received_data, info_seq, max_samples);
00091       if (::DDS::RETCODE_OK != precond)
00092         {
00093           return precond;
00094         }
00095 
00096       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00097                         guard,
00098                         this->sample_lock_,
00099                         ::DDS::RETCODE_ERROR);
00100 
00101       return read_i(received_data, info_seq, max_samples, sample_states,
00102                     view_states, instance_states, 0);
00103     }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_generic ( OpenDDS::DCPS::DataReaderImpl::GenericBundle gen,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count = false 
) [inline]

Definition at line 683 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::DataReaderImpl::GenericBundle::info_, DDS::LENGTH_UNLIMITED, DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataReaderImpl::GenericBundle::samples_.

00688   {
00689 
00690     MessageSequenceType data;
00691     ::DDS::ReturnCode_t rc;
00692     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00693                       guard,
00694                       this->sample_lock_,
00695                       ::DDS::RETCODE_ERROR);
00696     {
00697       rc = read_i(data, gen.info_,
00698                   ::DDS::LENGTH_UNLIMITED,
00699                   sample_states, view_states, instance_states, 0);
00700       if (true == adjust_ref_count ) {
00701         data.increment_references();
00702       }
00703     }
00704     gen.samples_.reserve(data.length());
00705     for (CORBA::ULong i = 0; i < data.length(); ++i) {
00706       gen.samples_.push_back(&data[i]);
00707     }
00708     return rc;
00709 
00710   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
::DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 999 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::DDS_OPERATION_READ, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.

01011 {
01012 #ifdef OPENDDS_NO_QUERY_CONDITION
01013   ACE_UNUSED_ARG(ignored);
01014 #endif
01015 
01016   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01017 
01018 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01019   if (this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01020       && ! this->coherent_) {
01021     return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01022   }
01023 
01024   bool group_coherent_ordered
01025     = this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01026     && this->subqos_.presentation.coherent_access
01027     && this->subqos_.presentation.ordered_access;
01028 
01029   if (group_coherent_ordered && this->coherent_) {
01030     max_samples = 1;
01031   }
01032 #endif
01033 
01034   ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01035       results(this, received_data, info_seq, max_samples,
01036               this->subqos_.presentation,
01037 #ifndef OPENDDS_NO_QUERY_CONDITION
01038               a_condition,
01039 #endif
01040               ::OpenDDS::DCPS::DDS_OPERATION_READ);
01041 
01042 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01043   if (! group_coherent_ordered) {
01044 #endif
01045     for (typename InstanceMap::iterator it = instance_map_.begin(),
01046            the_end = instance_map_.end(); it != the_end; ++it)
01047       {
01048         ::DDS::InstanceHandle_t handle = it->second;
01049 
01050         OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(handle);
01051 
01052         if ((inst->instance_state_.view_state() & view_states) &&
01053             (inst->instance_state_.instance_state() & instance_states))
01054           {
01055             size_t i(0);
01056             for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01057                  item != 0; item = item->next_data_sample_)
01058               {
01059                 if (item->sample_state_ & sample_states
01060 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01061                     && !item->coherent_change_
01062 #endif
01063                     )
01064                   {
01065                     results.insert_sample(item, inst, ++i);
01066                   }
01067               }
01068           }
01069       }
01070 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01071   }
01072   else {
01073     OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01074     results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01075   }
01076 #endif
01077 
01078   results.copy_to_user();
01079 
01080   ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01081   if (received_data.length())
01082     {
01083       ret = ::DDS::RETCODE_OK;
01084       if (received_data.maximum() == 0) //using ZeroCopy
01085         {
01086           received_data_p.set_loaner(this);
01087         }
01088     }
01089 
01090   post_read_or_take();
01091 
01092   return ret;
01093 }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 368 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00376   {
00377     ::DDS::ReturnCode_t const precond =
00378       check_inputs("read_instance", received_data, info_seq, max_samples);
00379     if (::DDS::RETCODE_OK != precond)
00380       {
00381         return precond;
00382       }
00383 
00384     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00385                       guard,
00386                       this->sample_lock_,
00387                       ::DDS::RETCODE_ERROR);
00388     return read_instance_i(received_data, info_seq, max_samples, a_handle,
00389                            sample_states, view_states, instance_states, 0);
00390   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_generic ( void *&  data,
::DDS::SampleInfo info,
::DDS::InstanceHandle_t  instance,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 743 of file DataReaderImpl_T.h.

References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.

00747   {
00748     MessageSequenceType dataseq;
00749     ::DDS::SampleInfoSeq infoseq;
00750     ::DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
00751                                              ::DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
00752                                              instance_states, 0);
00753     if (rc == ::DDS::RETCODE_NO_DATA) return rc;
00754     const CORBA::ULong last = dataseq.length() - 1;
00755     data = new MessageType(dataseq[last]);
00756     info = infoseq[last];
00757     return rc;
00758   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
::DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1191 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::DDS_OPERATION_READ, OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::InstanceState::view_state().

01204 {
01205 #ifdef OPENDDS_NO_QUERY_CONDITION
01206   ACE_UNUSED_ARG(ignored);
01207 #endif
01208 
01209   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01210 
01211   ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01212       results(this, received_data, info_seq, max_samples,
01213               this->subqos_.presentation,
01214 #ifndef OPENDDS_NO_QUERY_CONDITION
01215               a_condition,
01216 #endif
01217               ::OpenDDS::DCPS::DDS_OPERATION_READ);
01218 
01219   OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(a_handle);
01220   if (inst == 0) return ::DDS::RETCODE_BAD_PARAMETER;
01221 
01222   if ((inst->instance_state_.view_state() & view_states) &&
01223       (inst->instance_state_.instance_state() & instance_states))
01224     {
01225       size_t i(0);
01226       for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01227            item; item = item->next_data_sample_)
01228         {
01229           if (item->sample_state_ & sample_states
01230 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01231               && !item->coherent_change_
01232 #endif
01233               )
01234             {
01235               results.insert_sample(item, inst, ++i);
01236             }
01237         }
01238     }
01239 
01240   results.copy_to_user();
01241 
01242   ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01243   if (received_data.length())
01244     {
01245       ret = ::DDS::RETCODE_OK;
01246       if (received_data.maximum() == 0) //using ZeroCopy
01247         {
01248           received_data_p.set_loaner(this);
01249         }
01250     }
01251 
01252   post_read_or_take();
01253   return ret;
01254 }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 416 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK.

00424   {
00425     ::DDS::ReturnCode_t const precond =
00426       check_inputs("read_next_instance", received_data, info_seq, max_samples);
00427     if (::DDS::RETCODE_OK != precond)
00428       {
00429         return precond;
00430       }
00431 
00432     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00433                                 sample_states, view_states, instance_states, 0);
00434   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_generic ( void *&  data,
::DDS::SampleInfo info,
::DDS::InstanceHandle_t  previous_instance,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 760 of file DataReaderImpl_T.h.

References DDS::LENGTH_UNLIMITED, and DDS::RETCODE_NO_DATA.

00764   {
00765     MessageSequenceType dataseq;
00766     ::DDS::SampleInfoSeq infoseq;
00767     ::DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
00768                                                   ::DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
00769                                                   instance_states, 0);
00770     if (rc == ::DDS::RETCODE_NO_DATA) return rc;
00771     const CORBA::ULong last = dataseq.length() - 1;
00772     data = new MessageType(dataseq[last]);
00773     info = infoseq[last];
00774     return rc;
00775   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_i ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
::DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1320 of file DataReaderImpl_T.h.

References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, and DDS::RETCODE_NO_DATA.

01333 {
01334 #ifdef OPENDDS_NO_QUERY_CONDITION
01335   ACE_UNUSED_ARG(ignored);
01336 #endif
01337 
01338   ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
01339 
01340   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01341                     guard,
01342                     this->sample_lock_,
01343                     ::DDS::RETCODE_ERROR);
01344 
01345   typename InstanceMap::iterator it;
01346   typename InstanceMap::iterator const the_end = instance_map_.end ();
01347 
01348   if (a_handle == ::DDS::HANDLE_NIL)
01349     {
01350       it = instance_map_.begin ();
01351     }
01352   else
01353     {
01354       for (it = instance_map_.begin ();
01355            it != the_end;
01356            ++it)
01357         {
01358           if (a_handle == it->second)
01359             {
01360               ++it;
01361               break;
01362             }
01363         }
01364     }
01365 
01366   for (; it != the_end; ++it)
01367     {
01368       handle = it->second;
01369       ::DDS::ReturnCode_t const status =
01370           read_instance_i(received_data, info_seq, max_samples, handle,
01371                           sample_states, view_states, instance_states,
01372 #ifndef OPENDDS_NO_QUERY_CONDITION
01373                           a_condition);
01374 #else
01375       0);
01376 #endif
01377   if (status != ::DDS::RETCODE_NO_DATA)
01378     {
01379       post_read_or_take();
01380       return status;
01381     }
01382 }
01383 
01384 post_read_or_take();
01385 return ::DDS::RETCODE_NO_DATA;
01386 }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_w_condition ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::ReadCondition_ptr  a_condition 
) [inline]

Definition at line 456 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00462   {
00463     ::DDS::ReturnCode_t const precond =
00464       check_inputs("read_next_instance_w_condition", received_data, info_seq,
00465                    max_samples);
00466     if (::DDS::RETCODE_OK != precond)
00467       {
00468         return precond;
00469       }
00470 
00471     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00472                       ::DDS::RETCODE_ERROR);
00473 
00474     if (!has_readcondition(a_condition))
00475       {
00476         return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00477       }
00478 
00479 #ifndef OPENDDS_NO_QUERY_CONDITION
00480     ::DDS::QueryCondition_ptr query_condition =
00481         dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition);
00482 #endif
00483 
00484     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00485                                 a_condition->get_sample_state_mask(),
00486                                 a_condition->get_view_state_mask(),
00487                                 a_condition->get_instance_state_mask(),
00488 #ifndef OPENDDS_NO_QUERY_CONDITION
00489                                 query_condition
00490 #else
00491                                 0
00492 #endif
00493                                 );
00494   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_sample ( MessageType &  received_data,
::DDS::SampleInfo sample_info 
) [inline]

Definition at line 194 of file DataReaderImpl_T.h.

References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, and DDS::RETCODE_OK.

00197   {
00198 
00199     bool found_data = false;
00200 
00201     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00202                       guard,
00203                       this->sample_lock_,
00204                       ::DDS::RETCODE_ERROR);
00205 
00206     typename InstanceMap::iterator const the_end = instance_map_.end ();
00207     for (typename InstanceMap::iterator it = instance_map_.begin ();
00208          it != the_end;
00209          ++it)
00210       {
00211         ::DDS::InstanceHandle_t handle = it->second;
00212         OpenDDS::DCPS::SubscriptionInstance* ptr = get_handle_instance(handle);
00213 
00214         bool mrg = false; //most_recent_generation
00215 
00216         if ((ptr->instance_state_.view_state() & ::DDS::ANY_VIEW_STATE) &&
00217             (ptr->instance_state_.instance_state() & ::DDS::ANY_INSTANCE_STATE))
00218           {
00219             for (OpenDDS::DCPS::ReceivedDataElement* item = ptr->rcvd_samples_.head_;
00220                  item != 0;
00221                  item = item->next_data_sample_)
00222               {
00223 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00224                 if (item->coherent_change_) continue;
00225 #endif
00226 
00227                 if (item->sample_state_ & ::DDS::NOT_READ_SAMPLE_STATE)
00228                   {
00229                     if (item->registered_data_ != 0)
00230                       {
00231                         received_data =
00232                           *static_cast< MessageType *> (item->registered_data_);
00233                       }
00234                     ptr->instance_state_.sample_info(sample_info, item);
00235 
00236                     item->sample_state_ = ::DDS::READ_SAMPLE_STATE;
00237 
00238 
00239                     if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00240 
00241                     found_data = true;
00242                   }
00243                 if (found_data)
00244                   {
00245                     break;
00246                   }
00247               }
00248           }
00249 
00250         if (found_data)
00251           {
00252             if (mrg) ptr->instance_state_.accessed();
00253 
00254             // Get the sample_ranks, generation_ranks, and
00255             // absolute_generation_ranks for this info_seq
00256             this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00257 
00258             break;
00259           }
00260       }
00261     post_read_or_take();
00262     return found_data ? ::DDS::RETCODE_OK : ::DDS::RETCODE_NO_DATA;
00263   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_w_condition ( MessageSequenceType received_data,
::DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
::DDS::ReadCondition_ptr  a_condition 
) [inline]

Definition at line 129 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00134     {
00135       ::DDS::ReturnCode_t const precond =
00136         check_inputs("read_w_condition", received_data, sample_info, max_samples);
00137       if (::DDS::RETCODE_OK != precond)
00138         {
00139           return precond;
00140         }
00141 
00142       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00143                         ::DDS::RETCODE_ERROR);
00144 
00145       if (!has_readcondition(a_condition))
00146         {
00147           return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00148         }
00149 
00150       return read_i(received_data, sample_info, max_samples,
00151                     a_condition->get_sample_state_mask(),
00152                     a_condition->get_view_state_mask(),
00153                     a_condition->get_instance_state_mask(),
00154 #ifndef OPENDDS_NO_QUERY_CONDITION
00155                     dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition));
00156 #else
00157       0);
00158 #endif
00159   }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_instance_i ( ::DDS::InstanceHandle_t  handle  )  [inline, protected, virtual]

Definition at line 980 of file DataReaderImpl_T.h.

00981   {
00982     typename InstanceMap::iterator const the_end = instance_map_.end ();
00983     typename InstanceMap::iterator it = instance_map_.begin ();
00984     while (it != the_end)
00985       {
00986         if (it->second == handle)
00987           {
00988             typename InstanceMap::iterator curIt = it;
00989             ++ it;
00990             instance_map_.erase (curIt);
00991           }
00992         else
00993           ++ it;
00994       }
00995   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_loan ( MessageSequenceType received_data  )  [inline]

Definition at line 612 of file DataReaderImpl_T.h.

00613   {
00614     received_data.length(0);
00615   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::return_loan ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq 
) [inline]

Definition at line 536 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00539   {
00540     // Some incomplete tests to see that the data and info are from the
00541     // same read.
00542     if (received_data.length() != info_seq.length())
00543       {
00544         return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00545       }
00546 
00547     if (received_data.release())
00548       {
00549         // nothing to do because this is not zero-copy data
00550         return ::DDS::RETCODE_OK;
00551       }
00552     else
00553       {
00554         info_seq.length(0);
00555         received_data.length(0);
00556       }
00557     return ::DDS::RETCODE_OK;
00558   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state ( ::DDS::InstanceHandle_t  instance,
::DDS::InstanceStateKind  state 
) [inline]

Definition at line 827 of file DataReaderImpl_T.h.

References DDS::ALIVE_INSTANCE_STATE, OpenDDS::DCPS::DISPOSE_INSTANCE, get_key_value(), header, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, and OpenDDS::DCPS::UNREGISTER_INSTANCE.

00829   {
00830     using namespace OpenDDS::DCPS;
00831     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00832 
00833     SubscriptionInstance* si = get_handle_instance(instance);
00834     if (si && state != ::DDS::ALIVE_INSTANCE_STATE) {
00835       DataSampleHeader header;
00836       header.message_id_ = (state == ::DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
00837         ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE;
00838       bool just_registered, filtered;
00839       MessageType* data;
00840       ACE_NEW_MALLOC_NORETURN(data,
00841                               static_cast< MessageType*>(data_allocator_->malloc(sizeof(MessageType))),
00842                               MessageType);
00843       get_key_value(*data, instance);
00844       store_instance_data(data, header, si, just_registered, filtered);
00845       notify_read_conditions();
00846     }
00847   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data ( MessageType *  instance_data,
const OpenDDS::DCPS::DataSampleHeader header,
OpenDDS::DCPS::SubscriptionInstance *&  instance_ptr,
bool &  just_registered,
bool &  filtered 
) [inline, private]

Definition at line 1454 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::ReceivedDataStrategy::add(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::ReceivedDataElement::coherent_change_, DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, OpenDDS::DCPS::InstanceState::data_was_received(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::InstanceState::dispose_was_received(), OpenDDS::DCPS::InstanceState::disposed_generation_count(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::HANDLE_NIL, OpenDDS::DCPS::ReceivedDataElementList::head_, header, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::keyFromSample(), OpenDDS::DCPS::SubscriptionInstance::last_sequence_, DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::SubscriberImpl::listener_for(), OpenDDS::DCPS::InstanceState::lively(), OpenDDS::DCPS::InstanceState::no_writers_generation_count(), OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, DDS::NOT_READ_SAMPLE_STATE, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, OpenDDS::DCPS::SubscriptionInstance::rcvd_strategy_, DDS::REJECTED_BY_INSTANCES_LIMIT, DDS::REJECTED_BY_SAMPLES_LIMIT, DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT, OpenDDS::DCPS::ReceivedDataElementList::remove(), OpenDDS::DCPS::SAMPLE_DATA, DDS::SAMPLE_LOST_STATUS, DDS::SAMPLE_REJECTED_STATUS, OpenDDS::DCPS::ReceivedDataElement::sample_state_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::ReceivedDataElementList::size_, OpenDDS::DCPS::UNREGISTER_INSTANCE, and OpenDDS::DCPS::InstanceState::unregister_was_received().

01460 {
01461   const bool is_dispose_msg =
01462     header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
01463     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01464   const bool is_unregister_msg =
01465     header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE ||
01466     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01467 
01468   ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
01469 
01470   //!!! caller should already have the sample_lock_
01471   //We will unlock it before calling into listeners
01472 
01473   typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
01474 
01475   if ((is_dispose_msg || is_unregister_msg) && it == instance_map_.end())
01476     {
01477       ACE_DES_FREE (instance_data,
01478                     data_allocator_->free,
01479                     MessageType );
01480       instance_data = 0;
01481       return;
01482     }
01483 
01484 
01485   if (it == instance_map_.end())
01486     {
01487       std::size_t instances_size = 0;
01488       { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01489         instances_size = instances_.size();
01490       }
01491       if ((this->qos_.resource_limits.max_instances != ::DDS::LENGTH_UNLIMITED) &&
01492           ((::CORBA::Long) instances_size >= this->qos_.resource_limits.max_instances))
01493         {
01494 
01495           ::DDS::DataReaderListener_var listener
01496             = listener_for (::DDS::SAMPLE_REJECTED_STATUS);
01497 
01498           set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true);
01499 
01500           sample_rejected_status_.last_reason =
01501             ::DDS::REJECTED_BY_INSTANCES_LIMIT;
01502           ++sample_rejected_status_.total_count;
01503           ++sample_rejected_status_.total_count_change;
01504           sample_rejected_status_.last_instance_handle = handle;
01505 
01506           ::DDS::DataReader_var dr = get_dr_obj_ref();
01507           if (!CORBA::is_nil(listener.in()))
01508             {
01509               ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01510 
01511               listener->on_sample_rejected(dr.in (),
01512                                            sample_rejected_status_);
01513             }  // do we want to do something if listener is nil???
01514           notify_status_condition_no_sample_lock();
01515 
01516           ACE_DES_FREE (instance_data,
01517                         data_allocator_->free,
01518                         MessageType );
01519 
01520           return;
01521         }
01522 
01523       // first find the instance mapin the participant instance map.
01524       // if the instance map for the type is not registered, then
01525       // create the instance map.
01526       // if the instance map for the type exists, then find the
01527       // handle of the instance. If the instance is not registered
01528       //
01529 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01530       InstanceMap* inst = 0;
01531       bool new_handle = true;
01532       if (this->is_exclusive_ownership_) {
01533         if (this->owner_manager_->instance_lock_acquire () != 0) {
01534           ACE_ERROR ((LM_ERROR,
01535                       ACE_TEXT("(%P|%t) ")
01536                       ACE_TEXT("%CDataReaderImpl::")
01537                       ACE_TEXT("store_instance_data, ")
01538                       ACE_TEXT("acquire instance_lock failed. \n"), TraitsType::type_name()));
01539           return;
01540         }
01541 
01542         inst = (InstanceMap*)(
01543                               this->owner_manager_->get_instance_map(this->topic_servant_->type_name(), this));
01544         if (inst != 0) {
01545           typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
01546           if (iter != inst->end ()) {
01547             handle = iter->second;
01548             new_handle = false;
01549           }
01550         }
01551       }
01552 #endif
01553 
01554       just_registered = true;
01555       OpenDDS::DCPS::SubscriptionInstance* instance = 0;
01556       ::DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(instance_data);
01557       handle = handle == ::DDS::HANDLE_NIL ? this->get_next_handle( key) : handle;
01558       ACE_NEW (instance,
01559                OpenDDS::DCPS::SubscriptionInstance(this,
01560                                                    this->qos_,
01561                                                    this->instances_lock_,
01562                                                    handle));
01563 
01564       instance->instance_handle_ = handle;
01565 
01566       { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01567         int ret = OpenDDS::DCPS::bind(instances_, handle, instance);
01568 
01569         if (ret != 0)
01570           {
01571             ACE_ERROR ((LM_ERROR,
01572                         ACE_TEXT("(%P|%t) ")
01573                         ACE_TEXT("%CDataReaderImpl::")
01574                         ACE_TEXT("store_instance_data, ")
01575                         ACE_TEXT("insert handle failed. \n"), TraitsType::type_name()));
01576             return;
01577           }
01578       }
01579 
01580 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01581       if (this->is_exclusive_ownership_) {
01582         if (inst == 0) {
01583           inst = new InstanceMap ();
01584           this->owner_manager_->set_instance_map(
01585                                                  this->topic_servant_->type_name(), reinterpret_cast <void* > (inst), this);
01586         }
01587 
01588         if (new_handle) {
01589           std::pair<typename InstanceMap::iterator, bool> bpair =
01590             inst->insert(typename InstanceMap::value_type(*instance_data,
01591                                                  handle));
01592           if (bpair.second == false)
01593             {
01594               ACE_ERROR ((LM_ERROR,
01595                           ACE_TEXT("(%P|%t) ")
01596                           ACE_TEXT("%CDataReaderImpl::")
01597                           ACE_TEXT("store_instance_data, ")
01598                           ACE_TEXT("insert to participant scope %s failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01599               return;
01600             }
01601         }
01602 
01603         if (this->owner_manager_->instance_lock_release () != 0) {
01604           ACE_ERROR ((LM_ERROR,
01605                       ACE_TEXT("(%P|%t) ")
01606                       ACE_TEXT("%CDataReaderImpl::")
01607                       ACE_TEXT("store_instance_data, ")
01608                       ACE_TEXT("release instance_lock failed. \n"), TraitsType::type_name()));
01609           return;
01610         }
01611       }
01612 #endif
01613 
01614       std::pair<typename InstanceMap::iterator, bool> bpair =
01615         instance_map_.insert(typename InstanceMap::value_type(*instance_data,
01616                                                      handle));
01617       if (bpair.second == false)
01618         {
01619           ACE_ERROR ((LM_ERROR,
01620                       ACE_TEXT("(%P|%t) ")
01621                       ACE_TEXT("%CDataReaderImpl::")
01622                       ACE_TEXT("store_instance_data, ")
01623                       ACE_TEXT("insert %s failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01624           return;
01625         }
01626     }
01627   else
01628     {
01629       just_registered = false;
01630       handle = it->second;
01631     }
01632 
01633   if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION)
01634     {
01635       instance_ptr = get_handle_instance(handle);
01636 
01637       if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA)
01638         {
01639           // Check instance based QoS policy filters
01640           // (i.e. OWNERSHIP, TIME_BASED_FILTER)
01641           filtered = this->filter_instance(instance_ptr,header.publication_id_);
01642 
01643           if (filtered)
01644             {
01645               ACE_DES_FREE (instance_data,
01646                             data_allocator_->free,
01647                             MessageType );
01648               return;
01649             }
01650         }
01651 
01652       if ((this->qos_.resource_limits.max_samples_per_instance !=
01653            ::DDS::LENGTH_UNLIMITED) &&
01654           (instance_ptr->rcvd_samples_.size_ >=
01655            this->qos_.resource_limits.max_samples_per_instance))
01656         {
01657 
01658           // According to spec 1.2, Samples that contain no data do not
01659           // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
01660           // so do not remove the oldest sample when unregister/dispose
01661           // message arrives.
01662 
01663           if  (! is_dispose_msg  && ! is_unregister_msg
01664                && instance_ptr->rcvd_samples_.head_->sample_state_
01665                == ::DDS::NOT_READ_SAMPLE_STATE)
01666             {
01667               // for now the implemented QoS means that if the head sample
01668               // is NOT_READ then none are read.
01669               // TBD - in future we will reads may not read in order so
01670               //       just looking at the head will not be enough.
01671               ::DDS::DataReaderListener_var listener
01672                 = listener_for (::DDS::SAMPLE_REJECTED_STATUS);
01673 
01674               set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true);
01675 
01676               sample_rejected_status_.last_reason =
01677                 ::DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT;
01678               ++sample_rejected_status_.total_count;
01679               ++sample_rejected_status_.total_count_change;
01680               sample_rejected_status_.last_instance_handle = handle;
01681 
01682               ::DDS::DataReader_var dr = get_dr_obj_ref();
01683               if (!CORBA::is_nil(listener.in()))
01684                 {
01685                   ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01686 
01687                   listener->on_sample_rejected(dr.in (),
01688                                                sample_rejected_status_);
01689                 }  // do we want to do something if listener is nil???
01690               notify_status_condition_no_sample_lock();
01691 
01692               ACE_DES_FREE (instance_data,
01693                             data_allocator_->free,
01694                             MessageType );
01695 
01696               return;
01697             }
01698           else if (! is_dispose_msg  && ! is_unregister_msg)
01699             {
01700               // Discard the oldest previously-read sample
01701               OpenDDS::DCPS::ReceivedDataElement *item =
01702                 instance_ptr->rcvd_samples_.head_;
01703               instance_ptr->rcvd_samples_.remove(item);
01704               dec_ref_data_element(item);
01705             }
01706         }
01707       else if (this->qos_.resource_limits.max_samples != ::DDS::LENGTH_UNLIMITED)
01708         {
01709           CORBA::Long total_samples = 0;
01710           { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01711             for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
01712                  iter != instances_.end();
01713                  ++iter) {
01714               OpenDDS::DCPS::SubscriptionInstance *ptr = iter->second;
01715 
01716               total_samples += (CORBA::Long) ptr->rcvd_samples_.size_;
01717             }
01718           }
01719 
01720           if(total_samples >= this->qos_.resource_limits.max_samples)
01721             {
01722               // According to spec 1.2, Samples that contain no data do not
01723               // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
01724               // so do not remove the oldest sample when unregister/dispose
01725               // message arrives.
01726 
01727               if  (! is_dispose_msg  && ! is_unregister_msg
01728                    && instance_ptr->rcvd_samples_.head_->sample_state_
01729                    == ::DDS::NOT_READ_SAMPLE_STATE)
01730                 {
01731                   // for now the implemented QoS means that if the head sample
01732                   // is NOT_READ then none are read.
01733                   // TBD - in future we will reads may not read in order so
01734                   //       just looking at the head will not be enough.
01735                   ::DDS::DataReaderListener_var listener
01736                     = listener_for (::DDS::SAMPLE_REJECTED_STATUS);
01737 
01738                   set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true);
01739 
01740                   sample_rejected_status_.last_reason =
01741                     ::DDS::REJECTED_BY_SAMPLES_LIMIT;
01742                   ++sample_rejected_status_.total_count;
01743                   ++sample_rejected_status_.total_count_change;
01744                   sample_rejected_status_.last_instance_handle = handle;
01745                   ::DDS::DataReader_var dr = get_dr_obj_ref();
01746                   if (!CORBA::is_nil(listener.in()))
01747                     {
01748                       ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01749 
01750                       listener->on_sample_rejected(dr.in (),
01751                                                    sample_rejected_status_);
01752                     }  // do we want to do something if listener is nil???
01753                   notify_status_condition_no_sample_lock();
01754 
01755                   ACE_DES_FREE (instance_data,
01756                                 data_allocator_->free,
01757                                 MessageType );
01758 
01759                   return;
01760                 }
01761               else if (! is_dispose_msg  && ! is_unregister_msg)
01762                 {
01763                   // Discard the oldest previously-read sample
01764                   OpenDDS::DCPS::ReceivedDataElement *item =
01765                     instance_ptr->rcvd_samples_.head_;
01766                   instance_ptr->rcvd_samples_.remove(item);
01767                   dec_ref_data_element(item);
01768                 }
01769             }
01770         }
01771 
01772       if (is_dispose_msg || is_unregister_msg)
01773         {
01774           ACE_DES_FREE (instance_data,
01775                         data_allocator_->free,
01776                         MessageType );
01777           instance_data = 0;
01778         }
01779 
01780       bool event_notify = false;
01781 
01782       if (is_dispose_msg) {
01783         event_notify = instance_ptr->instance_state_.dispose_was_received(header.publication_id_);
01784       }
01785 
01786       if (is_unregister_msg) {
01787         if (instance_ptr->instance_state_.unregister_was_received(header.publication_id_)) {
01788           event_notify = true;
01789         }
01790       }
01791 
01792       if (!is_dispose_msg && !is_unregister_msg) {
01793         event_notify = true;
01794         instance_ptr->instance_state_.data_was_received(header.publication_id_);
01795       }
01796 
01797       if (!event_notify) {
01798         return;
01799       }
01800 
01801       OpenDDS::DCPS::ReceivedDataElement *ptr;
01802       ACE_NEW_MALLOC (ptr,
01803                       static_cast<OpenDDS::DCPS::ReceivedDataElement *> (
01804                                                                          rd_allocator_->malloc (
01805                                                                                                 sizeof (OpenDDS::DCPS::ReceivedDataElement))),
01806                       OpenDDS::DCPS::ReceivedDataElement(header,
01807                                                          instance_data));
01808 
01809       ptr->disposed_generation_count_ =
01810         instance_ptr->instance_state_.disposed_generation_count();
01811       ptr->no_writers_generation_count_ =
01812         instance_ptr->instance_state_.no_writers_generation_count();
01813 
01814       instance_ptr->last_sequence_ = header.sequence_;
01815 
01816       instance_ptr->rcvd_strategy_->add(ptr);
01817 
01818       if (! is_dispose_msg  && ! is_unregister_msg
01819           && instance_ptr->rcvd_samples_.size_ > get_depth())
01820         {
01821           OpenDDS::DCPS::ReceivedDataElement* head_ptr =
01822             instance_ptr->rcvd_samples_.head_;
01823 
01824           instance_ptr->rcvd_samples_.remove(head_ptr);
01825 
01826           if (head_ptr->sample_state_ == ::DDS::NOT_READ_SAMPLE_STATE)
01827             {
01828               ::DDS::DataReaderListener_var listener
01829                 = listener_for (::DDS::SAMPLE_LOST_STATUS);
01830 
01831               ++sample_lost_status_.total_count;
01832               ++sample_lost_status_.total_count_change;
01833 
01834               set_status_changed_flag(::DDS::SAMPLE_LOST_STATUS, true);
01835 
01836               ::DDS::DataReader_var dr = get_dr_obj_ref();
01837               if (!CORBA::is_nil(listener.in()))
01838                 {
01839                   ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01840 
01841                   listener->on_sample_lost(dr.in (), sample_lost_status_);
01842                 }
01843 
01844               notify_status_condition_no_sample_lock();
01845             }
01846 
01847           dec_ref_data_element(head_ptr);
01848         }
01849 
01850 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01851       if (! ptr->coherent_change_) {
01852 #endif
01853         OpenDDS::DCPS::SubscriberImpl* sub = get_subscriber_servant ();
01854 
01855         sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
01856         set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
01857 
01858         ::DDS::SubscriberListener_var sub_listener =
01859             sub->listener_for(::DDS::DATA_ON_READERS_STATUS);
01860         if (!CORBA::is_nil(sub_listener.in()) && !this->coherent_)
01861           {
01862             ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01863 
01864             sub_listener->on_data_on_readers(sub);
01865             sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
01866           }
01867         else
01868           {
01869             sub->notify_status_condition();
01870 
01871             ::DDS::DataReaderListener_var listener =
01872                 listener_for (::DDS::DATA_AVAILABLE_STATUS);
01873 
01874             ::DDS::DataReader_var dr = get_dr_obj_ref();
01875             if (!CORBA::is_nil(listener.in()))
01876               {
01877                 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01878 
01879                 listener->on_data_available(dr.in ());
01880                 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
01881                 sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
01882               }
01883             else
01884               {
01885                 notify_status_condition_no_sample_lock();
01886               }
01887           }
01888 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01889       }
01890 #endif
01891     }
01892   else
01893     {
01894       instance_ptr = this->get_handle_instance (handle);
01895       instance_ptr->instance_state_.lively(header.publication_id_);
01896       ACE_DES_FREE (instance_data,
01897                     data_allocator_->free,
01898                     MessageType );
01899     }
01900 }

template<typename MessageType>
::DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data ( const MessageType &  sample,
::DDS::ViewStateKind  view 
) [inline]

Definition at line 779 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::InstanceState::accessed(), DDS::HANDLE_NIL, header, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::SubscriptionInstance::instance_state_, lookup_instance(), DDS::NOT_NEW_VIEW_STATE, and OpenDDS::DCPS::SAMPLE_DATA.

00781   {
00782     using namespace OpenDDS::DCPS;
00783     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
00784                      ::DDS::HANDLE_NIL);
00785 
00786 #ifndef OPENDDS_NO_MULTI_TOPIC
00787     ::DDS::TopicDescription_var descr = get_topicdescription();
00788     if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
00789       if (!mt->filter(sample)) {
00790         return ::DDS::HANDLE_NIL;
00791       }
00792     }
00793 #endif
00794 
00795     get_subscriber_servant()->data_received(this);
00796 
00797     ::DDS::InstanceHandle_t inst = lookup_instance(sample);
00798     bool filtered;
00799     SubscriptionInstance* instance = 0;
00800 
00801     // Call store_instance_data() once or twice, depending on if we need to
00802     // process the INSTANCE_REGISTRATION.  In either case, store_instance_data()
00803     // owns the memory for the sample and it must come from the correct allocator.
00804     for (int i = 0; i < 2; ++i) {
00805       if (i == 0 && inst != ::DDS::HANDLE_NIL) continue;
00806 
00807       DataSampleHeader header;
00808       header.message_id_ = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
00809       bool just_registered;
00810       MessageType* data;
00811       ACE_NEW_MALLOC_NORETURN(data,
00812                               static_cast< MessageType*>(data_allocator_->malloc(sizeof(MessageType))),
00813                               MessageType(sample));
00814       store_instance_data(data, header, instance, just_registered, filtered);
00815       if (instance) inst = instance->instance_handle_;
00816     }
00817 
00818     if (!filtered) {
00819       if (view == ::DDS::NOT_NEW_VIEW_STATE) {
00820         instance->instance_state_.accessed();
00821       }
00822       notify_read_conditions();
00823     }
00824     return inst;
00825   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( OpenDDS::DCPS::AbstractSamples samples,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 717 of file DataReaderImpl_T.h.

References DDS::LENGTH_UNLIMITED, OpenDDS::DCPS::AbstractSamples::push_back(), OpenDDS::DCPS::AbstractSamples::reserve(), and DDS::RETCODE_ERROR.

00721   {
00722 
00723     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00724                       guard,
00725                       this->sample_lock_,
00726                       ::DDS::RETCODE_ERROR);
00727 
00728 
00729     MessageSequenceType data;
00730     ::DDS::SampleInfoSeq infos;
00731     ::DDS::ReturnCode_t rc = take_i(data, infos, ::DDS::LENGTH_UNLIMITED,
00732                                     sample_states, view_states, instance_states, 0);
00733 
00734     samples.reserve(data.length());
00735 
00736     for (CORBA::ULong i = 0; i < data.length(); ++i) {
00737       samples.push_back(infos[i], &data[i]);
00738     }
00739 
00740     return rc;
00741   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 105 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00112     {
00113       ::DDS::ReturnCode_t const precond =
00114         check_inputs("take", received_data, info_seq, max_samples);
00115       if (::DDS::RETCODE_OK != precond)
00116         {
00117           return precond;
00118         }
00119 
00120       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00121                         guard,
00122                         this->sample_lock_,
00123                         ::DDS::RETCODE_ERROR);
00124 
00125       return take_i(received_data, info_seq, max_samples, sample_states,
00126                     view_states, instance_states, 0);
00127     }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
::DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1095 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::DDS_OPERATION_TAKE, DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::RakeData::index_in_instance_, OpenDDS::DCPS::RakeData::rde_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::RakeData::si_.

01107 {
01108 #ifdef OPENDDS_NO_QUERY_CONDITION
01109   ACE_UNUSED_ARG(ignored);
01110 #endif
01111 
01112   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01113 
01114 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01115   if (this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01116       && ! this->coherent_) {
01117     return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01118   }
01119 
01120   bool group_coherent_ordered
01121     = this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01122     && this->subqos_.presentation.coherent_access
01123     && this->subqos_.presentation.ordered_access;
01124 
01125   if (group_coherent_ordered && this->coherent_) {
01126     max_samples = 1;
01127   }
01128 #endif
01129 
01130   ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01131       results(this, received_data, info_seq, max_samples,
01132               this->subqos_.presentation,
01133 #ifndef OPENDDS_NO_QUERY_CONDITION
01134               a_condition,
01135 #endif
01136               ::OpenDDS::DCPS::DDS_OPERATION_TAKE);
01137 
01138 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01139   if (! group_coherent_ordered) {
01140 #endif
01141 
01142     for (typename InstanceMap::iterator it = instance_map_.begin(),
01143            the_end = instance_map_.end(); it != the_end; ++it)
01144       {
01145         ::DDS::InstanceHandle_t handle = it->second;
01146 
01147         OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(handle);
01148 
01149         if ((inst->instance_state_.view_state() & view_states) &&
01150             (inst->instance_state_.instance_state() & instance_states))
01151           {
01152             size_t i(0);
01153             for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01154                  item != 0; item = item->next_data_sample_)
01155               {
01156                 if (item->sample_state_ & sample_states
01157 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01158                     && !item->coherent_change_
01159 #endif
01160                     )
01161                   {
01162                     results.insert_sample(item, inst, ++i);
01163                   }
01164               }
01165           }
01166       }
01167 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01168   }
01169   else {
01170     OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01171     results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01172   }
01173 #endif
01174 
01175   results.copy_to_user();
01176 
01177   ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01178   if (received_data.length())
01179     {
01180       ret = ::DDS::RETCODE_OK;
01181       if (received_data.maximum() == 0) //using ZeroCopy
01182         {
01183           received_data_p.set_loaner(this);
01184         }
01185     }
01186 
01187   post_read_or_take();
01188   return ret;
01189 }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 392 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00400   {
00401     ::DDS::ReturnCode_t const precond =
00402       check_inputs("take_instance", received_data, info_seq, max_samples);
00403     if (::DDS::RETCODE_OK != precond)
00404       {
00405         return precond;
00406       }
00407 
00408     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00409                       guard,
00410                       this->sample_lock_,
00411                       ::DDS::RETCODE_ERROR);
00412     return take_instance_i(received_data, info_seq, max_samples, a_handle,
00413                            sample_states, view_states, instance_states, 0);
00414   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
::DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1256 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::ReceivedDataElementList::head_, OpenDDS::DCPS::InstanceState::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::InstanceState::view_state().

01269 {
01270 #ifdef OPENDDS_NO_QUERY_CONDITION
01271   ACE_UNUSED_ARG(ignored);
01272 #endif
01273 
01274   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01275 
01276   ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01277       results(this, received_data, info_seq, max_samples,
01278               this->subqos_.presentation,
01279 #ifndef OPENDDS_NO_QUERY_CONDITION
01280               a_condition,
01281 #endif
01282               ::OpenDDS::DCPS::DDS_OPERATION_TAKE);
01283 
01284   OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(a_handle);
01285 
01286   if ((inst->instance_state_.view_state() & view_states) &&
01287       (inst->instance_state_.instance_state() & instance_states))
01288     {
01289       size_t i(0);
01290       for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01291            item; item = item->next_data_sample_)
01292         {
01293           if (item->sample_state_ & sample_states
01294 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01295               && !item->coherent_change_
01296 #endif
01297               )
01298             {
01299               results.insert_sample(item, inst, ++i);
01300             }
01301         }
01302     }
01303 
01304   results.copy_to_user();
01305 
01306   ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01307   if (received_data.length())
01308     {
01309       ret = ::DDS::RETCODE_OK;
01310       if (received_data.maximum() == 0) //using ZeroCopy
01311         {
01312           received_data_p.set_loaner(this);
01313         }
01314     }
01315 
01316   post_read_or_take();
01317   return ret;
01318 }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states 
) [inline]

Definition at line 436 of file DataReaderImpl_T.h.

References DDS::RETCODE_OK.

00444   {
00445     ::DDS::ReturnCode_t const precond =
00446       check_inputs("take_next_instance", received_data, info_seq, max_samples);
00447     if (::DDS::RETCODE_OK != precond)
00448       {
00449         return precond;
00450       }
00451 
00452     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00453                                 sample_states, view_states, instance_states, 0);
00454   }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_i ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::SampleStateMask  sample_states,
::DDS::ViewStateMask  view_states,
::DDS::InstanceStateMask  instance_states,
::DDS::QueryCondition_ptr  a_condition 
) [inline, private]

Definition at line 1388 of file DataReaderImpl_T.h.

References DDS::HANDLE_NIL, DDS::RETCODE_ERROR, and DDS::RETCODE_NO_DATA.

01401 {
01402 #ifdef OPENDDS_NO_QUERY_CONDITION
01403   ACE_UNUSED_ARG(ignored);
01404 #endif
01405 
01406   ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
01407 
01408   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01409                     guard,
01410                     this->sample_lock_,
01411                     ::DDS::RETCODE_ERROR);
01412 
01413   typename InstanceMap::iterator it;
01414   typename InstanceMap::iterator const the_end = instance_map_.end ();
01415 
01416   if (a_handle == ::DDS::HANDLE_NIL)
01417     {
01418       it = instance_map_.begin ();
01419     }
01420   else
01421     {
01422       for (it = instance_map_.begin (); it != the_end; ++it)
01423         {
01424           if (a_handle == it->second)
01425             {
01426               ++it;
01427               break;
01428             }
01429         }
01430     }
01431 
01432   for (; it != the_end; ++it)
01433     {
01434       handle = it->second;
01435       ::DDS::ReturnCode_t const status =
01436           take_instance_i(received_data, info_seq, max_samples, handle,
01437                           sample_states, view_states, instance_states,
01438 #ifndef OPENDDS_NO_QUERY_CONDITION
01439                           a_condition);
01440 #else
01441       0);
01442 #endif
01443   if (status != ::DDS::RETCODE_NO_DATA)
01444     {
01445       total_samples();  // see if we are empty
01446       post_read_or_take();
01447       return status;
01448     }
01449 }
01450 post_read_or_take();
01451 return ::DDS::RETCODE_NO_DATA;
01452 }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_w_condition ( MessageSequenceType received_data,
::DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
::DDS::InstanceHandle_t  a_handle,
::DDS::ReadCondition_ptr  a_condition 
) [inline]

Definition at line 496 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00502   {
00503     ::DDS::ReturnCode_t const precond =
00504       check_inputs("take_next_instance_w_condition", received_data, info_seq,
00505                    max_samples);
00506     if (::DDS::RETCODE_OK != precond)
00507       {
00508         return precond;
00509       }
00510 
00511     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00512                       ::DDS::RETCODE_ERROR);
00513 
00514     if (!has_readcondition(a_condition))
00515       {
00516         return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00517       }
00518 
00519 #ifndef OPENDDS_NO_QUERY_CONDITION
00520     ::DDS::QueryCondition_ptr query_condition =
00521         dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition);
00522 #endif
00523 
00524     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00525                                 a_condition->get_sample_state_mask(),
00526                                 a_condition->get_view_state_mask(),
00527                                 a_condition->get_instance_state_mask(),
00528 #ifndef OPENDDS_NO_QUERY_CONDITION
00529                                 query_condition
00530 #else
00531                                 0
00532 #endif
00533                                 );
00534   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_sample ( MessageType &  received_data,
::DDS::SampleInfo sample_info 
) [inline]

Definition at line 265 of file DataReaderImpl_T.h.

References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::ReceivedDataElement::coherent_change_, OpenDDS::DCPS::ReceivedDataElement::next_data_sample_, DDS::NOT_READ_SAMPLE_STATE, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::ReceivedDataElement::registered_data_, DDS::RETCODE_ERROR, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, and OpenDDS::DCPS::ReceivedDataElement::sample_state_.

00268   {
00269     bool found_data = false;
00270 
00271 
00272     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00273                       guard,
00274                       this->sample_lock_,
00275                       ::DDS::RETCODE_ERROR);
00276 
00277     typename InstanceMap::iterator const the_end = instance_map_.end ();
00278     for (typename InstanceMap::iterator it = instance_map_.begin ();
00279          it != the_end;
00280          ++it)
00281       {
00282         ::DDS::InstanceHandle_t handle = it->second;
00283         OpenDDS::DCPS::SubscriptionInstance* ptr = get_handle_instance(handle);
00284 
00285         bool mrg = false; //most_recent_generation
00286 
00287         OpenDDS::DCPS::ReceivedDataElement *tail = 0;
00288         if ((ptr->instance_state_.view_state() & ::DDS::ANY_VIEW_STATE) &&
00289             (ptr->instance_state_.instance_state() & ::DDS::ANY_INSTANCE_STATE))
00290           {
00291 
00292             OpenDDS::DCPS::ReceivedDataElement *next;
00293             tail = 0;
00294             OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
00295             while (item)
00296               {
00297 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00298                 if (item->coherent_change_)
00299                   {
00300                     item = item->next_data_sample_;
00301                     continue;
00302                   }
00303 #endif
00304                 if (item->sample_state_ & ::DDS::NOT_READ_SAMPLE_STATE)
00305                   {
00306                     if (item->registered_data_ != 0)
00307                       {
00308                         received_data =
00309                           *static_cast< MessageType *> (item->registered_data_);
00310                       }
00311                     ptr->instance_state_.sample_info(sample_info, item);
00312 
00313                     item->sample_state_ = ::DDS::READ_SAMPLE_STATE;
00314 
00315                     if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00316 
00317                     if (item == ptr->rcvd_samples_.tail_)
00318                       {
00319                         tail = ptr->rcvd_samples_.tail_;
00320                         item = item->next_data_sample_;
00321                       }
00322                     else
00323                       {
00324                         next = item->next_data_sample_;
00325 
00326                         ptr->rcvd_samples_.remove(item);
00327                         dec_ref_data_element(item);
00328 
00329                         item = next;
00330                       }
00331 
00332                     found_data = true;
00333                   }
00334                 if (found_data)
00335                   {
00336                     break;
00337                   }
00338               }
00339           }
00340 
00341         if (found_data)
00342           {
00343             if (mrg) ptr->instance_state_.accessed();
00344 
00345             //
00346             // Get the sample_ranks, generation_ranks, and
00347             // absolute_generation_ranks for this info_seq
00348             //
00349             if (tail)
00350               {
00351                 this->sample_info(sample_info, tail);
00352 
00353                 ptr->rcvd_samples_.remove(tail);
00354                 dec_ref_data_element(tail);
00355               }
00356             else
00357               {
00358                 this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00359               }
00360 
00361             break;
00362           }
00363       }
00364     post_read_or_take();
00365     return found_data ? ::DDS::RETCODE_OK : ::DDS::RETCODE_NO_DATA;
00366   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_w_condition ( MessageSequenceType received_data,
::DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
::DDS::ReadCondition_ptr  a_condition 
) [inline]

Definition at line 161 of file DataReaderImpl_T.h.

References DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00166     {
00167       ::DDS::ReturnCode_t const precond =
00168         check_inputs("take_w_condition", received_data, sample_info, max_samples);
00169       if (::DDS::RETCODE_OK != precond)
00170         {
00171           return precond;
00172         }
00173 
00174       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00175                         ::DDS::RETCODE_ERROR);
00176 
00177       if (!has_readcondition(a_condition))
00178         {
00179           return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00180         }
00181 
00182       return take_i(received_data, sample_info, max_samples,
00183                     a_condition->get_sample_state_mask(),
00184                     a_condition->get_view_state_mask(),
00185                     a_condition->get_instance_state_mask(),
00186 #ifndef OPENDDS_NO_QUERY_CONDITION
00187                     dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition)
00188 #else
00189                     0
00190 #endif
00191                     );
00192     }


Member Data Documentation

template<typename MessageType>
DataAllocator* OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator_ [private]

Definition at line 2015 of file DataReaderImpl_T.h.

template<typename MessageType>
InstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::instance_map_ [private]

Definition at line 2014 of file DataReaderImpl_T.h.


The documentation for this class was generated from the following file:
Generated on Fri Feb 12 20:06:13 2016 for OpenDDS by  doxygen 1.4.7