LCOV - code coverage report
Current view: top level - DCPS - DataReaderImpl_T.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 1110 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 3492 0.0 %

          Line data    Source code
       1             : #ifndef OPENDDS_DCPS_DATAREADERIMPL_T_H
       2             : #define OPENDDS_DCPS_DATAREADERIMPL_T_H
       3             : 
       4             : #include <ace/config-lite.h>
       5             : 
       6             : #ifdef ACE_HAS_CPP11
       7             : #  define OPENDDS_HAS_STD_SHARED_PTR
       8             : #endif
       9             : 
      10             : #include "MultiTopicImpl.h"
      11             : #include "RakeResults_T.h"
      12             : #include "SubscriberImpl.h"
      13             : #include "BuiltInTopicUtils.h"
      14             : #include "Util.h"
      15             : #include "TypeSupportImpl.h"
      16             : #include "dcps_export.h"
      17             : #include "GuidConverter.h"
      18             : #include "XTypes/DynamicDataAdapter.h"
      19             : 
      20             : #ifndef OPENDDS_HAS_STD_SHARED_PTR
      21             : #  include <ace/Bound_Ptr.h>
      22             : #endif
      23             : #include <ace/Time_Value.h>
      24             : 
      25             : #ifndef OPENDDS_HAS_STD_SHARED_PTR
      26             : #  include <memory>
      27             : #endif
      28             : 
      29             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      30             : 
      31             : namespace OpenDDS {
      32             :   namespace DCPS {
      33             : 
      34             :   /** Servant for DataReader interface of Traits::MessageType data type.
      35             :    *
      36             :    * See the DDS specification, OMG formal/2015-04-10, for a description of
      37             :    * this interface.
      38             :    *
      39             :    */
      40             :   template <typename MessageType>
      41             :   class
      42             : #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
      43             :     OpenDDS_Dcps_Export
      44             : #endif
      45             :   DataReaderImpl_T
      46             :     : public virtual LocalObject<typename DDSTraits<MessageType>::DataReaderType>
      47             :     , public virtual DataReaderImpl
      48             :   {
      49             :   public:
      50             :     typedef DDSTraits<MessageType> TraitsType;
      51             :     typedef MarshalTraits<MessageType> MarshalTraitsType;
      52             :     typedef typename TraitsType::MessageSequenceType MessageSequenceType;
      53             : 
      54             :     typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t,
      55             :                               typename TraitsType::LessThanType) InstanceMap;
      56             :     typedef OPENDDS_MAP(DDS::InstanceHandle_t, typename InstanceMap::iterator) ReverseInstanceMap;
      57             : 
      58             :     class SharedInstanceMap
      59             :       : public virtual RcObject
      60             :       , public InstanceMap
      61             :     {
      62             :     };
      63             : 
      64             :     typedef RcHandle<SharedInstanceMap> SharedInstanceMap_rch;
      65             : 
      66             :     typedef typename TraitsType::DataReaderType Interface;
      67             : 
      68           0 :     CORBA::Boolean _is_a(const char* type_id)
      69             :     {
      70           0 :       return Interface::_is_a(type_id);
      71             :     }
      72             : 
      73           0 :     const char* _interface_repository_id() const
      74             :     {
      75           0 :       return Interface::_interface_repository_id();
      76             :     }
      77             : 
      78           0 :     CORBA::Boolean marshal(TAO_OutputCDR&)
      79             :     {
      80           0 :       return false;
      81             :     }
      82             : 
      83             :     // work around "hides overloaded virtual" warnings when MessageType=DynamicSample
      84             :     using Interface::read_next_sample;
      85             :     using Interface::take_next_sample;
      86             :     using Interface::lookup_instance;
      87             :     using Interface::get_key_value;
      88             : 
      89             :     class MessageTypeWithAllocator
      90             :       : public MessageType
      91             :       , public EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>
      92             :     {
      93             :     public:
      94             :       void* operator new(size_t size, ACE_New_Allocator& pool);
      95             :       void operator delete(void* memory, ACE_New_Allocator& pool);
      96             :       void operator delete(void* memory);
      97             : 
      98           0 :       MessageTypeWithAllocator(){}
      99           0 :       MessageTypeWithAllocator(const MessageType& other)
     100           0 :         : MessageType(other)
     101             :       {
     102           0 :       }
     103             : 
     104           0 :       const MessageType* message() const { return this; }
     105             : 
     106             : #ifndef OPENDDS_HAS_STD_UNIQUE_PTR
     107             :       using EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>::_remove_ref;
     108             :       using EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>::_add_ref;
     109             :       using EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>::ref_count;
     110             : #endif
     111             :     };
     112             : 
     113             :     struct MessageTypeMemoryBlock {
     114             :       MessageTypeWithAllocator element_;
     115             :       ACE_New_Allocator* allocator_;
     116             :     };
     117             : 
     118             :     typedef OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageTypeMemoryBlock, ACE_Thread_Mutex>  DataAllocator;
     119             : 
     120           0 :     DataReaderImpl_T()
     121           0 :       : filter_delayed_sample_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl_T::filter_delayed))
     122           0 :       , marshal_skip_serialize_(false)
     123             :     {
     124           0 :       initialize_lookup_maps();
     125           0 :     }
     126             : 
     127           0 :     virtual ~DataReaderImpl_T()
     128             :     {
     129           0 :       filter_delayed_sample_task_->cancel();
     130             : 
     131           0 :       for (typename InstanceMap::iterator it = instance_map_.begin();
     132           0 :            it != instance_map_.end(); ++it)
     133             :         {
     134           0 :           OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(it->second);
     135           0 :           if (!ptr) continue;
     136           0 :           purge_data(ptr);
     137             :         }
     138             :       //X SHH release the data samples in the instance_map_.
     139           0 :     }
     140             : 
     141             :     /**
     142             :      * Do parts of enable specific to the datatype.
     143             :      * Called by DataReaderImpl::enable().
     144             :      */
     145           0 :     virtual DDS::ReturnCode_t enable_specific ()
     146             :     {
     147           0 :       data_allocator().reset(new DataAllocator(get_n_chunks ()));
     148           0 :       if (OpenDDS::DCPS::DCPS_debug_level >= 2)
     149           0 :         ACE_DEBUG((LM_DEBUG,
     150             :                    ACE_TEXT("(%P|%t) %CDataReaderImpl::")
     151             :                    ACE_TEXT("enable_specific-data")
     152             :                    ACE_TEXT(" Cached_Allocator_With_Overflow ")
     153             :                    ACE_TEXT("%x with %d chunks\n"),
     154             :                    TraitsType::type_name(),
     155             :                    data_allocator().get(),
     156             :                    get_n_chunks ()));
     157             : 
     158           0 :       return DDS::RETCODE_OK;
     159             :     }
     160             : 
     161           0 :     virtual DDS::ReturnCode_t read (
     162             :                                     MessageSequenceType & received_data,
     163             :                                     DDS::SampleInfoSeq & info_seq,
     164             :                                     ::CORBA::Long max_samples,
     165             :                                     DDS::SampleStateMask sample_states,
     166             :                                     DDS::ViewStateMask view_states,
     167             :                                     DDS::InstanceStateMask instance_states)
     168             :     {
     169             :       DDS::ReturnCode_t const precond =
     170           0 :         check_inputs("read", received_data, info_seq, max_samples);
     171           0 :       if (DDS::RETCODE_OK != precond)
     172             :         {
     173           0 :           return precond;
     174             :         }
     175             : 
     176           0 :       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
     177             :                         guard,
     178             :                         sample_lock_,
     179             :                         DDS::RETCODE_ERROR);
     180             : 
     181           0 :       return read_i(received_data, info_seq, max_samples, sample_states,
     182           0 :                     view_states, instance_states, 0);
     183           0 :     }
     184             : 
     185           0 :     virtual DDS::ReturnCode_t take (
     186             :                                       MessageSequenceType & received_data,
     187             :                                       DDS::SampleInfoSeq & info_seq,
     188             :                                       ::CORBA::Long max_samples,
     189             :                                       DDS::SampleStateMask sample_states,
     190             :                                       DDS::ViewStateMask view_states,
     191             :                                       DDS::InstanceStateMask instance_states)
     192             :     {
     193             :       DDS::ReturnCode_t const precond =
     194           0 :         check_inputs("take", received_data, info_seq, max_samples);
     195           0 :       if (DDS::RETCODE_OK != precond)
     196             :         {
     197           0 :           return precond;
     198             :         }
     199             : 
     200           0 :       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
     201             :                         guard,
     202             :                         sample_lock_,
     203             :                         DDS::RETCODE_ERROR);
     204             : 
     205           0 :       return take_i(received_data, info_seq, max_samples, sample_states,
     206           0 :                     view_states, instance_states, 0);
     207           0 :     }
     208             : 
     209           0 :     virtual DDS::ReturnCode_t read_w_condition (
     210             :                                                   MessageSequenceType & received_data,
     211             :                                                   DDS::SampleInfoSeq & sample_info,
     212             :                                                   ::CORBA::Long max_samples,
     213             :                                                   DDS::ReadCondition_ptr a_condition)
     214             :     {
     215             :       DDS::ReturnCode_t const precond =
     216           0 :         check_inputs("read_w_condition", received_data, sample_info, max_samples);
     217           0 :       if (DDS::RETCODE_OK != precond)
     218             :         {
     219           0 :           return precond;
     220             :         }
     221             : 
     222           0 :       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
     223             :                         DDS::RETCODE_ERROR);
     224             : 
     225           0 :       if (!has_readcondition(a_condition))
     226             :         {
     227           0 :           return DDS::RETCODE_PRECONDITION_NOT_MET;
     228             :         }
     229             : 
     230           0 :       return read_i(received_data, sample_info, max_samples,
     231           0 :                     a_condition->get_sample_state_mask(),
     232           0 :                     a_condition->get_view_state_mask(),
     233           0 :                     a_condition->get_instance_state_mask(),
     234             : #ifndef OPENDDS_NO_QUERY_CONDITION
     235           0 :                     dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
     236             : #else
     237             :       0);
     238             : #endif
     239           0 :   }
     240             : 
     241           0 :     virtual DDS::ReturnCode_t take_w_condition (
     242             :                                                   MessageSequenceType & received_data,
     243             :                                                   DDS::SampleInfoSeq & sample_info,
     244             :                                                   ::CORBA::Long max_samples,
     245             :                                                   DDS::ReadCondition_ptr a_condition)
     246             :     {
     247             :       DDS::ReturnCode_t const precond =
     248           0 :         check_inputs("take_w_condition", received_data, sample_info, max_samples);
     249           0 :       if (DDS::RETCODE_OK != precond)
     250             :         {
     251           0 :           return precond;
     252             :         }
     253             : 
     254           0 :       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
     255             :                         DDS::RETCODE_ERROR);
     256             : 
     257           0 :       if (!has_readcondition(a_condition))
     258             :         {
     259           0 :           return DDS::RETCODE_PRECONDITION_NOT_MET;
     260             :         }
     261             : 
     262           0 :       return take_i(received_data, sample_info, max_samples,
     263           0 :                     a_condition->get_sample_state_mask(),
     264           0 :                     a_condition->get_view_state_mask(),
     265           0 :                     a_condition->get_instance_state_mask(),
     266             : #ifndef OPENDDS_NO_QUERY_CONDITION
     267           0 :                     dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
     268             : #else
     269             :                     0
     270             : #endif
     271           0 :                     );
     272           0 :     }
     273             : 
     274           0 :   virtual DDS::ReturnCode_t read_next_sample(MessageType& received_data,
     275             :                                              DDS::SampleInfo& sample_info_ref)
     276             :   {
     277           0 :     bool found_data = false;
     278           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
     279             : 
     280           0 :     const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
     281             : 
     282           0 :     const CORBA::ULong sample_states = DDS::NOT_READ_SAMPLE_STATE;
     283           0 :     const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
     284           0 :     for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
     285           0 :       ++next; // pre-increment iterator, in case updates cause changes to match set
     286           0 :       const DDS::InstanceHandle_t handle = *it;
     287           0 :       const SubscriptionInstance_rch inst = get_handle_instance(handle);
     288           0 :       if (!inst) continue;
     289             : 
     290           0 :       bool most_recent_generation = false;
     291           0 :       for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
     292           0 :            !found_data && item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
     293           0 :         if (item->registered_data_) {
     294           0 :           received_data = *static_cast<MessageType*>(item->registered_data_);
     295             :         }
     296           0 :         inst->instance_state_->sample_info(sample_info_ref, item);
     297           0 :         inst->rcvd_samples_.mark_read(item);
     298             : 
     299           0 :         const ValueDispatcher* vd = get_value_dispatcher();
     300           0 :         if (observer && item->registered_data_ && vd) {
     301           0 :           Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
     302           0 :           observer->on_sample_read(this, s);
     303             :         }
     304             : 
     305           0 :         if (!most_recent_generation) {
     306           0 :           most_recent_generation = inst->instance_state_->most_recent_generation(item);
     307             :         }
     308             : 
     309           0 :         found_data = true;
     310             :       }
     311             : 
     312           0 :       if (found_data) {
     313           0 :         if (most_recent_generation) {
     314           0 :           inst->instance_state_->accessed();
     315             :         }
     316             :         // Get the sample_ranks, generation_ranks, and
     317             :         // absolute_generation_ranks for this info_seq
     318           0 :         sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
     319             : 
     320           0 :         break;
     321             :       }
     322             :     }
     323             : 
     324           0 :     post_read_or_take();
     325           0 :     return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
     326           0 :   }
     327             : 
     328           0 :   virtual DDS::ReturnCode_t take_next_sample(MessageType& received_data,
     329             :                                              DDS::SampleInfo& sample_info_ref)
     330             :   {
     331           0 :     bool found_data = false;
     332           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
     333             : 
     334           0 :     const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
     335             : 
     336           0 :     const CORBA::ULong sample_states = DDS::NOT_READ_SAMPLE_STATE;
     337           0 :     const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
     338           0 :     for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
     339           0 :       ++next; // pre-increment iterator, in case updates cause changes to match set
     340           0 :       const DDS::InstanceHandle_t handle = *it;
     341           0 :       const SubscriptionInstance_rch inst = get_handle_instance(handle);
     342           0 :       if (!inst) continue;
     343             : 
     344           0 :       bool most_recent_generation = false;
     345           0 :       ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
     346           0 :       if (item) {
     347           0 :         if (item->registered_data_) {
     348           0 :           received_data = *static_cast<MessageType*>(item->registered_data_);
     349             :         }
     350           0 :         inst->instance_state_->sample_info(sample_info_ref, item);
     351           0 :         inst->rcvd_samples_.mark_read(item);
     352             : 
     353           0 :         const ValueDispatcher* vd = get_value_dispatcher();
     354           0 :         if (observer && item->registered_data_ && vd) {
     355           0 :           Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
     356           0 :           observer->on_sample_taken(this, s);
     357             :         }
     358             : 
     359           0 :         if (!most_recent_generation) {
     360           0 :           most_recent_generation = inst->instance_state_->most_recent_generation(item);
     361             :         }
     362             : 
     363           0 :         if (most_recent_generation) {
     364           0 :           inst->instance_state_->accessed();
     365             :         }
     366             : 
     367             :         // Get the sample_ranks, generation_ranks, and
     368             :         // absolute_generation_ranks for this info_seq
     369           0 :         sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
     370             : 
     371           0 :         inst->rcvd_samples_.remove(item);
     372           0 :         item->dec_ref();
     373           0 :         item = 0;
     374             : 
     375           0 :         found_data = true;
     376             : 
     377           0 :         break;
     378             :       }
     379             :     }
     380             : 
     381           0 :     post_read_or_take();
     382           0 :     return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
     383           0 :   }
     384             : 
     385           0 :   virtual DDS::ReturnCode_t read_instance (
     386             :                                              MessageSequenceType & received_data,
     387             :                                              DDS::SampleInfoSeq & info_seq,
     388             :                                              ::CORBA::Long max_samples,
     389             :                                              DDS::InstanceHandle_t a_handle,
     390             :                                              DDS::SampleStateMask sample_states,
     391             :                                              DDS::ViewStateMask view_states,
     392             :                                              DDS::InstanceStateMask instance_states)
     393             :   {
     394             :     DDS::ReturnCode_t const precond =
     395           0 :       check_inputs("read_instance", received_data, info_seq, max_samples);
     396           0 :     if (DDS::RETCODE_OK != precond)
     397             :       {
     398           0 :         return precond;
     399             :       }
     400             : 
     401           0 :     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
     402             :                       guard,
     403             :                       sample_lock_,
     404             :                       DDS::RETCODE_ERROR);
     405           0 :     return read_instance_i(received_data, info_seq, max_samples, a_handle,
     406           0 :                            sample_states, view_states, instance_states, 0);
     407           0 :   }
     408             : 
     409           0 :   virtual DDS::ReturnCode_t take_instance (
     410             :                                              MessageSequenceType & received_data,
     411             :                                              DDS::SampleInfoSeq & info_seq,
     412             :                                              ::CORBA::Long max_samples,
     413             :                                              DDS::InstanceHandle_t a_handle,
     414             :                                              DDS::SampleStateMask sample_states,
     415             :                                              DDS::ViewStateMask view_states,
     416             :                                              DDS::InstanceStateMask instance_states)
     417             :   {
     418             :     DDS::ReturnCode_t const precond =
     419           0 :       check_inputs("take_instance", received_data, info_seq, max_samples);
     420           0 :     if (DDS::RETCODE_OK != precond)
     421             :       {
     422           0 :         return precond;
     423             :       }
     424             : 
     425           0 :     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
     426             :                       guard,
     427             :                       sample_lock_,
     428             :                       DDS::RETCODE_ERROR);
     429           0 :     return take_instance_i(received_data, info_seq, max_samples, a_handle,
     430           0 :                            sample_states, view_states, instance_states, 0);
     431           0 :   }
     432             : 
     433           0 :   virtual DDS::ReturnCode_t read_instance_w_condition (
     434             :                                                        MessageSequenceType & received_data,
     435             :                                                        DDS::SampleInfoSeq & info_seq,
     436             :                                                        ::CORBA::Long max_samples,
     437             :                                                        DDS::InstanceHandle_t a_handle,
     438             :                                                        DDS::ReadCondition_ptr a_condition)
     439             :   {
     440             :     DDS::ReturnCode_t const precond =
     441           0 :       check_inputs("read_instance_w_condition", received_data, info_seq,
     442             :                    max_samples);
     443           0 :     if (DDS::RETCODE_OK != precond)
     444             :       {
     445           0 :         return precond;
     446             :       }
     447             : 
     448           0 :     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
     449             :                       DDS::RETCODE_ERROR);
     450             : 
     451           0 :     if (!has_readcondition(a_condition))
     452             :       {
     453           0 :         return DDS::RETCODE_PRECONDITION_NOT_MET;
     454             :       }
     455             : 
     456             : #ifndef OPENDDS_NO_QUERY_CONDITION
     457           0 :     DDS::QueryCondition_ptr query_condition =
     458           0 :         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
     459             : #endif
     460             : 
     461           0 :     return read_instance_i(received_data, info_seq, max_samples, a_handle,
     462           0 :                            a_condition->get_sample_state_mask(),
     463           0 :                            a_condition->get_view_state_mask(),
     464           0 :                            a_condition->get_instance_state_mask(),
     465             : #ifndef OPENDDS_NO_QUERY_CONDITION
     466             :                            query_condition
     467             : #else
     468             :                            0
     469             : #endif
     470           0 :                            );
     471           0 :   }
     472             : 
     473           0 :   virtual DDS::ReturnCode_t take_instance_w_condition (
     474             :                                                        MessageSequenceType & received_data,
     475             :                                                        DDS::SampleInfoSeq & info_seq,
     476             :                                                        ::CORBA::Long max_samples,
     477             :                                                        DDS::InstanceHandle_t a_handle,
     478             :                                                        DDS::ReadCondition_ptr a_condition)
     479             :   {
     480             :     DDS::ReturnCode_t const precond =
     481           0 :       check_inputs("take_instance_w_condition", received_data, info_seq,
     482             :                    max_samples);
     483           0 :     if (DDS::RETCODE_OK != precond)
     484             :       {
     485           0 :         return precond;
     486             :       }
     487             : 
     488           0 :     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
     489             :                       DDS::RETCODE_ERROR);
     490             : 
     491           0 :     if (!has_readcondition(a_condition))
     492             :       {
     493           0 :         return DDS::RETCODE_PRECONDITION_NOT_MET;
     494             :       }
     495             : 
     496             : #ifndef OPENDDS_NO_QUERY_CONDITION
     497           0 :     DDS::QueryCondition_ptr query_condition =
     498           0 :         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
     499             : #endif
     500             : 
     501           0 :     return take_instance_i(received_data, info_seq, max_samples, a_handle,
     502           0 :                            a_condition->get_sample_state_mask(),
     503           0 :                            a_condition->get_view_state_mask(),
     504           0 :                            a_condition->get_instance_state_mask(),
     505             : #ifndef OPENDDS_NO_QUERY_CONDITION
     506             :                            query_condition
     507             : #else
     508             :                            0
     509             : #endif
     510           0 :                            );
     511           0 :   }
     512             : 
     513           0 :   virtual DDS::ReturnCode_t read_next_instance (
     514             :                                                   MessageSequenceType & received_data,
     515             :                                                   DDS::SampleInfoSeq & info_seq,
     516             :                                                   ::CORBA::Long max_samples,
     517             :                                                   DDS::InstanceHandle_t a_handle,
     518             :                                                   DDS::SampleStateMask sample_states,
     519             :                                                   DDS::ViewStateMask view_states,
     520             :                                                   DDS::InstanceStateMask instance_states)
     521             :   {
     522             :     DDS::ReturnCode_t const precond =
     523           0 :       check_inputs("read_next_instance", received_data, info_seq, max_samples);
     524           0 :     if (DDS::RETCODE_OK != precond)
     525             :       {
     526           0 :         return precond;
     527             :       }
     528             : 
     529           0 :     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
     530           0 :                                 sample_states, view_states, instance_states, 0);
     531             :   }
     532             : 
     533           0 :   virtual DDS::ReturnCode_t take_next_instance (
     534             :                                                   MessageSequenceType & received_data,
     535             :                                                   DDS::SampleInfoSeq & info_seq,
     536             :                                                   ::CORBA::Long max_samples,
     537             :                                                   DDS::InstanceHandle_t a_handle,
     538             :                                                   DDS::SampleStateMask sample_states,
     539             :                                                   DDS::ViewStateMask view_states,
     540             :                                                   DDS::InstanceStateMask instance_states)
     541             :   {
     542             :     DDS::ReturnCode_t const precond =
     543           0 :       check_inputs("take_next_instance", received_data, info_seq, max_samples);
     544           0 :     if (DDS::RETCODE_OK != precond)
     545             :       {
     546           0 :         return precond;
     547             :       }
     548             : 
     549           0 :     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
     550           0 :                                 sample_states, view_states, instance_states, 0);
     551             :   }
     552             : 
     553           0 :   virtual DDS::ReturnCode_t read_next_instance_w_condition (
     554             :                                                               MessageSequenceType & received_data,
     555             :                                                               DDS::SampleInfoSeq & info_seq,
     556             :                                                               ::CORBA::Long max_samples,
     557             :                                                               DDS::InstanceHandle_t a_handle,
     558             :                                                               DDS::ReadCondition_ptr a_condition)
     559             :   {
     560             :     DDS::ReturnCode_t const precond =
     561           0 :       check_inputs("read_next_instance_w_condition", received_data, info_seq,
     562             :                    max_samples);
     563           0 :     if (DDS::RETCODE_OK != precond)
     564             :       {
     565           0 :         return precond;
     566             :       }
     567             : 
     568           0 :     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
     569             :                       DDS::RETCODE_ERROR);
     570             : 
     571           0 :     if (!has_readcondition(a_condition))
     572             :       {
     573           0 :         return DDS::RETCODE_PRECONDITION_NOT_MET;
     574             :       }
     575             : 
     576             : #ifndef OPENDDS_NO_QUERY_CONDITION
     577           0 :     DDS::QueryCondition_ptr query_condition =
     578           0 :         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
     579             : #endif
     580             : 
     581           0 :     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
     582           0 :                                 a_condition->get_sample_state_mask(),
     583           0 :                                 a_condition->get_view_state_mask(),
     584           0 :                                 a_condition->get_instance_state_mask(),
     585             : #ifndef OPENDDS_NO_QUERY_CONDITION
     586             :                                 query_condition
     587             : #else
     588             :                                 0
     589             : #endif
     590           0 :                                 );
     591           0 :   }
     592             : 
     593           0 :   virtual DDS::ReturnCode_t take_next_instance_w_condition (
     594             :                                                               MessageSequenceType & received_data,
     595             :                                                               DDS::SampleInfoSeq & info_seq,
     596             :                                                               ::CORBA::Long max_samples,
     597             :                                                               DDS::InstanceHandle_t a_handle,
     598             :                                                               DDS::ReadCondition_ptr a_condition)
     599             :   {
     600             :     DDS::ReturnCode_t const precond =
     601           0 :       check_inputs("take_next_instance_w_condition", received_data, info_seq,
     602             :                    max_samples);
     603           0 :     if (DDS::RETCODE_OK != precond)
     604             :       {
     605           0 :         return precond;
     606             :       }
     607             : 
     608           0 :     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
     609             :                       DDS::RETCODE_ERROR);
     610             : 
     611           0 :     if (!has_readcondition(a_condition))
     612             :       {
     613           0 :         return DDS::RETCODE_PRECONDITION_NOT_MET;
     614             :       }
     615             : 
     616             : #ifndef OPENDDS_NO_QUERY_CONDITION
     617           0 :     DDS::QueryCondition_ptr query_condition =
     618           0 :         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
     619             : #endif
     620             : 
     621           0 :     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
     622           0 :                                 a_condition->get_sample_state_mask(),
     623           0 :                                 a_condition->get_view_state_mask(),
     624           0 :                                 a_condition->get_instance_state_mask(),
     625             : #ifndef OPENDDS_NO_QUERY_CONDITION
     626             :                                 query_condition
     627             : #else
     628             :                                 0
     629             : #endif
     630           0 :                                 );
     631           0 :   }
     632             : 
     633           0 :   virtual DDS::ReturnCode_t return_loan (
     634             :                                            MessageSequenceType & received_data,
     635             :                                            DDS::SampleInfoSeq & info_seq)
     636             :   {
     637             :     // Some incomplete tests to see that the data and info are from the
     638             :     // same read.
     639           0 :     if (received_data.length() != info_seq.length())
     640             :       {
     641           0 :         return DDS::RETCODE_PRECONDITION_NOT_MET;
     642             :       }
     643             : 
     644           0 :     if (received_data.release())
     645             :       {
     646             :         // nothing to do because this is not zero-copy data
     647           0 :         return DDS::RETCODE_OK;
     648             :       }
     649             :     else
     650             :       {
     651           0 :         info_seq.length(0);
     652           0 :         received_data.length(0);
     653             :       }
     654           0 :     return DDS::RETCODE_OK;
     655             :   }
     656             : 
     657           0 :   virtual DDS::ReturnCode_t get_key_value(MessageType& key_holder,
     658             :                                           DDS::InstanceHandle_t handle)
     659             :   {
     660           0 :     ACE_Guard<ACE_Recursive_Thread_Mutex> guard(sample_lock_);
     661             : 
     662           0 :     const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(handle);
     663           0 :     if (pos != reverse_instance_map_.end()) {
     664           0 :       key_holder = pos->second->first;
     665           0 :       return DDS::RETCODE_OK;
     666             :     }
     667             : 
     668           0 :     return DDS::RETCODE_BAD_PARAMETER;
     669           0 :   }
     670             : 
     671           0 :   virtual DDS::InstanceHandle_t lookup_instance(const MessageType& instance_data)
     672             :   {
     673           0 :     ACE_Guard<ACE_Recursive_Thread_Mutex> guard(sample_lock_);
     674             : 
     675           0 :     const typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
     676           0 :     if (it != instance_map_.end()) {
     677           0 :       return it->second;
     678             :     }
     679           0 :     return DDS::HANDLE_NIL;
     680           0 :   }
     681             : 
     682           0 :   virtual DDS::ReturnCode_t auto_return_loan(void* seq)
     683             :   {
     684           0 :     MessageSequenceType& received_data =
     685             :       *static_cast< MessageSequenceType*> (seq);
     686             : 
     687           0 :     if (!received_data.release())
     688             :       {
     689             :         // release_loan(received_data);
     690           0 :         received_data.length(0);
     691             :       }
     692           0 :     return DDS::RETCODE_OK;
     693             :   }
     694             : 
     695             :   void release_loan (MessageSequenceType & received_data)
     696             :   {
     697             :     received_data.length(0);
     698             :   }
     699             : 
     700             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
     701           0 :   bool contains_sample_filtered(DDS::SampleStateMask sample_states,
     702             :                                 DDS::ViewStateMask view_states,
     703             :                                 DDS::InstanceStateMask instance_states,
     704             :                                 const FilterEvaluator& evaluator,
     705             :                                 const DDS::StringSeq& params)
     706             :   {
     707           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
     708           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_, false);
     709             : 
     710           0 :     TopicDescriptionPtr<TopicImpl> topic(topic_servant_);
     711           0 :     TypeSupport* const ts = topic->get_type_support();
     712           0 :     TypeSupportImpl* const type_support = dynamic_cast<TypeSupportImpl*>(ts);
     713           0 :     const bool filter_has_non_key_fields = type_support ? evaluator.has_non_key_fields(*type_support) : true;
     714             : 
     715           0 :     const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
     716           0 :     for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
     717           0 :       ++next; // pre-increment iterator, in case updates cause changes to match set
     718           0 :       const DDS::InstanceHandle_t handle = *it;
     719           0 :       const SubscriptionInstance_rch inst = get_handle_instance(handle);
     720           0 :       if (!inst) continue;
     721             : 
     722           0 :       for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
     723           0 :            item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
     724           0 :         if (!item->registered_data_ || (!item->valid_data_ && filter_has_non_key_fields)) {
     725           0 :           continue;
     726             :         }
     727           0 :         if (evaluator.eval(*static_cast<MessageType*>(item->registered_data_), params)) {
     728           0 :           return true;
     729             :         }
     730             :       }
     731             :     }
     732             : 
     733           0 :     return false;
     734           0 :   }
     735             : 
     736           0 :   DDS::ReturnCode_t read_generic(GenericBundle& gen,
     737             :                                  DDS::SampleStateMask sample_states,
     738             :                                  DDS::ViewStateMask view_states,
     739             :                                  DDS::InstanceStateMask instance_states,
     740             :                                  bool adjust_ref_count = false)
     741             :   {
     742           0 :     MessageSequenceType data;
     743             :     DDS::ReturnCode_t rc;
     744             :     {
     745           0 :       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
     746           0 :       rc = read_i(data, gen.info_, DDS::LENGTH_UNLIMITED,
     747             :                   sample_states, view_states, instance_states, 0);
     748           0 :       if (adjust_ref_count) {
     749           0 :         typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(data);
     750           0 :         received_data_p.increment_references();
     751             :       }
     752           0 :     }
     753           0 :     gen.samples_.reserve(data.length());
     754           0 :     for (CORBA::ULong i = 0; i < data.length(); ++i) {
     755           0 :       gen.samples_.push_back(&data[i]);
     756             :     }
     757           0 :     return rc;
     758           0 :   }
     759             : 
     760           0 :   DDS::InstanceHandle_t lookup_instance_generic(const void* data)
     761             :   {
     762           0 :     return lookup_instance(*static_cast<const MessageType*>(data));
     763             :   }
     764             : 
     765           0 :   virtual DDS::ReturnCode_t take(AbstractSamples& samples,
     766             :                                  DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
     767             :                                  DDS::InstanceStateMask instance_states)
     768             :   {
     769           0 :     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
     770             :                       guard,
     771             :                       sample_lock_,
     772             :                       DDS::RETCODE_ERROR);
     773             : 
     774           0 :     MessageSequenceType data;
     775           0 :     DDS::SampleInfoSeq infos;
     776           0 :     const DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
     777             :                                         sample_states, view_states, instance_states, 0);
     778             : 
     779           0 :     samples.reserve(data.length());
     780             : 
     781           0 :     for (CORBA::ULong i = 0; i < data.length(); ++i) {
     782           0 :       samples.push_back(infos[i], &data[i]);
     783             :     }
     784             : 
     785           0 :     return rc;
     786           0 :   }
     787             : 
     788           0 :   DDS::ReturnCode_t read_instance_generic(void*& data,
     789             :                                           DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
     790             :                                           DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
     791             :                                           DDS::InstanceStateMask instance_states)
     792             :   {
     793           0 :     MessageSequenceType dataseq;
     794           0 :     DDS::SampleInfoSeq infoseq;
     795           0 :     const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
     796             :                                                  DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
     797             :                                                  instance_states, 0);
     798           0 :     if (rc != DDS::RETCODE_NO_DATA)
     799             :       {
     800           0 :         const CORBA::ULong last = dataseq.length() - 1;
     801           0 :         data = new MessageType(dataseq[last]);
     802           0 :         info = infoseq[last];
     803             :       }
     804           0 :     return rc;
     805           0 :   }
     806             : 
     807           0 :   DDS::ReturnCode_t read_next_instance_generic(void*& data,
     808             :                                                DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
     809             :                                                DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
     810             :                                                DDS::InstanceStateMask instance_states)
     811             :   {
     812           0 :     MessageSequenceType dataseq;
     813           0 :     DDS::SampleInfoSeq infoseq;
     814           0 :     const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
     815             :                                                       DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
     816             :                                                       instance_states, 0);
     817           0 :     if (rc != DDS::RETCODE_NO_DATA)
     818             :       {
     819           0 :         const CORBA::ULong last = dataseq.length() - 1;
     820           0 :         data = new MessageType(dataseq[last]);
     821           0 :         info = infoseq[last];
     822             :       }
     823           0 :     return rc;
     824           0 :   }
     825             : 
     826             : #endif
     827             : 
     828           0 :   DDS::InstanceHandle_t store_synthetic_data(const MessageType& sample,
     829             :                                              DDS::ViewStateKind view,
     830             :                                              const SystemTimePoint& timestamp = SystemTimePoint::now())
     831             :   {
     832             :     using namespace OpenDDS::DCPS;
     833           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
     834             :                      DDS::HANDLE_NIL);
     835             : #ifndef OPENDDS_NO_MULTI_TOPIC
     836           0 :     DDS::TopicDescription_var descr = get_topicdescription();
     837           0 :     if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
     838           0 :       if (!mt->filter(sample)) {
     839           0 :         return DDS::HANDLE_NIL;
     840             :       }
     841             :     }
     842             : #endif
     843             : 
     844           0 :     get_subscriber_servant()->data_received(this);
     845             : 
     846           0 :     DDS::InstanceHandle_t inst = lookup_instance(sample);
     847           0 :     bool filtered = false;
     848           0 :     SubscriptionInstance_rch instance;
     849             : 
     850           0 :     const DDS::Time_t now = timestamp.to_dds_time();
     851           0 :     DataSampleHeader header;
     852           0 :     header.source_timestamp_sec_ = now.sec;
     853           0 :     header.source_timestamp_nanosec_ = now.nanosec;
     854             : 
     855             :     // Call store_instance_data() once or twice, depending on if we need to
     856             :     // process the INSTANCE_REGISTRATION.  In either case, store_instance_data()
     857             :     // owns the memory for the sample and it must come from the correct allocator.
     858           0 :     for (int i = 0; i < 2; ++i) {
     859           0 :       if (i == 0 && inst != DDS::HANDLE_NIL) continue;
     860             : 
     861           0 :       const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
     862           0 :       header.message_id_ = static_cast<char>(msg);
     863             : 
     864             :       bool just_registered;
     865           0 :       unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
     866           0 :       store_instance_data(move(data), DDS::HANDLE_NIL, header, instance, just_registered, filtered);
     867           0 :       if (instance) inst = instance->instance_handle_;
     868             :     }
     869             : 
     870           0 :     if (!filtered) {
     871           0 :       if (view == DDS::NOT_NEW_VIEW_STATE) {
     872           0 :         if (instance) instance->instance_state_->accessed();
     873             :       }
     874           0 :       notify_read_conditions();
     875             :     }
     876             : 
     877           0 :     const ValueDispatcher* vd = get_value_dispatcher();
     878           0 :     const Observer_rch observer = get_observer(Observer::e_SAMPLE_RECEIVED);
     879           0 :     if (observer && vd) {
     880           0 :       Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, header.instance_state(), now, header.sequence_, &sample, *vd);
     881           0 :       observer->on_sample_received(this, s);
     882             :     }
     883             : 
     884           0 :     return inst;
     885           0 :   }
     886             : 
     887           0 :   void set_instance_state_i(DDS::InstanceHandle_t instance,
     888             :                             DDS::InstanceHandle_t publication_handle,
     889             :                             DDS::InstanceStateKind state,
     890             :                             const SystemTimePoint& timestamp,
     891             :                             const GUID_t& publication_id)
     892             :   {
     893             :     // sample_lock_ must be held.
     894             :     using namespace OpenDDS::DCPS;
     895             : 
     896           0 :     SubscriptionInstance_rch si = get_handle_instance(instance);
     897           0 :     if (si && state != DDS::ALIVE_INSTANCE_STATE) {
     898           0 :       const DDS::Time_t now = timestamp.to_dds_time();
     899           0 :       DataSampleHeader header;
     900           0 :       header.publication_id_ = publication_id;
     901           0 :       header.source_timestamp_sec_ = now.sec;
     902           0 :       header.source_timestamp_nanosec_ = now.nanosec;
     903           0 :       const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
     904           0 :         ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE;
     905           0 :       header.message_id_ = static_cast<char>(msg);
     906             :       bool just_registered, filtered;
     907           0 :       unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
     908           0 :       get_key_value(*data, instance);
     909           0 :       store_instance_data(move(data), publication_handle, header, si, just_registered, filtered);
     910           0 :       if (!filtered) {
     911           0 :         notify_read_conditions();
     912             :       }
     913           0 :     }
     914           0 :   }
     915             : 
     916           0 :   virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
     917             :                                OpenDDS::DCPS::SubscriptionInstance_rch& instance)
     918             :   {
     919             :     //!!! caller should already have the sample_lock_
     920           0 :     const bool encapsulated = sample.header_.cdr_encapsulation_;
     921           0 :     Message_Block_Ptr payload(sample.data(&mb_alloc_));
     922           0 :     OpenDDS::DCPS::Serializer ser(
     923             :       payload.get(),
     924             :       encapsulated ? Encoding::KIND_XCDR1 : Encoding::KIND_UNALIGNED_CDR,
     925           0 :       static_cast<Endianness>(sample.header_.byte_order_));
     926             : 
     927           0 :     if (encapsulated) {
     928           0 :       EncapsulationHeader encap;
     929           0 :       if (!(ser >> encap)) {
     930           0 :         if (DCPS_debug_level > 0) {
     931           0 :           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
     932             :             ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
     933             :             ACE_TEXT("deserialization of encapsulation header failed.\n"),
     934             :             TraitsType::type_name()));
     935             :         }
     936           0 :         return;
     937             :       }
     938           0 :       Encoding encoding;
     939           0 :       if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
     940           0 :         return;
     941             :       }
     942             : 
     943           0 :       if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
     944           0 :         if (DCPS_debug_level >= 1) {
     945           0 :           ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
     946             :             ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
     947             :             ACE_TEXT("Encoding kind of the received sample (%C) does not ")
     948             :             ACE_TEXT("match the ones specified by DataReader.\n"),
     949             :             TraitsType::type_name(),
     950             :             Encoding::kind_to_string(encoding.kind()).c_str()));
     951             :         }
     952           0 :         return;
     953             :       }
     954           0 :       if (DCPS_debug_level >= 8) {
     955           0 :         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
     956             :           ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
     957             :           ACE_TEXT("Deserializing with encoding kind %C.\n"),
     958             :           TraitsType::type_name(),
     959             :           Encoding::kind_to_string(encoding.kind()).c_str()));
     960             :       }
     961             : 
     962           0 :       ser.encoding(encoding);
     963             :     }
     964             : 
     965           0 :     bool ser_ret = true;
     966           0 :     MessageType data;
     967           0 :     if (sample.header_.key_fields_only_) {
     968           0 :       ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(data);
     969             :     } else {
     970           0 :       ser_ret = ser >> data;
     971             :     }
     972           0 :     if (!ser_ret) {
     973           0 :       if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
     974           0 :         if (DCPS_debug_level > 1) {
     975           0 :           ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
     976             :                      ACE_TEXT("object construction failure, dropping sample.\n"),
     977             :                      TraitsType::type_name()));
     978             :         }
     979             :       } else {
     980           0 :         if (DCPS_debug_level > 0) {
     981           0 :           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
     982             :                     ACE_TEXT("deserialization failed.\n"),
     983             :                     TraitsType::type_name()));
     984             :         }
     985             :       }
     986           0 :       return;
     987             :     }
     988             : 
     989           0 :     DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
     990           0 :     typename InstanceMap::const_iterator const it = instance_map_.find(data);
     991           0 :     if (it != instance_map_.end()) {
     992           0 :       handle = it->second;
     993             :     }
     994             : 
     995           0 :     if (handle == DDS::HANDLE_NIL) {
     996           0 :       instance.reset();
     997             :     } else {
     998           0 :       instance = get_handle_instance(handle);
     999             :     }
    1000           0 :   }
    1001             : 
    1002           0 :   virtual void qos_change(const DDS::DataReaderQos& qos)
    1003             :   {
    1004             :     // reliability is not changeable, just time_based_filter
    1005           0 :     if (qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
    1006           0 :       if (qos.time_based_filter.minimum_separation != qos_.time_based_filter.minimum_separation) {
    1007           0 :         const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
    1008           0 :         if (qos_.time_based_filter.minimum_separation != zero) {
    1009           0 :           if (qos.time_based_filter.minimum_separation != zero) {
    1010           0 :             const MonotonicTimePoint now = MonotonicTimePoint::now();
    1011           0 :             const TimeDuration interval(qos_.time_based_filter.minimum_separation);
    1012           0 :             FilterDelayedSampleQueue queue;
    1013             : 
    1014           0 :             ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    1015           0 :             for (typename FilterDelayedSampleMap::iterator pos = filter_delayed_sample_map_.begin(), limit = filter_delayed_sample_map_.end(); pos != limit; ++pos) {
    1016           0 :               FilterDelayedSample& sample = pos->second;
    1017           0 :               sample.expiration_time = now + (interval - (sample.expiration_time - now));
    1018           0 :               queue.insert(std::make_pair(sample.expiration_time, pos->first));
    1019             :             }
    1020           0 :             std::swap(queue, filter_delayed_sample_queue_);
    1021             : 
    1022           0 :             if (!filter_delayed_sample_queue_.empty()) {
    1023           0 :               filter_delayed_sample_task_->cancel();
    1024           0 :               filter_delayed_sample_task_->schedule(interval);
    1025             :             }
    1026             : 
    1027           0 :           } else {
    1028           0 :             filter_delayed_sample_task_->cancel();
    1029           0 :             ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    1030           0 :             filter_delayed_sample_map_.clear();
    1031           0 :             filter_delayed_sample_queue_.clear();
    1032           0 :           }
    1033             :         }
    1034             :         // else no existing timers to change/cancel
    1035             :       }
    1036             :       // else no qos change so nothing to change
    1037             :     }
    1038             : 
    1039           0 :     DataReaderImpl::qos_change(qos);
    1040             :   }
    1041             : 
    1042             :   void set_marshal_skip_serialize(bool value)
    1043             :   {
    1044             :     marshal_skip_serialize_ = value;
    1045             :   }
    1046             : 
    1047             :   bool get_marshal_skip_serialize() const
    1048             :   {
    1049             :     return marshal_skip_serialize_;
    1050             :   }
    1051             : 
    1052           0 :   void release_all_instances()
    1053             :   {
    1054           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    1055             : 
    1056           0 :     const typename InstanceMap::iterator end = instance_map_.end();
    1057           0 :     typename InstanceMap::iterator it = instance_map_.begin();
    1058           0 :     while (it != end) {
    1059           0 :       const DDS::InstanceHandle_t handle = it->second;
    1060           0 :       ++it; // it will be invalid, so iterate now.
    1061           0 :       release_instance(handle);
    1062             :     }
    1063           0 :   }
    1064             : 
    1065             : protected:
    1066             : 
    1067           0 :   virtual RcHandle<MessageHolder> dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample& sample,
    1068             :                                                 DDS::InstanceHandle_t publication_handle,
    1069             :                                                 OpenDDS::DCPS::SubscriptionInstance_rch& instance,
    1070             :                                                 bool& just_registered,
    1071             :                                                 bool& filtered,
    1072             :                                                 OpenDDS::DCPS::MarshalingType marshaling_type,
    1073             :                                                 bool full_copy)
    1074             :   {
    1075           0 :     unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
    1076           0 :     dynamic_hook(*data);
    1077           0 :     RcHandle<MessageHolder> message_holder;
    1078             : 
    1079           0 :     Message_Block_Ptr payload(sample.data(&mb_alloc_));
    1080           0 :     if (marshal_skip_serialize_) {
    1081           0 :       if (!MarshalTraitsType::from_message_block(*data, *payload)) {
    1082           0 :         if (DCPS_debug_level > 0) {
    1083           0 :           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::dds_demarshal: ")
    1084             :                     ACE_TEXT("attempting to skip serialize but bad from_message_block. Returning from demarshal.\n")));
    1085             :         }
    1086           0 :         return message_holder;
    1087             :       }
    1088           0 :       store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
    1089           0 :       return message_holder;
    1090             :     }
    1091           0 :     const bool encapsulated = sample.header_.cdr_encapsulation_;
    1092             : 
    1093           0 :     OpenDDS::DCPS::Serializer ser(
    1094             :       payload.get(),
    1095             :       encapsulated ? Encoding::KIND_XCDR1 : Encoding::KIND_UNALIGNED_CDR,
    1096           0 :       static_cast<Endianness>(sample.header_.byte_order_));
    1097             : 
    1098           0 :     if (encapsulated) {
    1099           0 :       EncapsulationHeader encap;
    1100           0 :       if (!(ser >> encap)) {
    1101           0 :         if (DCPS_debug_level > 0) {
    1102           0 :           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
    1103             :             ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
    1104             :             ACE_TEXT("deserialization of encapsulation header failed.\n"),
    1105             :             TraitsType::type_name()));
    1106             :         }
    1107           0 :         return message_holder;
    1108             :       }
    1109           0 :       Encoding encoding;
    1110           0 :       if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
    1111           0 :         return message_holder;
    1112             :       }
    1113             : 
    1114           0 :       if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
    1115           0 :         if (DCPS_debug_level >= 1) {
    1116           0 :           ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
    1117             :             ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
    1118             :             ACE_TEXT("Encoding kind %C of the received sample does not ")
    1119             :             ACE_TEXT("match the ones specified by DataReader.\n"),
    1120             :             TraitsType::type_name(),
    1121             :             Encoding::kind_to_string(encoding.kind()).c_str()));
    1122             :         }
    1123           0 :         return message_holder;
    1124             :       }
    1125           0 :       if (DCPS_debug_level >= 8) {
    1126           0 :         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
    1127             :           ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
    1128             :           ACE_TEXT("Deserializing with encoding kind %C.\n"),
    1129             :           TraitsType::type_name(),
    1130             :           Encoding::kind_to_string(encoding.kind()).c_str()));
    1131             :       }
    1132             : 
    1133           0 :       ser.encoding(encoding);
    1134             :     }
    1135             : 
    1136           0 :     const bool key_only_marshaling =
    1137             :       marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING;
    1138             : 
    1139           0 :     bool ser_ret = true;
    1140           0 :     if (key_only_marshaling) {
    1141           0 :       ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(*data);
    1142             :     } else {
    1143           0 :       ser_ret = ser >> *data;
    1144           0 :       if (full_copy) {
    1145           0 :         message_holder = make_rch<MessageHolder_T<MessageType> >(*data);
    1146             :       }
    1147             :     }
    1148           0 :     if (!ser_ret) {
    1149           0 :       if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
    1150           0 :         if (DCPS_debug_level > 1) {
    1151           0 :           ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
    1152             :                      ACE_TEXT("object construction failure, dropping sample.\n"),
    1153             :                      TraitsType::type_name()));
    1154             :         }
    1155             :       } else {
    1156           0 :         if (DCPS_debug_level > 0) {
    1157           0 :           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR %CDataReaderImpl::dds_demarshal ")
    1158             :                     ACE_TEXT("deserialization failed, dropping sample.\n"),
    1159             :                     TraitsType::type_name()));
    1160             :         }
    1161             :       }
    1162           0 :       return message_holder;
    1163             :     }
    1164             : 
    1165             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    1166             :     /*
    1167             :      * If sample.header_.content_filter_ is true, the writer has already
    1168             :      * filtered.
    1169             :      */
    1170           0 :     if (!sample.header_.content_filter_) {
    1171           0 :       ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
    1172           0 :       if (content_filtered_topic_) {
    1173           0 :         const bool sample_only_has_key_fields = !sample.header_.valid_data();
    1174           0 :         if (key_only_marshaling != sample_only_has_key_fields) {
    1175           0 :           if (DCPS_debug_level > 0) {
    1176           0 :             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
    1177             :               ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
    1178             :               ACE_TEXT("Mismatch between the key only and valid data properties ")
    1179             :               ACE_TEXT("of a %C message of a content filtered topic!\n"),
    1180             :               TraitsType::type_name(),
    1181             :               to_string(static_cast<MessageId>(sample.header_.message_id_))));
    1182             :           }
    1183           0 :           filtered = true;
    1184           0 :           message_holder.reset();
    1185           0 :           return message_holder;
    1186             :         }
    1187           0 :         const MessageType& type = static_cast<MessageType&>(*data);
    1188           0 :         if (!content_filtered_topic_->filter(type, sample_only_has_key_fields)) {
    1189           0 :           filtered = true;
    1190           0 :           message_holder.reset();
    1191           0 :           return message_holder;
    1192             :         }
    1193             :       }
    1194           0 :     }
    1195             : #endif
    1196             : 
    1197           0 :     store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
    1198           0 :     return message_holder;
    1199           0 :   }
    1200             : 
    1201           0 :   virtual void dispose_unregister(const OpenDDS::DCPS::ReceivedDataSample& sample,
    1202             :                                   DDS::InstanceHandle_t publication_handle,
    1203             :                                   OpenDDS::DCPS::SubscriptionInstance_rch& instance)
    1204             :   {
    1205             :     //!!! caller should already have the sample_lock_
    1206             : 
    1207             :     // The data sample in this dispose message does not contain any valid data.
    1208             :     // What it needs here is the key value to identify the instance to dispose.
    1209             :     // The demarshal push this "sample" to received sample list so the user
    1210             :     // can be notified the dispose event.
    1211           0 :     bool just_registered = false;
    1212           0 :     bool filtered = false;
    1213           0 :     OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING;
    1214           0 :     if (sample.header_.key_fields_only_) {
    1215           0 :       marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING;
    1216             :     }
    1217           0 :     dds_demarshal(sample, publication_handle, instance, just_registered, filtered, marshaling, false);
    1218           0 :   }
    1219             : 
    1220           0 :   virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
    1221             :   {
    1222           0 :     drop_sample(instance->instance_handle_);
    1223             : 
    1224             : 
    1225           0 :     instance->instance_state_->cancel_release();
    1226             : 
    1227           0 :     while (instance->rcvd_samples_.size() > 0)
    1228             :       {
    1229             :         OpenDDS::DCPS::ReceivedDataElement* head =
    1230           0 :           instance->rcvd_samples_.remove_head();
    1231           0 :         head->dec_ref();
    1232             :       }
    1233           0 :   }
    1234             : 
    1235           0 :   virtual void release_instance_i(DDS::InstanceHandle_t handle)
    1236             :   {
    1237           0 :     const typename ReverseInstanceMap::iterator pos = reverse_instance_map_.find(handle);
    1238           0 :     if (pos != reverse_instance_map_.end()) {
    1239           0 :       remove_from_lookup_maps(handle);
    1240           0 :       instance_map_.erase(pos->second);
    1241           0 :       reverse_instance_map_.erase(pos);
    1242             :     }
    1243           0 :   }
    1244             : 
    1245           0 :   virtual void state_updated_i(DDS::InstanceHandle_t handle)
    1246             :   {
    1247           0 :     const typename SubscriptionInstanceMapType::iterator pos = instances_.find(handle);
    1248           0 :     if (pos != instances_.end()) {
    1249           0 :       update_lookup_maps(pos);
    1250             :     }
    1251           0 :   }
    1252             : 
    1253             : private:
    1254             : 
    1255             :   /// Available for specialization so that some types of MessageType can observe and
    1256             :   /// change the sample before dds_demarshal deserializes into it
    1257           0 :   void dynamic_hook(MessageType&) {}
    1258             : 
    1259           0 :   bool store_instance_data_check(unique_ptr<MessageTypeWithAllocator>& instance_data,
    1260             :                                  DDS::InstanceHandle_t publication_handle,
    1261             :                                  const OpenDDS::DCPS::DataSampleHeader& header,
    1262             :                                  OpenDDS::DCPS::SubscriptionInstance_rch& instance_ptr)
    1263             :   {
    1264             : #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
    1265           0 :     const bool is_dispose_msg =
    1266           0 :       header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
    1267           0 :       header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
    1268             : 
    1269           0 :     if (!is_bit() && security_config_) {
    1270           0 :       if (header.message_id_ == SAMPLE_DATA ||
    1271           0 :           header.message_id_ == INSTANCE_REGISTRATION) {
    1272             : 
    1273             :         // Pubulisher has already gone through the check.
    1274           0 :         if (instance_ptr &&
    1275           0 :             instance_ptr->instance_state_ &&
    1276           0 :             instance_ptr->instance_state_->writes_instance(header.publication_id_)) {
    1277           0 :           return true;
    1278             :         }
    1279             : 
    1280           0 :         DDS::Security::SecurityException ex;
    1281           0 :         const GUID_t local_participant = make_part_guid(get_guid());
    1282           0 :         const GUID_t remote_participant = make_part_guid(header.publication_id_);
    1283           0 :         const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
    1284             :         // Construct a DynamicData around the deserialized sample.
    1285           0 :         DDS::DynamicData_var dda =
    1286           0 :           XTypes::get_dynamic_data_adapter(dynamic_type_, *instance_data->message());
    1287             :         // The remote participant might not be using security.
    1288           0 :         if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
    1289           0 :             !security_config_->get_access_control()->check_remote_datawriter_register_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
    1290           0 :           if (log_level >= LogLevel::Warning) {
    1291           0 :             ACE_ERROR((LM_WARNING,
    1292             :                        "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to register instance SecurityException[%d.%d]: %C\n",
    1293             :                        ex.code, ex.minor_code, ex.message.in()));
    1294             :           }
    1295           0 :           return false;
    1296             :         }
    1297           0 :       } else if (is_dispose_msg) {
    1298             : 
    1299           0 :         DDS::Security::SecurityException ex;
    1300           0 :         const GUID_t local_participant = make_part_guid(get_guid());
    1301           0 :         const GUID_t remote_participant = make_part_guid(header.publication_id_);
    1302           0 :         const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
    1303             :         // Construct a DynamicData around the deserialized sample.
    1304           0 :         DDS::DynamicData_var dda =
    1305           0 :           XTypes::get_dynamic_data_adapter(dynamic_type_, *instance_data->message());
    1306             :         // The remote participant might not be using security.
    1307           0 :         if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
    1308           0 :             !security_config_->get_access_control()->check_remote_datawriter_dispose_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
    1309           0 :           if (log_level >= LogLevel::Warning) {
    1310           0 :             ACE_ERROR((LM_WARNING,
    1311             :                        "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to dispose instance SecurityException[%d.%d]: %C\n",
    1312             :                        ex.code, ex.minor_code, ex.message.in()));
    1313             :           }
    1314           0 :           return false;
    1315             :         }
    1316           0 :       }
    1317             :     }
    1318             : #else
    1319             :     ACE_UNUSED_ARG(instance_data);
    1320             :     ACE_UNUSED_ARG(publication_handle);
    1321             :     ACE_UNUSED_ARG(header);
    1322             :     ACE_UNUSED_ARG(instance_ptr);
    1323             : #endif
    1324             : 
    1325           0 :     return true;
    1326             :   }
    1327             : 
    1328           0 :   DDS::ReturnCode_t read_i(MessageSequenceType& received_data,
    1329             :                            DDS::SampleInfoSeq& info_seq,
    1330             :                            CORBA::Long max_samples,
    1331             :                            DDS::SampleStateMask sample_states,
    1332             :                            DDS::ViewStateMask view_states,
    1333             :                            DDS::InstanceStateMask instance_states,
    1334             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1335             :                            DDS::QueryCondition_ptr a_condition)
    1336             : #else
    1337             :     int)
    1338             : #endif
    1339             : {
    1340             : 
    1341           0 :   typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
    1342             : 
    1343             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1344           0 :   if (subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS && !coherent_) {
    1345           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    1346             :   }
    1347             : 
    1348           0 :   const bool group_coherent_ordered =
    1349           0 :     subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
    1350           0 :     && subqos_.presentation.coherent_access
    1351           0 :     && subqos_.presentation.ordered_access;
    1352             : 
    1353           0 :   if (group_coherent_ordered && coherent_) {
    1354           0 :     max_samples = 1;
    1355             :   }
    1356             : #endif
    1357             : 
    1358           0 :   RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
    1359             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1360             :                                    a_condition,
    1361             : #endif
    1362             :                                    DDS_OPERATION_READ);
    1363             : 
    1364           0 :   const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
    1365             : 
    1366             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1367           0 :   if (!group_coherent_ordered) {
    1368             : #endif
    1369           0 :     const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
    1370           0 :     for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
    1371           0 :       ++next; // pre-increment iterator, in case updates cause changes to match set
    1372           0 :       const DDS::InstanceHandle_t handle = *it;
    1373           0 :       const SubscriptionInstance_rch inst = get_handle_instance(handle);
    1374           0 :       if (!inst) continue;
    1375             : 
    1376           0 :       size_t i(0);
    1377           0 :       for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
    1378           0 :            item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
    1379           0 :         results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
    1380             : 
    1381           0 :         const ValueDispatcher* vd = get_value_dispatcher();
    1382           0 :         if (observer && item->registered_data_ && vd) {
    1383           0 :           Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
    1384           0 :           observer->on_sample_read(this, s);
    1385             :         }
    1386             :       }
    1387             :     }
    1388             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1389             :   } else {
    1390           0 :     const RakeData item = group_coherent_ordered_data_.get_data();
    1391           0 :     results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
    1392           0 :     const ValueDispatcher* vd = get_value_dispatcher();
    1393           0 :     if (observer && item.rde_->registered_data_ && vd) {
    1394           0 :       typename InstanceMap::iterator i = instance_map_.begin();
    1395           0 :       const DDS::InstanceHandle_t handle = (i != instance_map_.end()) ? i->second : DDS::HANDLE_NIL;
    1396           0 :       Observer::Sample s(handle, item.si_->instance_state_->instance_state(), *item.rde_, *vd);
    1397           0 :       observer->on_sample_read(this, s);
    1398             :     }
    1399           0 :   }
    1400             : #endif
    1401             : 
    1402           0 :   results.copy_to_user();
    1403             : 
    1404           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
    1405           0 :   if (received_data.length()) {
    1406           0 :     ret = DDS::RETCODE_OK;
    1407           0 :     if (received_data.maximum() == 0) { // using ZeroCopy
    1408           0 :       received_data_p.set_loaner(this);
    1409             :     }
    1410             :   }
    1411             : 
    1412           0 :   post_read_or_take();
    1413           0 :   return ret;
    1414           0 : }
    1415             : 
    1416           0 : DDS::ReturnCode_t take_i(MessageSequenceType& received_data,
    1417             :                          DDS::SampleInfoSeq& info_seq,
    1418             :                          CORBA::Long max_samples,
    1419             :                          DDS::SampleStateMask sample_states,
    1420             :                          DDS::ViewStateMask view_states,
    1421             :                          DDS::InstanceStateMask instance_states,
    1422             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1423             :                          DDS::QueryCondition_ptr a_condition)
    1424             : #else
    1425             :   int)
    1426             : #endif
    1427             : {
    1428           0 :   typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
    1429             : 
    1430             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1431           0 :   if (subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS && !coherent_) {
    1432           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    1433             :   }
    1434             : 
    1435           0 :   const bool group_coherent_ordered =
    1436           0 :     subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
    1437           0 :     && subqos_.presentation.coherent_access
    1438           0 :     && subqos_.presentation.ordered_access;
    1439             : 
    1440           0 :   if (group_coherent_ordered && coherent_) {
    1441           0 :     max_samples = 1;
    1442             :   }
    1443             : #endif
    1444             : 
    1445           0 :   RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
    1446             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1447             :                                    a_condition,
    1448             : #endif
    1449             :                                    DDS_OPERATION_TAKE);
    1450             : 
    1451           0 :   const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
    1452             : 
    1453             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1454           0 :   if (!group_coherent_ordered) {
    1455             : #endif
    1456           0 :     const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
    1457           0 :     for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
    1458           0 :       ++next; // pre-increment iterator, in case updates cause changes to match set
    1459           0 :       const DDS::InstanceHandle_t handle = *it;
    1460           0 :       const SubscriptionInstance_rch inst = get_handle_instance(handle);
    1461           0 :       if (!inst) continue;
    1462             : 
    1463           0 :       size_t i(0);
    1464           0 :       for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
    1465           0 :            item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
    1466           0 :         results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
    1467             : 
    1468           0 :         const ValueDispatcher* vd = get_value_dispatcher();
    1469           0 :         if (observer && item->registered_data_ && vd) {
    1470           0 :           Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
    1471           0 :           observer->on_sample_taken(this, s);
    1472             :         }
    1473             :       }
    1474             :     }
    1475             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1476             :   } else {
    1477           0 :     const RakeData item = group_coherent_ordered_data_.get_data();
    1478           0 :     results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
    1479           0 :   }
    1480             : #endif
    1481             : 
    1482           0 :   results.copy_to_user();
    1483             : 
    1484           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
    1485           0 :   if (received_data.length()) {
    1486           0 :     ret = DDS::RETCODE_OK;
    1487           0 :     if (received_data.maximum() == 0) { // using ZeroCopy
    1488           0 :       received_data_p.set_loaner(this);
    1489             :     }
    1490             :   }
    1491             : 
    1492           0 :   post_read_or_take();
    1493           0 :   return ret;
    1494           0 : }
    1495             : 
    1496           0 : DDS::ReturnCode_t read_instance_i(MessageSequenceType& received_data,
    1497             :                                   DDS::SampleInfoSeq& info_seq,
    1498             :                                   CORBA::Long max_samples,
    1499             :                                   DDS::InstanceHandle_t a_handle,
    1500             :                                   DDS::SampleStateMask sample_states,
    1501             :                                   DDS::ViewStateMask view_states,
    1502             :                                   DDS::InstanceStateMask instance_states,
    1503             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1504             :                                   DDS::QueryCondition_ptr a_condition)
    1505             : #else
    1506             :   int)
    1507             : #endif
    1508             : {
    1509           0 :   const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
    1510           0 :   if (!inst) return DDS::RETCODE_BAD_PARAMETER;
    1511             : 
    1512           0 :   typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
    1513             : 
    1514           0 :   RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
    1515             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1516             :                                    a_condition,
    1517             : #endif
    1518             :                                    DDS_OPERATION_READ);
    1519             : 
    1520           0 :   const InstanceState_rch state_obj = inst->instance_state_;
    1521           0 :   if (state_obj->match(view_states, instance_states)) {
    1522           0 :     const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
    1523           0 :     size_t i(0);
    1524           0 :     for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
    1525           0 :          item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
    1526           0 :       results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
    1527           0 :       const ValueDispatcher* vd = get_value_dispatcher();
    1528           0 :       if (observer && item->registered_data_ && vd) {
    1529           0 :         Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
    1530           0 :         observer->on_sample_read(this, s);
    1531             :       }
    1532             :     }
    1533           0 :   } else if (DCPS_debug_level >= 8) {
    1534           0 :     OPENDDS_STRING msg;
    1535           0 :     if ((state_obj->view_state() & view_states) == 0) {
    1536           0 :       msg = "view state is not valid";
    1537             :     }
    1538           0 :     if ((state_obj->instance_state() & instance_states) == 0) {
    1539           0 :       if (!msg.empty()) msg += " and ";
    1540           0 :       msg += "instance state is ";
    1541           0 :       msg += state_obj->instance_state_string();
    1542           0 :       msg += " while the validity mask is " + InstanceState::instance_state_mask_string(instance_states);
    1543             :     }
    1544           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl_T::read_instance_i: ")
    1545             :                ACE_TEXT("will return no data reading sub %C because:\n  %C\n"),
    1546             :                LogGuid(get_guid()).c_str(), msg.c_str()));
    1547           0 :   }
    1548             : 
    1549           0 :   results.copy_to_user();
    1550             : 
    1551           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
    1552           0 :   if (received_data.length()) {
    1553           0 :     ret = DDS::RETCODE_OK;
    1554           0 :     if (received_data.maximum() == 0) { // using ZeroCopy
    1555           0 :       received_data_p.set_loaner(this);
    1556             :     }
    1557             :   }
    1558             : 
    1559           0 :   post_read_or_take();
    1560           0 :   return ret;
    1561           0 : }
    1562             : 
    1563           0 : DDS::ReturnCode_t take_instance_i(MessageSequenceType& received_data,
    1564             :                                   DDS::SampleInfoSeq& info_seq,
    1565             :                                   CORBA::Long max_samples,
    1566             :                                   DDS::InstanceHandle_t a_handle,
    1567             :                                   DDS::SampleStateMask sample_states,
    1568             :                                   DDS::ViewStateMask view_states,
    1569             :                                   DDS::InstanceStateMask instance_states,
    1570             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1571             :                                   DDS::QueryCondition_ptr a_condition)
    1572             : #else
    1573             :   int)
    1574             : #endif
    1575             : {
    1576           0 :   const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
    1577           0 :   if (!inst) return DDS::RETCODE_BAD_PARAMETER;
    1578             : 
    1579           0 :   typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
    1580             : 
    1581           0 :   RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
    1582             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1583             :                                    a_condition,
    1584             : #endif
    1585             :                                    DDS_OPERATION_TAKE);
    1586             : 
    1587           0 :   const InstanceState_rch state_obj = inst->instance_state_;
    1588           0 :   if (state_obj->match(view_states, instance_states)) {
    1589           0 :     const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
    1590           0 :     size_t i(0);
    1591           0 :     for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
    1592           0 :          item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
    1593           0 :       results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
    1594           0 :       const ValueDispatcher* vd = get_value_dispatcher();
    1595           0 :       if (observer && item->registered_data_ && vd) {
    1596           0 :         Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
    1597           0 :         observer->on_sample_taken(this, s);
    1598             :       }
    1599             :     }
    1600           0 :   }
    1601             : 
    1602           0 :   results.copy_to_user();
    1603             : 
    1604           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
    1605           0 :   if (received_data.length()) {
    1606           0 :     ret = DDS::RETCODE_OK;
    1607           0 :     if (received_data.maximum() == 0) { // using ZeroCopy
    1608           0 :       received_data_p.set_loaner(this);
    1609             :     }
    1610             :   }
    1611             : 
    1612           0 :   post_read_or_take();
    1613           0 :   return ret;
    1614           0 : }
    1615             : 
    1616           0 : DDS::ReturnCode_t read_next_instance_i(MessageSequenceType& received_data,
    1617             :                                        DDS::SampleInfoSeq& info_seq,
    1618             :                                        CORBA::Long max_samples,
    1619             :                                        DDS::InstanceHandle_t a_handle,
    1620             :                                        DDS::SampleStateMask sample_states,
    1621             :                                        DDS::ViewStateMask view_states,
    1622             :                                        DDS::InstanceStateMask instance_states,
    1623             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1624             :                                        DDS::QueryCondition_ptr a_condition)
    1625             : #else
    1626             :   int)
    1627             : #endif
    1628             : {
    1629           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
    1630             : 
    1631           0 :   typename InstanceMap::iterator it = instance_map_.begin();
    1632           0 :   const typename InstanceMap::iterator the_end = instance_map_.end();
    1633           0 :   if (a_handle != DDS::HANDLE_NIL) {
    1634           0 :     const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
    1635           0 :     if (pos != reverse_instance_map_.end()) {
    1636           0 :       it = pos->second;
    1637           0 :       ++it;
    1638             :     } else {
    1639           0 :       it = the_end;
    1640             :     }
    1641             :   }
    1642             : 
    1643           0 :   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
    1644           0 :   for (; it != the_end; ++it) {
    1645           0 :     handle = it->second;
    1646             :     const DDS::ReturnCode_t status =
    1647           0 :       read_instance_i(received_data, info_seq, max_samples, handle,
    1648             :                       sample_states, view_states, instance_states,
    1649             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1650             :                       a_condition);
    1651             : #else
    1652             :       0);
    1653             : #endif
    1654           0 :     if (status != DDS::RETCODE_NO_DATA) {
    1655           0 :       post_read_or_take();
    1656           0 :       return status;
    1657             :     }
    1658             :   }
    1659             : 
    1660           0 :   post_read_or_take();
    1661           0 :   return DDS::RETCODE_NO_DATA;
    1662           0 : }
    1663             : 
    1664           0 : DDS::ReturnCode_t take_next_instance_i(MessageSequenceType& received_data,
    1665             :                                        DDS::SampleInfoSeq& info_seq,
    1666             :                                        CORBA::Long max_samples,
    1667             :                                        DDS::InstanceHandle_t a_handle,
    1668             :                                        DDS::SampleStateMask sample_states,
    1669             :                                        DDS::ViewStateMask view_states,
    1670             :                                        DDS::InstanceStateMask instance_states,
    1671             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1672             :                                        DDS::QueryCondition_ptr a_condition)
    1673             : #else
    1674             :   int)
    1675             : #endif
    1676             : {
    1677           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
    1678             : 
    1679           0 :   typename InstanceMap::iterator it = instance_map_.begin();
    1680           0 :   const typename InstanceMap::iterator the_end = instance_map_.end();
    1681           0 :   if (a_handle != DDS::HANDLE_NIL) {
    1682           0 :     const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
    1683           0 :     if (pos != reverse_instance_map_.end()) {
    1684           0 :       it = pos->second;
    1685           0 :       ++it;
    1686             :     } else {
    1687           0 :       it = the_end;
    1688             :     }
    1689             :   }
    1690             : 
    1691           0 :   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
    1692           0 :   for (; it != the_end; ++it) {
    1693           0 :     handle = it->second;
    1694             :     const DDS::ReturnCode_t status =
    1695           0 :       take_instance_i(received_data, info_seq, max_samples, handle,
    1696             :                       sample_states, view_states, instance_states,
    1697             : #ifndef OPENDDS_NO_QUERY_CONDITION
    1698             :                       a_condition);
    1699             : #else
    1700             :       0);
    1701             : #endif
    1702           0 :     if (status != DDS::RETCODE_NO_DATA) {
    1703           0 :       total_samples();  // see if we are empty
    1704           0 :       post_read_or_take();
    1705           0 :       return status;
    1706             :     }
    1707             :   }
    1708             : 
    1709           0 :   post_read_or_take();
    1710           0 :   return DDS::RETCODE_NO_DATA;
    1711           0 : }
    1712             : 
    1713           0 : void store_instance_data(unique_ptr<MessageTypeWithAllocator> instance_data,
    1714             :                          DDS::InstanceHandle_t publication_handle,
    1715             :                          const OpenDDS::DCPS::DataSampleHeader& header,
    1716             :                          OpenDDS::DCPS::SubscriptionInstance_rch& instance_ptr,
    1717             :                          bool& just_registered,
    1718             :                          bool& filtered)
    1719             : {
    1720             :   ACE_UNUSED_ARG(publication_handle);
    1721             : 
    1722           0 :   const bool is_dispose_msg =
    1723           0 :     header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
    1724           0 :     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
    1725           0 :   const bool is_unregister_msg =
    1726           0 :     header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE ||
    1727           0 :     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
    1728             : 
    1729           0 :   if (!store_instance_data_check(instance_data, publication_handle, header, instance_ptr)) {
    1730           0 :     return;
    1731             :   }
    1732             : 
    1733             :   // not filtering any data, except what is specifically identified as filtered below
    1734           0 :   filtered = false;
    1735             : 
    1736           0 :   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
    1737             : 
    1738             :   //!!! caller should already have the sample_lock_
    1739             :   //We will unlock it before calling into listeners
    1740             : 
    1741           0 :   typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
    1742             : 
    1743           0 :   if (it == instance_map_.end()) {
    1744           0 :     if (is_dispose_msg || is_unregister_msg) {
    1745           0 :       return;
    1746             :     }
    1747             : 
    1748           0 :     std::size_t instances_size = 0;
    1749             :     {
    1750           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    1751           0 :       instances_size = instances_.size();
    1752           0 :     }
    1753           0 :     if ((qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) &&
    1754           0 :       ((::CORBA::Long) instances_size >= qos_.resource_limits.max_instances))
    1755             :     {
    1756           0 :       DDS::DataReaderListener_var listener
    1757             :         = listener_for (DDS::SAMPLE_REJECTED_STATUS);
    1758             : 
    1759           0 :       set_status_changed_flag (DDS::SAMPLE_REJECTED_STATUS, true);
    1760             : 
    1761           0 :       sample_rejected_status_.last_reason = DDS::REJECTED_BY_INSTANCES_LIMIT;
    1762           0 :       ++sample_rejected_status_.total_count;
    1763           0 :       ++sample_rejected_status_.total_count_change;
    1764           0 :       sample_rejected_status_.last_instance_handle = handle;
    1765             : 
    1766           0 :       if (!CORBA::is_nil(listener.in()))
    1767             :       {
    1768           0 :         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    1769             : 
    1770           0 :         listener->on_sample_rejected(this, sample_rejected_status_);
    1771           0 :         sample_rejected_status_.total_count_change = 0;
    1772           0 :       }  // do we want to do something if listener is nil???
    1773           0 :       notify_status_condition_no_sample_lock();
    1774             : 
    1775           0 :       return;
    1776           0 :     }
    1777             : 
    1778             :     {
    1779           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    1780             : 
    1781             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1782           0 :       SharedInstanceMap_rch inst;
    1783           0 :       OwnershipManagerScopedAccess ownership_scoped_access;
    1784           0 :       OwnershipManagerPtr owner_manager = ownership_manager();
    1785             : 
    1786           0 :       bool new_handle = true;
    1787           0 :       if (is_exclusive_ownership_) {
    1788           0 :         OwnershipManagerScopedAccess temp(owner_manager);
    1789           0 :         temp.swap(ownership_scoped_access);
    1790           0 :         if (!owner_manager || ownership_scoped_access.lock_result_ != 0) {
    1791           0 :           if (DCPS_debug_level > 0) {
    1792           0 :             ACE_ERROR ((LM_ERROR,
    1793             :                         ACE_TEXT("(%P|%t) ")
    1794             :                         ACE_TEXT("%CDataReaderImpl::")
    1795             :                         ACE_TEXT("store_instance_data, ")
    1796             :                         ACE_TEXT("acquire instance_lock failed.\n"), TraitsType::type_name()));
    1797             :           }
    1798           0 :           return;
    1799             :         }
    1800             : 
    1801           0 :         inst = dynamic_rchandle_cast<SharedInstanceMap>(
    1802           0 :           owner_manager->get_instance_map(topic_servant_->type_name(), this));
    1803           0 :         if (inst != 0) {
    1804           0 :           typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
    1805           0 :           if (iter != inst->end ()) {
    1806           0 :             handle = iter->second;
    1807           0 :             new_handle = false;
    1808             :           }
    1809             :         }
    1810           0 :       }
    1811             : #endif
    1812             : 
    1813           0 :       just_registered = true;
    1814           0 :       DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
    1815           0 :       bool owns_handle = false;
    1816           0 :       if (handle == DDS::HANDLE_NIL) {
    1817           0 :         handle = get_next_handle(key);
    1818           0 :         owns_handle = true;
    1819             :       }
    1820           0 :       OpenDDS::DCPS::SubscriptionInstance_rch instance =
    1821             :         OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
    1822             :           rchandle_from(this),
    1823           0 :           qos_,
    1824           0 :           ref(instances_lock_),
    1825             :           handle, owns_handle);
    1826             : 
    1827             :       const std::pair<typename SubscriptionInstanceMapType::iterator, bool> bpair =
    1828           0 :         instances_.insert(typename SubscriptionInstanceMapType::value_type(handle, instance));
    1829             : 
    1830           0 :       if (bpair.second == false) {
    1831           0 :         if (DCPS_debug_level > 0) {
    1832           0 :           ACE_ERROR((LM_ERROR,
    1833             :                      ACE_TEXT("(%P|%t) ")
    1834             :                      ACE_TEXT("%CDataReaderImpl::")
    1835             :                      ACE_TEXT("store_instance_data, ")
    1836             :                      ACE_TEXT("insert handle failed.\n"), TraitsType::type_name()));
    1837             :         }
    1838           0 :         return;
    1839             :       }
    1840           0 :       update_lookup_maps(bpair.first);
    1841             : 
    1842             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1843           0 :       if (owner_manager) {
    1844           0 :         if (!inst) {
    1845           0 :           inst = make_rch<SharedInstanceMap>();
    1846           0 :           owner_manager->set_instance_map(
    1847           0 :             topic_servant_->type_name(),
    1848             :             inst,
    1849             :             this);
    1850             :         }
    1851             : 
    1852           0 :         if (new_handle) {
    1853             :           const std::pair<typename InstanceMap::iterator, bool> bpair =
    1854           0 :             inst->insert(typename InstanceMap::value_type(*instance_data, handle));
    1855           0 :           if (!bpair.second) {
    1856           0 :             if (DCPS_debug_level > 0) {
    1857           0 :               ACE_ERROR ((LM_ERROR,
    1858             :                           ACE_TEXT("(%P|%t) ")
    1859             :                           ACE_TEXT("%CDataReaderImpl::")
    1860             :                           ACE_TEXT("store_instance_data, ")
    1861             :                           ACE_TEXT("insert to participant scope %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
    1862             :             }
    1863           0 :             return;
    1864             :           }
    1865             :         }
    1866             : 
    1867           0 :         OwnershipManagerScopedAccess temp;
    1868           0 :         temp.swap(ownership_scoped_access);
    1869           0 :         if (temp.release() != 0) {
    1870           0 :           if (DCPS_debug_level > 0) {
    1871           0 :             ACE_ERROR ((LM_ERROR,
    1872             :                         ACE_TEXT("(%P|%t) ")
    1873             :                         ACE_TEXT("%CDataReaderImpl::")
    1874             :                         ACE_TEXT("store_instance_data, ")
    1875             :                         ACE_TEXT("release instance_lock failed.\n"), TraitsType::type_name()));
    1876             :           }
    1877           0 :           return;
    1878             :         }
    1879           0 :       }
    1880             : #endif
    1881           0 :     } // scope for instances_lock_
    1882             : 
    1883             :     std::pair<typename InstanceMap::iterator, bool> bpair =
    1884           0 :       instance_map_.insert(typename InstanceMap::value_type(*instance_data,
    1885             :         handle));
    1886           0 :     if (bpair.second == false)
    1887             :     {
    1888           0 :       if (DCPS_debug_level > 0) {
    1889           0 :         ACE_ERROR ((LM_ERROR,
    1890             :                     ACE_TEXT("(%P|%t) ")
    1891             :                     ACE_TEXT("%CDataReaderImpl::")
    1892             :                     ACE_TEXT("store_instance_data, ")
    1893             :                     ACE_TEXT("insert %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
    1894             :       }
    1895           0 :       return;
    1896             :     }
    1897           0 :     reverse_instance_map_[handle] = bpair.first;
    1898             :   }
    1899             :   else
    1900             :   {
    1901           0 :     just_registered = false;
    1902           0 :     handle = it->second;
    1903             :   }
    1904             : 
    1905           0 :   if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION)
    1906             :   {
    1907           0 :     instance_ptr = get_handle_instance(handle);
    1908           0 :     OPENDDS_ASSERT(instance_ptr);
    1909             : 
    1910           0 :     if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA)
    1911             :     {
    1912             :       {
    1913           0 :         ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    1914           0 :         filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
    1915           0 :       }
    1916             : 
    1917           0 :       MonotonicTimePoint now;
    1918           0 :       MonotonicTimePoint deadline;
    1919           0 :       if (!filtered && time_based_filter_instance(instance_ptr, now, deadline)) {
    1920           0 :         filtered = true;
    1921           0 :         if (qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
    1922           0 :           delay_sample(handle, move(instance_data), header, just_registered, now, deadline);
    1923             :         }
    1924             :       } else {
    1925             :         // nothing time based filtered now
    1926           0 :         clear_sample(handle);
    1927             : 
    1928             :       }
    1929             : 
    1930           0 :       if (filtered) {
    1931           0 :         return;
    1932             :       }
    1933           0 :     }
    1934             : 
    1935           0 :     finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
    1936             :   }
    1937             :   else
    1938             :   {
    1939           0 :     instance_ptr = get_handle_instance(handle);
    1940           0 :     OPENDDS_ASSERT(instance_ptr);
    1941           0 :     instance_ptr->instance_state_->lively(header.publication_id_);
    1942             :   }
    1943             : }
    1944             : 
    1945           0 : void finish_store_instance_data(unique_ptr<MessageTypeWithAllocator> instance_data, const DataSampleHeader& header,
    1946             :   SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg )
    1947             : {
    1948           0 :   if ((qos_.resource_limits.max_samples_per_instance !=
    1949           0 :         DDS::LENGTH_UNLIMITED) &&
    1950           0 :       (instance_ptr->rcvd_samples_.size() >=
    1951           0 :         static_cast<size_t>(qos_.resource_limits.max_samples_per_instance))) {
    1952             : 
    1953             :     // According to spec 1.2, Samples that contain no data do not
    1954             :     // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
    1955             :     // so do not remove the oldest sample when unregister/dispose
    1956             :     // message arrives.
    1957             : 
    1958           0 :     if (!is_dispose_msg && !is_unregister_msg
    1959           0 :       && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
    1960             :     {
    1961           0 :       DDS::DataReaderListener_var listener
    1962             :         = listener_for(DDS::SAMPLE_REJECTED_STATUS);
    1963             : 
    1964           0 :       set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
    1965             : 
    1966           0 :       sample_rejected_status_.last_reason =
    1967             :         DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT;
    1968           0 :       ++sample_rejected_status_.total_count;
    1969           0 :       ++sample_rejected_status_.total_count_change;
    1970           0 :       sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
    1971             : 
    1972           0 :       if (!CORBA::is_nil(listener.in()))
    1973             :       {
    1974           0 :         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    1975             : 
    1976           0 :         listener->on_sample_rejected(this, sample_rejected_status_);
    1977           0 :         sample_rejected_status_.total_count_change = 0;
    1978           0 :       }  // do we want to do something if listener is nil???
    1979           0 :       notify_status_condition_no_sample_lock();
    1980           0 :       return;
    1981           0 :     }
    1982           0 :     else if (!is_dispose_msg && !is_unregister_msg)
    1983             :     {
    1984             :       // Discard the oldest previously-read sample
    1985             :       OpenDDS::DCPS::ReceivedDataElement* item =
    1986           0 :         instance_ptr->rcvd_samples_.remove_head();
    1987           0 :       item->dec_ref();
    1988             :     }
    1989             :   }
    1990           0 :   else if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED)
    1991             :   {
    1992           0 :     CORBA::Long total_samples = 0;
    1993             :     {
    1994           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
    1995           0 :       for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
    1996           0 :         iter != instances_.end();
    1997           0 :         ++iter) {
    1998           0 :         OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
    1999             : 
    2000           0 :         total_samples += (CORBA::Long) ptr->rcvd_samples_.size();
    2001             :       }
    2002           0 :     }
    2003             : 
    2004           0 :     if (total_samples >= qos_.resource_limits.max_samples)
    2005             :     {
    2006             :       // According to spec 1.2, Samples that contain no data do not
    2007             :       // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
    2008             :       // so do not remove the oldest sample when unregister/dispose
    2009             :       // message arrives.
    2010             : 
    2011           0 :       if (!is_dispose_msg && !is_unregister_msg
    2012           0 :         && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
    2013             :       {
    2014           0 :         DDS::DataReaderListener_var listener
    2015             :           = listener_for(DDS::SAMPLE_REJECTED_STATUS);
    2016             : 
    2017           0 :         set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
    2018             : 
    2019           0 :         sample_rejected_status_.last_reason =
    2020             :           DDS::REJECTED_BY_SAMPLES_LIMIT;
    2021           0 :         ++sample_rejected_status_.total_count;
    2022           0 :         ++sample_rejected_status_.total_count_change;
    2023           0 :         sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
    2024           0 :         if (!CORBA::is_nil(listener.in()))
    2025             :         {
    2026           0 :           ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    2027             : 
    2028           0 :           listener->on_sample_rejected(this, sample_rejected_status_);
    2029           0 :           sample_rejected_status_.total_count_change = 0;
    2030           0 :         }  // do we want to do something if listener is nil???
    2031           0 :         notify_status_condition_no_sample_lock();
    2032             : 
    2033           0 :         return;
    2034           0 :       }
    2035           0 :       else if (!is_dispose_msg && !is_unregister_msg)
    2036             :       {
    2037             :         // Discard the oldest previously-read sample
    2038             :         OpenDDS::DCPS::ReceivedDataElement *item =
    2039           0 :           instance_ptr->rcvd_samples_.remove_head();
    2040           0 :         item->dec_ref();
    2041             :       }
    2042             :     }
    2043             :   }
    2044             : 
    2045           0 :   bool event_notify = false;
    2046             : 
    2047           0 :   if (is_dispose_msg) {
    2048           0 :     event_notify = instance_ptr->instance_state_->dispose_was_received(header.publication_id_);
    2049             :   }
    2050             : 
    2051           0 :   if (is_unregister_msg) {
    2052           0 :     if (instance_ptr->instance_state_->unregister_was_received(header.publication_id_)) {
    2053           0 :       event_notify = true;
    2054             :     }
    2055             :   }
    2056             : 
    2057           0 :   if (!is_dispose_msg && !is_unregister_msg) {
    2058           0 :     event_notify = true;
    2059           0 :     instance_ptr->instance_state_->data_was_received(header.publication_id_);
    2060             :   }
    2061             : 
    2062           0 :   if (!event_notify) {
    2063           0 :     return;
    2064             :   }
    2065             : 
    2066           0 :   ReceivedDataElement* const ptr =
    2067           0 :     new (*rd_allocator_.get()) ReceivedDataElementWithType<MessageTypeWithAllocator>(
    2068           0 :       header, instance_data.release(), &sample_lock_);
    2069             : 
    2070           0 :   ptr->disposed_generation_count_ =
    2071           0 :     instance_ptr->instance_state_->disposed_generation_count();
    2072           0 :   ptr->no_writers_generation_count_ =
    2073           0 :     instance_ptr->instance_state_->no_writers_generation_count();
    2074             : 
    2075           0 :   instance_ptr->last_sequence_ = header.sequence_;
    2076             : 
    2077           0 :   instance_ptr->rcvd_strategy_->add(ptr);
    2078             : 
    2079           0 :   if (! is_dispose_msg  && ! is_unregister_msg
    2080           0 :       && instance_ptr->rcvd_samples_.size() > get_depth())
    2081             :     {
    2082             :       OpenDDS::DCPS::ReceivedDataElement* head_ptr =
    2083           0 :         instance_ptr->rcvd_samples_.remove_head();
    2084             : 
    2085           0 :       if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
    2086             :         {
    2087           0 :           DDS::DataReaderListener_var listener
    2088             :             = listener_for (DDS::SAMPLE_LOST_STATUS);
    2089             : 
    2090           0 :           ++sample_lost_status_.total_count;
    2091           0 :           ++sample_lost_status_.total_count_change;
    2092             : 
    2093           0 :           set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, true);
    2094             : 
    2095           0 :           if (!CORBA::is_nil(listener.in()))
    2096             :             {
    2097           0 :               ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    2098             : 
    2099           0 :               listener->on_sample_lost(this, sample_lost_status_);
    2100             : 
    2101           0 :               sample_lost_status_.total_count_change = 0;
    2102           0 :             }
    2103             : 
    2104           0 :           notify_status_condition_no_sample_lock();
    2105           0 :         }
    2106             : 
    2107           0 :       head_ptr->dec_ref();
    2108             :     }
    2109             : 
    2110             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    2111           0 :   if (! ptr->coherent_change_) {
    2112             : #endif
    2113           0 :     RcHandle<OpenDDS::DCPS::SubscriberImpl> sub = get_subscriber_servant();
    2114           0 :     if (!sub || get_deleted())
    2115           0 :       return;
    2116             : 
    2117           0 :     sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, true);
    2118             : 
    2119           0 :     set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, true);
    2120             : 
    2121           0 :     DDS::SubscriberListener_var sub_listener =
    2122             :         sub->listener_for(DDS::DATA_ON_READERS_STATUS);
    2123           0 :     if (!CORBA::is_nil(sub_listener.in()) && !coherent_) {
    2124           0 :       if (!is_bit()) {
    2125           0 :         sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
    2126           0 :         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    2127           0 :         sub_listener->on_data_on_readers(sub.in());
    2128           0 :       } else {
    2129           0 :         TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(sub, sub_listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, false));
    2130             :       }
    2131             :     } else {
    2132           0 :       sub->notify_status_condition();
    2133             : 
    2134           0 :       DDS::DataReaderListener_var listener =
    2135             :         listener_for (DDS::DATA_AVAILABLE_STATUS);
    2136             : 
    2137           0 :       if (!CORBA::is_nil(listener.in())) {
    2138           0 :         if (!is_bit()) {
    2139           0 :           set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
    2140           0 :           sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
    2141           0 :           sub.reset();
    2142           0 :           ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    2143           0 :           listener->on_data_available(this);
    2144           0 :         } else {
    2145           0 :           TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, true, true));
    2146             :         }
    2147             :       } else {
    2148           0 :         notify_status_condition_no_sample_lock();
    2149             :       }
    2150           0 :     }
    2151             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    2152           0 :   }
    2153             : #endif
    2154             : }
    2155             : 
    2156             : /// Release sample_lock_ during status notifications in store_instance_data()
    2157             : /// as the lock is not needed and could cause deadlock condition.
    2158             : /// See comments in member function implementation for details.
    2159           0 : void notify_status_condition_no_sample_lock()
    2160             : {
    2161             :   // This member function avoids a deadlock condition which otherwise
    2162             :   // could occur as follows:
    2163             :   // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
    2164             :   // eventually DataReaderImpl::sample_lock_ to lock in call to
    2165             :   // DataReaderImpl::contains_samples().
    2166             :   // Thread2: Call to DataReaderImpl::data_received()
    2167             :   // causes DataReaderImpl::sample_lock_ to lock and eventually
    2168             :   // during notify of status condition a call to WaitSet::signal()
    2169             :   // causes WaitSet::lock_ to lock.
    2170             :   // Because the DataReaderImpl::sample_lock_ is not needed during
    2171             :   // status notification this member function is used in
    2172             :   // store_instance_data() to release sample_lock_ before making
    2173             :   // the notification.
    2174           0 :   ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
    2175           0 :   notify_status_condition();
    2176           0 : }
    2177             : 
    2178             : 
    2179             : /// Common input read* & take* input processing and precondition checks
    2180           0 : DDS::ReturnCode_t check_inputs(const char* method_name,
    2181             :                                MessageSequenceType& received_data,
    2182             :                                DDS::SampleInfoSeq& info_seq,
    2183             :                                ::CORBA::Long max_samples)
    2184             : {
    2185           0 :   typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
    2186             : 
    2187             :   // ---- start of preconditions common to read and take -----
    2188             :   // SPEC ref v1.2 7.1.2.5.3.8 #1
    2189             :   // NOTE: We can't check maximum() or release() here since those are
    2190             :   //       implementation details of the sequences.  In general, the
    2191             :   //       info_seq will have release() == true and maximum() == 0.
    2192             :   //       If we're in zero-copy mode, the received_data will have
    2193             :   //       release() == false and maximum() == 0.  If it's not
    2194             :   //       zero-copy then received_data will have release == true()
    2195             :   //       and maximum() == anything.
    2196           0 :   if (received_data.length() != info_seq.length())
    2197             :     {
    2198           0 :       ACE_DEBUG((LM_DEBUG,
    2199             :                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
    2200             :                  ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
    2201             :                  ACE_TEXT("sequences do not match.\n"),
    2202             :                  TraitsType::type_name(),
    2203             :                  method_name ));
    2204           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
    2205             :     }
    2206             : 
    2207             :   //SPEC ref v1.2 7.1.2.5.3.8 #4
    2208           0 :   if ((received_data.maximum() > 0) && (received_data.release() == false))
    2209             :     {
    2210           0 :       ACE_DEBUG((LM_DEBUG,
    2211             :                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
    2212             :                  ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
    2213             :                  ACE_TEXT("maximum %d and owns %d\n"),
    2214             :                  TraitsType::type_name(),
    2215             :                  method_name,
    2216             :                  received_data.maximum(),
    2217             :                  received_data.release() ));
    2218             : 
    2219           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
    2220             :     }
    2221             : 
    2222           0 :   if (received_data.maximum() == 0)
    2223             :     {
    2224             :       // not in SPEC but needed.
    2225           0 :       if (max_samples == DDS::LENGTH_UNLIMITED)
    2226             :         {
    2227           0 :           max_samples =
    2228           0 :             static_cast< ::CORBA::Long> (received_data_p.max_slots());
    2229             :         }
    2230             :     }
    2231             :   else
    2232             :     {
    2233           0 :       if (max_samples == DDS::LENGTH_UNLIMITED)
    2234             :         {
    2235             :           //SPEC ref v1.2 7.1.2.5.3.8 #5a
    2236           0 :           max_samples = received_data.maximum();
    2237             :         }
    2238           0 :       else if (
    2239           0 :                max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
    2240             :         {
    2241             :           //SPEC ref v1.2 7.1.2.5.3.8 #5c
    2242           0 :           ACE_DEBUG((LM_DEBUG,
    2243             :                      ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
    2244             :                      ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
    2245             :                      TraitsType::type_name(),
    2246             :                      method_name,
    2247             :                      max_samples,
    2248             :                      received_data.maximum()));
    2249           0 :           return DDS::RETCODE_PRECONDITION_NOT_MET;
    2250             :         }
    2251             :       //else
    2252             :       //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
    2253             :     }
    2254             : 
    2255             :   // The spec does not say what to do in this case but it appears to be a good thing.
    2256             :   // Note: max_slots is the greater of the sequence's maximum and init_size.
    2257           0 :   if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
    2258             :     {
    2259           0 :       max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
    2260             :     }
    2261             :   //---- end of preconditions common to read and take -----
    2262             : 
    2263           0 :   return DDS::RETCODE_OK;
    2264             : }
    2265             : 
    2266           0 : void delay_sample(DDS::InstanceHandle_t handle,
    2267             :                   unique_ptr<MessageTypeWithAllocator> data,
    2268             :                   const OpenDDS::DCPS::DataSampleHeader& header,
    2269             :                   const bool just_registered,
    2270             :                   const MonotonicTimePoint& now,
    2271             :                   const MonotonicTimePoint& deadline)
    2272             : {
    2273             :   // sample_lock_ should already be held
    2274           0 :   DataSampleHeader_ptr hdr(new OpenDDS::DCPS::DataSampleHeader(header));
    2275             : 
    2276           0 :   typename FilterDelayedSampleMap::iterator i = filter_delayed_sample_map_.find(handle);
    2277           0 :   if (i == filter_delayed_sample_map_.end()) {
    2278             : 
    2279             :     // emplace()/insert() only if the sample is going to be
    2280             :     // new (otherwise we call move(data) twice).
    2281             :     std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
    2282             : #ifdef ACE_HAS_CPP11
    2283           0 :       filter_delayed_sample_map_.emplace(std::piecewise_construct,
    2284             :                                          std::forward_as_tuple(handle),
    2285           0 :                                          std::forward_as_tuple(move(data), hdr, just_registered));
    2286             : #else
    2287             :       filter_delayed_sample_map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
    2288             : #endif
    2289           0 :     FilterDelayedSample& sample = result.first->second;
    2290           0 :     sample.expiration_time = deadline;
    2291           0 :     const bool schedule = filter_delayed_sample_queue_.empty();
    2292           0 :     filter_delayed_sample_queue_.insert(std::make_pair(deadline, handle));
    2293           0 :     if (schedule) {
    2294           0 :       filter_delayed_sample_task_->schedule(now - deadline);
    2295           0 :     } else if (filter_delayed_sample_queue_.begin()->second == handle) {
    2296           0 :       filter_delayed_sample_task_->cancel();
    2297           0 :       filter_delayed_sample_task_->schedule(now - deadline);
    2298             :     }
    2299             :   } else {
    2300           0 :     FilterDelayedSample& sample = i->second;
    2301             :     // we only care about the most recently filtered sample, so clean up the last one
    2302             : 
    2303           0 :     sample.message = move(data);
    2304           0 :     sample.header = hdr;
    2305           0 :     sample.new_instance = just_registered;
    2306             :     // already scheduled for timeout at the desired time
    2307             :   }
    2308           0 : }
    2309             : 
    2310           0 : void clear_sample(DDS::InstanceHandle_t handle)
    2311             : {
    2312             :   // sample_lock_ should already be held
    2313             : 
    2314           0 :   typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
    2315           0 :   if (sample != filter_delayed_sample_map_.end()) {
    2316             :     // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring
    2317           0 :     sample->second.message.reset();
    2318             :   }
    2319           0 : }
    2320             : 
    2321           0 : void drop_sample(DDS::InstanceHandle_t handle)
    2322             : {
    2323             :   // sample_lock_ should already be held
    2324             : 
    2325           0 :   typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
    2326           0 :   if (sample != filter_delayed_sample_map_.end()) {
    2327           0 :     for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.lower_bound(sample->second.expiration_time), limit = filter_delayed_sample_queue_.upper_bound(sample->second.expiration_time); pos != limit; ++pos) {
    2328           0 :       if (pos->second == handle) {
    2329           0 :         filter_delayed_sample_queue_.erase(pos);
    2330           0 :         break;
    2331             :       }
    2332             :     }
    2333             : 
    2334             :     // use the handle to erase, since the sample lock was released
    2335           0 :     filter_delayed_sample_map_.erase(handle);
    2336             :   }
    2337           0 : }
    2338             : 
    2339           0 : void filter_delayed(const MonotonicTimePoint& now)
    2340             : {
    2341           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    2342             : 
    2343             :   // Make a copy because finish_store_instance_data will release the sample lock.
    2344             :   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) Handles;
    2345           0 :   Handles handles;
    2346             : 
    2347           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
    2348             : 
    2349           0 :   for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.begin(), limit = filter_delayed_sample_queue_.end(); pos != limit && pos->first <= now;) {
    2350           0 :     handles.push_back(pos->second);
    2351           0 :     filter_delayed_sample_queue_.erase(pos++);
    2352             :   }
    2353             : 
    2354           0 :   const TimeDuration interval(qos_.time_based_filter.minimum_separation);
    2355             : 
    2356           0 :   for (Handles::const_iterator pos = handles.begin(), limit = handles.end(); pos != limit; ++pos) {
    2357           0 :     const DDS::InstanceHandle_t handle = *pos;
    2358             : 
    2359           0 :     SubscriptionInstance_rch instance = get_handle_instance(handle);
    2360           0 :     if (!instance) {
    2361           0 :       continue;
    2362             :     }
    2363             : 
    2364           0 :     typename FilterDelayedSampleMap::iterator data = filter_delayed_sample_map_.find(handle);
    2365           0 :     if (data == filter_delayed_sample_map_.end()) {
    2366           0 :       continue;
    2367             :     }
    2368             : 
    2369           0 :     if (data->second.message) {
    2370           0 :       const bool NOT_DISPOSE_MSG = false;
    2371           0 :       const bool NOT_UNREGISTER_MSG = false;
    2372             :       // clear the message, since ownership is being transferred to finish_store_instance_data.
    2373             : 
    2374           0 :       instance->last_accepted_.set_to_now();
    2375           0 :       const DataSampleHeader_ptr header = data->second.header;
    2376           0 :       const bool new_instance = data->second.new_instance;
    2377             : 
    2378             :       // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_
    2379           0 :       finish_store_instance_data(move(data->second.message),
    2380           0 :                                  *header,
    2381             :                                  instance,
    2382             :                                  NOT_DISPOSE_MSG,
    2383             :                                  NOT_UNREGISTER_MSG);
    2384             : 
    2385           0 :       accept_sample_processing(instance, *header, new_instance);
    2386             : 
    2387             :       // Refresh the iterator.
    2388           0 :       data = filter_delayed_sample_map_.find(handle);
    2389           0 :       if (data == filter_delayed_sample_map_.end()) {
    2390           0 :         continue;
    2391             :       }
    2392             : 
    2393             :       // Reschedule.
    2394           0 :       data->second.expiration_time = now + interval;
    2395           0 :       filter_delayed_sample_queue_.insert(std::make_pair(data->second.expiration_time, handle));
    2396             : 
    2397           0 :     } else {
    2398             :       // this check is performed to handle the corner case where
    2399             :       // store_instance_data received and delivered a sample, while this
    2400             :       // method was waiting for the lock
    2401           0 :       if (MonotonicTimePoint::now() - instance->last_sample_tv_ >= interval) {
    2402             :         // no new data to process, so remove from container
    2403           0 :         filter_delayed_sample_map_.erase(data);
    2404             :       }
    2405             :     }
    2406             :   }
    2407             : 
    2408           0 :   if (!filter_delayed_sample_queue_.empty()) {
    2409           0 :     filter_delayed_sample_task_->schedule(filter_delayed_sample_queue_.begin()->first - now);
    2410             :   }
    2411           0 : }
    2412             : 
    2413           0 : unique_ptr<DataAllocator>& data_allocator() { return data_allocator_; }
    2414             : 
    2415             : unique_ptr<DataAllocator> data_allocator_;
    2416             : 
    2417             : InstanceMap instance_map_;
    2418             : ReverseInstanceMap reverse_instance_map_;
    2419             : 
    2420             : typedef DCPS::PmfSporadicTask<DataReaderImpl_T> DRISporadicTask;
    2421             : 
    2422             : RcHandle<DRISporadicTask> filter_delayed_sample_task_;
    2423             : #ifdef OPENDDS_HAS_STD_SHARED_PTR
    2424             : typedef std::shared_ptr<const OpenDDS::DCPS::DataSampleHeader> DataSampleHeader_ptr;
    2425             : #else
    2426             : typedef ACE_Strong_Bound_Ptr<const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex> DataSampleHeader_ptr;
    2427             : #endif
    2428             : struct FilterDelayedSample {
    2429           0 :   FilterDelayedSample(unique_ptr<MessageTypeWithAllocator> msg, DataSampleHeader_ptr hdr, bool new_inst)
    2430           0 :     : message(move(msg))
    2431           0 :     , header(hdr)
    2432           0 :     , new_instance(new_inst)
    2433           0 :   {}
    2434             :   container_supported_unique_ptr<MessageTypeWithAllocator> message;
    2435             :   DataSampleHeader_ptr header;
    2436             :   bool new_instance;
    2437             :   MonotonicTimePoint expiration_time;
    2438             : };
    2439             : typedef OPENDDS_MAP(DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap;
    2440             : FilterDelayedSampleMap filter_delayed_sample_map_;
    2441             : typedef OPENDDS_MULTIMAP(MonotonicTimePoint, DDS::InstanceHandle_t) FilterDelayedSampleQueue;
    2442             : FilterDelayedSampleQueue filter_delayed_sample_queue_;
    2443             : 
    2444             : bool marshal_skip_serialize_;
    2445             : 
    2446             : };
    2447             : 
    2448             : template <typename MessageType>
    2449           0 : void* DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator new(size_t , ACE_New_Allocator& pool)
    2450             : {
    2451             :   typedef typename DataReaderImpl_T<MessageType>::MessageTypeMemoryBlock MessageTypeMemoryBlock;
    2452             :   MessageTypeMemoryBlock* block =
    2453           0 :     static_cast<MessageTypeMemoryBlock*>(pool.malloc(sizeof(MessageTypeMemoryBlock)));
    2454           0 :   block->allocator_ = &pool;
    2455           0 :   return block;
    2456             : }
    2457             : 
    2458             : template <typename MessageType>
    2459           0 : void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory)
    2460             : {
    2461           0 :   if (memory) {
    2462           0 :     MessageTypeMemoryBlock* block = static_cast<MessageTypeMemoryBlock*>(memory);
    2463           0 :     block->allocator_->free(block);
    2464             :   }
    2465           0 : }
    2466             : 
    2467             : template <typename MessageType>
    2468           0 : void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory, ACE_New_Allocator&)
    2469             : {
    2470           0 :   operator delete(memory);
    2471           0 : }
    2472             : 
    2473             : }
    2474             : }
    2475             : 
    2476             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
    2477             : 
    2478             : #endif /* OPENDDS_DDS_DCPS_DATAREADERIMPL_T_H */

Generated by: LCOV version 1.16