DataReaderImpl_T.h

Go to the documentation of this file.
00001 #ifndef dds_DCPS_DataReaderImpl_T_h
00002 #define dds_DCPS_DataReaderImpl_T_h
00003 #include "dds/DCPS/MultiTopicImpl.h"
00004 #include "dds/DCPS/RakeResults_T.h"
00005 #include "dds/DCPS/SubscriberImpl.h"
00006 #include "dds/DCPS/BuiltInTopicUtils.h"
00007 #include "dds/DCPS/Util.h"
00008 #include "dds/DCPS/TypeSupportImpl.h"
00009 #include "dds/DCPS/Watchdog.h"
00010 #include "dcps_export.h"
00011 #include "dds/DCPS/GuidConverter.h"
00012 
00013 #include "ace/Bound_Ptr.h"
00014 #include "ace/Time_Value.h"
00015 
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 namespace OpenDDS {
00019   namespace DCPS {
00020 
00021   /** Servant for DataReader interface of Traits::MessageType data type.
00022    *
00023    * See the DDS specification, OMG formal/04-12-02, for a description of
00024    * this interface.
00025    *
00026    */
00027   template <typename MessageType>
00028     class
00029 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
00030     OpenDDS_Dcps_Export
00031 #endif
00032     DataReaderImpl_T
00033     : public virtual OpenDDS::DCPS::LocalObject<typename DDSTraits<MessageType>::DataReaderType>,
00034       public virtual OpenDDS::DCPS::DataReaderImpl
00035   {
00036   public:
00037     typedef DDSTraits<MessageType> TraitsType;
00038     typedef typename TraitsType::MessageSequenceType MessageSequenceType;
00039 
00040     typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t,
00041                               typename TraitsType::LessThanType) InstanceMap;
00042 
00043     class SharedInstanceMap
00044       : public RcObject
00045       , public InstanceMap
00046     {
00047     };
00048 
00049     typedef RcHandle<SharedInstanceMap> SharedInstanceMap_rch;
00050 
00051     class MessageTypeWithAllocator
00052       : public MessageType
00053       , public EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>
00054     {
00055     public:
00056       void* operator new(size_t size, ACE_New_Allocator& pool);
00057       void operator delete(void* memory, ACE_New_Allocator& pool);
00058       void operator delete(void* memory);
00059 
00060       MessageTypeWithAllocator(){}
00061       MessageTypeWithAllocator(const MessageType& other)
00062         : MessageType(other)
00063       {
00064       }
00065     };
00066 
00067     struct MessageTypeMemoryBlock {
00068       MessageTypeWithAllocator element_;
00069       ACE_New_Allocator* allocator_;
00070     };
00071 
00072     typedef OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageTypeMemoryBlock, ACE_Null_Mutex>  DataAllocator;
00073 
00074     typedef typename TraitsType::DataReaderType Interface;
00075 
00076     DataReaderImpl_T (void)
00077     : filter_delayed_handler_(make_rch<FilterDelayedHandler>(ref(*this)))
00078     {
00079     }
00080 
00081     virtual ~DataReaderImpl_T (void)
00082     {
00083       for (typename InstanceMap::iterator it = instance_map_.begin();
00084            it != instance_map_.end(); ++it)
00085         {
00086           OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(it->second);
00087           this->purge_data(ptr);
00088         }
00089       //X SHH release the data samples in the instance_map_.
00090     }
00091 
00092     /**
00093      * Do parts of enable specific to the datatype.
00094      * Called by DataReaderImpl::enable().
00095      */
00096     virtual DDS::ReturnCode_t enable_specific ()
00097     {
00098       data_allocator().reset(new DataAllocator(get_n_chunks ()));
00099       if (OpenDDS::DCPS::DCPS_debug_level >= 2)
00100         ACE_DEBUG((LM_DEBUG,
00101                    ACE_TEXT("(%P|%t) %CDataReaderImpl::")
00102                    ACE_TEXT("enable_specific-data")
00103                    ACE_TEXT(" Cached_Allocator_With_Overflow ")
00104                    ACE_TEXT("%x with %d chunks\n"),
00105                    TraitsType::type_name(),
00106                    data_allocator().get(),
00107                    this->get_n_chunks ()));
00108 
00109       return DDS::RETCODE_OK;
00110     }
00111 
00112     virtual DDS::ReturnCode_t read (
00113                                     MessageSequenceType & received_data,
00114                                     DDS::SampleInfoSeq & info_seq,
00115                                     ::CORBA::Long max_samples,
00116                                     DDS::SampleStateMask sample_states,
00117                                     DDS::ViewStateMask view_states,
00118                                     DDS::InstanceStateMask instance_states)
00119     {
00120       DDS::ReturnCode_t const precond =
00121         check_inputs("read", received_data, info_seq, max_samples);
00122       if (DDS::RETCODE_OK != precond)
00123         {
00124           return precond;
00125         }
00126 
00127       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00128                         guard,
00129                         this->sample_lock_,
00130                         DDS::RETCODE_ERROR);
00131 
00132       return read_i(received_data, info_seq, max_samples, sample_states,
00133                     view_states, instance_states, 0);
00134     }
00135 
00136     virtual DDS::ReturnCode_t take (
00137                                       MessageSequenceType & received_data,
00138                                       DDS::SampleInfoSeq & info_seq,
00139                                       ::CORBA::Long max_samples,
00140                                       DDS::SampleStateMask sample_states,
00141                                       DDS::ViewStateMask view_states,
00142                                       DDS::InstanceStateMask instance_states)
00143     {
00144       DDS::ReturnCode_t const precond =
00145         check_inputs("take", received_data, info_seq, max_samples);
00146       if (DDS::RETCODE_OK != precond)
00147         {
00148           return precond;
00149         }
00150 
00151       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00152                         guard,
00153                         this->sample_lock_,
00154                         DDS::RETCODE_ERROR);
00155 
00156       return take_i(received_data, info_seq, max_samples, sample_states,
00157                     view_states, instance_states, 0);
00158     }
00159 
00160     virtual DDS::ReturnCode_t read_w_condition (
00161                                                   MessageSequenceType & received_data,
00162                                                   DDS::SampleInfoSeq & sample_info,
00163                                                   ::CORBA::Long max_samples,
00164                                                   DDS::ReadCondition_ptr a_condition)
00165     {
00166       DDS::ReturnCode_t const precond =
00167         check_inputs("read_w_condition", received_data, sample_info, max_samples);
00168       if (DDS::RETCODE_OK != precond)
00169         {
00170           return precond;
00171         }
00172 
00173       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00174                         DDS::RETCODE_ERROR);
00175 
00176       if (!has_readcondition(a_condition))
00177         {
00178           return DDS::RETCODE_PRECONDITION_NOT_MET;
00179         }
00180 
00181       return read_i(received_data, sample_info, max_samples,
00182                     a_condition->get_sample_state_mask(),
00183                     a_condition->get_view_state_mask(),
00184                     a_condition->get_instance_state_mask(),
00185 #ifndef OPENDDS_NO_QUERY_CONDITION
00186                     dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
00187 #else
00188       0);
00189 #endif
00190   }
00191 
00192     virtual DDS::ReturnCode_t take_w_condition (
00193                                                   MessageSequenceType & received_data,
00194                                                   DDS::SampleInfoSeq & sample_info,
00195                                                   ::CORBA::Long max_samples,
00196                                                   DDS::ReadCondition_ptr a_condition)
00197     {
00198       DDS::ReturnCode_t const precond =
00199         check_inputs("take_w_condition", received_data, sample_info, max_samples);
00200       if (DDS::RETCODE_OK != precond)
00201         {
00202           return precond;
00203         }
00204 
00205       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00206                         DDS::RETCODE_ERROR);
00207 
00208       if (!has_readcondition(a_condition))
00209         {
00210           return DDS::RETCODE_PRECONDITION_NOT_MET;
00211         }
00212 
00213       return take_i(received_data, sample_info, max_samples,
00214                     a_condition->get_sample_state_mask(),
00215                     a_condition->get_view_state_mask(),
00216                     a_condition->get_instance_state_mask(),
00217 #ifndef OPENDDS_NO_QUERY_CONDITION
00218                     dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
00219 #else
00220                     0
00221 #endif
00222                     );
00223     }
00224 
00225   virtual DDS::ReturnCode_t read_next_sample (
00226                                                 MessageType & received_data,
00227                                                 DDS::SampleInfo & sample_info)
00228   {
00229 
00230     bool found_data = false;
00231 
00232     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00233                       guard,
00234                       this->sample_lock_,
00235                       DDS::RETCODE_ERROR);
00236 
00237     typename InstanceMap::iterator const the_end = instance_map_.end ();
00238     for (typename InstanceMap::iterator it = instance_map_.begin ();
00239          it != the_end;
00240          ++it)
00241       {
00242         DDS::InstanceHandle_t handle = it->second;
00243         OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle);
00244 
00245         bool mrg = false; //most_recent_generation
00246 
00247         if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) &&
00248             (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE))
00249           {
00250             for (OpenDDS::DCPS::ReceivedDataElement* item = ptr->rcvd_samples_.head_;
00251                  item != 0;
00252                  item = item->next_data_sample_)
00253               {
00254 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00255                 if (item->coherent_change_) continue;
00256 #endif
00257 
00258                 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE)
00259                   {
00260                     if (item->registered_data_ != 0)
00261                       {
00262                         received_data =
00263                           *static_cast< MessageType *> (item->registered_data_);
00264                       }
00265                     ptr->instance_state_.sample_info(sample_info, item);
00266 
00267                     item->sample_state_ = DDS::READ_SAMPLE_STATE;
00268 
00269 
00270                     if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00271 
00272                     found_data = true;
00273                   }
00274                 if (found_data)
00275                   {
00276                     break;
00277                   }
00278               }
00279           }
00280 
00281         if (found_data)
00282           {
00283             if (mrg) ptr->instance_state_.accessed();
00284 
00285             // Get the sample_ranks, generation_ranks, and
00286             // absolute_generation_ranks for this info_seq
00287             this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00288 
00289             break;
00290           }
00291       }
00292     post_read_or_take();
00293     return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
00294   }
00295 
00296   virtual DDS::ReturnCode_t take_next_sample (
00297                                                 MessageType & received_data,
00298                                                 DDS::SampleInfo & sample_info)
00299   {
00300     bool found_data = false;
00301 
00302 
00303     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00304                       guard,
00305                       this->sample_lock_,
00306                       DDS::RETCODE_ERROR);
00307 
00308     typename InstanceMap::iterator const the_end = instance_map_.end ();
00309     for (typename InstanceMap::iterator it = instance_map_.begin ();
00310          it != the_end;
00311          ++it)
00312       {
00313         DDS::InstanceHandle_t handle = it->second;
00314         OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle);
00315 
00316         bool mrg = false; //most_recent_generation
00317 
00318         OpenDDS::DCPS::ReceivedDataElement *tail = 0;
00319         if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) &&
00320             (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE))
00321           {
00322 
00323             OpenDDS::DCPS::ReceivedDataElement *next;
00324             tail = 0;
00325             OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
00326             while (item)
00327               {
00328 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00329                 if (item->coherent_change_)
00330                   {
00331                     item = item->next_data_sample_;
00332                     continue;
00333                   }
00334 #endif
00335                 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE)
00336                   {
00337                     if (item->registered_data_ != 0)
00338                       {
00339                         received_data =
00340                           *static_cast< MessageType *> (item->registered_data_);
00341                       }
00342                     ptr->instance_state_.sample_info(sample_info, item);
00343 
00344                     item->sample_state_ = DDS::READ_SAMPLE_STATE;
00345 
00346                     if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00347 
00348                     if (item == ptr->rcvd_samples_.tail_)
00349                       {
00350                         tail = ptr->rcvd_samples_.tail_;
00351                         item = item->next_data_sample_;
00352                       }
00353                     else
00354                       {
00355                         next = item->next_data_sample_;
00356 
00357                         ptr->rcvd_samples_.remove(item);
00358                         item->dec_ref();
00359 
00360                         item = next;
00361                       }
00362 
00363                     found_data = true;
00364                   }
00365                 if (found_data)
00366                   {
00367                     break;
00368                   }
00369               }
00370           }
00371 
00372         if (found_data)
00373           {
00374             if (mrg) ptr->instance_state_.accessed();
00375 
00376             //
00377             // Get the sample_ranks, generation_ranks, and
00378             // absolute_generation_ranks for this info_seq
00379             //
00380             if (tail)
00381               {
00382                 this->sample_info(sample_info, tail);
00383 
00384                 ptr->rcvd_samples_.remove(tail);
00385                 tail->dec_ref();
00386               }
00387             else
00388               {
00389                 this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00390               }
00391 
00392             break;
00393           }
00394       }
00395     post_read_or_take();
00396     return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
00397   }
00398 
00399   virtual DDS::ReturnCode_t read_instance (
00400                                              MessageSequenceType & received_data,
00401                                              DDS::SampleInfoSeq & info_seq,
00402                                              ::CORBA::Long max_samples,
00403                                              DDS::InstanceHandle_t a_handle,
00404                                              DDS::SampleStateMask sample_states,
00405                                              DDS::ViewStateMask view_states,
00406                                              DDS::InstanceStateMask instance_states)
00407   {
00408     DDS::ReturnCode_t const precond =
00409       check_inputs("read_instance", received_data, info_seq, max_samples);
00410     if (DDS::RETCODE_OK != precond)
00411       {
00412         return precond;
00413       }
00414 
00415     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00416                       guard,
00417                       this->sample_lock_,
00418                       DDS::RETCODE_ERROR);
00419     return read_instance_i(received_data, info_seq, max_samples, a_handle,
00420                            sample_states, view_states, instance_states, 0);
00421   }
00422 
00423   virtual DDS::ReturnCode_t take_instance (
00424                                              MessageSequenceType & received_data,
00425                                              DDS::SampleInfoSeq & info_seq,
00426                                              ::CORBA::Long max_samples,
00427                                              DDS::InstanceHandle_t a_handle,
00428                                              DDS::SampleStateMask sample_states,
00429                                              DDS::ViewStateMask view_states,
00430                                              DDS::InstanceStateMask instance_states)
00431   {
00432     DDS::ReturnCode_t const precond =
00433       check_inputs("take_instance", received_data, info_seq, max_samples);
00434     if (DDS::RETCODE_OK != precond)
00435       {
00436         return precond;
00437       }
00438 
00439     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00440                       guard,
00441                       this->sample_lock_,
00442                       DDS::RETCODE_ERROR);
00443     return take_instance_i(received_data, info_seq, max_samples, a_handle,
00444                            sample_states, view_states, instance_states, 0);
00445   }
00446 
00447   virtual DDS::ReturnCode_t read_instance_w_condition (
00448                                                        MessageSequenceType & received_data,
00449                                                        DDS::SampleInfoSeq & info_seq,
00450                                                        ::CORBA::Long max_samples,
00451                                                        DDS::InstanceHandle_t a_handle,
00452                                                        DDS::ReadCondition_ptr a_condition)
00453   {
00454     DDS::ReturnCode_t const precond =
00455       check_inputs("read_instance_w_condition", received_data, info_seq,
00456                    max_samples);
00457     if (DDS::RETCODE_OK != precond)
00458       {
00459         return precond;
00460       }
00461 
00462     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00463                       DDS::RETCODE_ERROR);
00464 
00465     if (!has_readcondition(a_condition))
00466       {
00467         return DDS::RETCODE_PRECONDITION_NOT_MET;
00468       }
00469 
00470 #ifndef OPENDDS_NO_QUERY_CONDITION
00471     DDS::QueryCondition_ptr query_condition =
00472         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00473 #endif
00474 
00475     return read_instance_i(received_data, info_seq, max_samples, a_handle,
00476                            a_condition->get_sample_state_mask(),
00477                            a_condition->get_view_state_mask(),
00478                            a_condition->get_instance_state_mask(),
00479 #ifndef OPENDDS_NO_QUERY_CONDITION
00480                            query_condition
00481 #else
00482                            0
00483 #endif
00484                            );
00485   }
00486 
00487   virtual DDS::ReturnCode_t take_instance_w_condition (
00488                                                        MessageSequenceType & received_data,
00489                                                        DDS::SampleInfoSeq & info_seq,
00490                                                        ::CORBA::Long max_samples,
00491                                                        DDS::InstanceHandle_t a_handle,
00492                                                        DDS::ReadCondition_ptr a_condition)
00493   {
00494     DDS::ReturnCode_t const precond =
00495       check_inputs("take_instance_w_condition", received_data, info_seq,
00496                    max_samples);
00497     if (DDS::RETCODE_OK != precond)
00498       {
00499         return precond;
00500       }
00501 
00502     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00503                       DDS::RETCODE_ERROR);
00504 
00505     if (!has_readcondition(a_condition))
00506       {
00507         return DDS::RETCODE_PRECONDITION_NOT_MET;
00508       }
00509 
00510 #ifndef OPENDDS_NO_QUERY_CONDITION
00511     DDS::QueryCondition_ptr query_condition =
00512         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00513 #endif
00514 
00515     return take_instance_i(received_data, info_seq, max_samples, a_handle,
00516                            a_condition->get_sample_state_mask(),
00517                            a_condition->get_view_state_mask(),
00518                            a_condition->get_instance_state_mask(),
00519 #ifndef OPENDDS_NO_QUERY_CONDITION
00520                            query_condition
00521 #else
00522                            0
00523 #endif
00524                            );
00525   }
00526 
00527   virtual DDS::ReturnCode_t read_next_instance (
00528                                                   MessageSequenceType & received_data,
00529                                                   DDS::SampleInfoSeq & info_seq,
00530                                                   ::CORBA::Long max_samples,
00531                                                   DDS::InstanceHandle_t a_handle,
00532                                                   DDS::SampleStateMask sample_states,
00533                                                   DDS::ViewStateMask view_states,
00534                                                   DDS::InstanceStateMask instance_states)
00535   {
00536     DDS::ReturnCode_t const precond =
00537       check_inputs("read_next_instance", received_data, info_seq, max_samples);
00538     if (DDS::RETCODE_OK != precond)
00539       {
00540         return precond;
00541       }
00542 
00543     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00544                                 sample_states, view_states, instance_states, 0);
00545   }
00546 
00547   virtual DDS::ReturnCode_t take_next_instance (
00548                                                   MessageSequenceType & received_data,
00549                                                   DDS::SampleInfoSeq & info_seq,
00550                                                   ::CORBA::Long max_samples,
00551                                                   DDS::InstanceHandle_t a_handle,
00552                                                   DDS::SampleStateMask sample_states,
00553                                                   DDS::ViewStateMask view_states,
00554                                                   DDS::InstanceStateMask instance_states)
00555   {
00556     DDS::ReturnCode_t const precond =
00557       check_inputs("take_next_instance", received_data, info_seq, max_samples);
00558     if (DDS::RETCODE_OK != precond)
00559       {
00560         return precond;
00561       }
00562 
00563     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00564                                 sample_states, view_states, instance_states, 0);
00565   }
00566 
00567   virtual DDS::ReturnCode_t read_next_instance_w_condition (
00568                                                               MessageSequenceType & received_data,
00569                                                               DDS::SampleInfoSeq & info_seq,
00570                                                               ::CORBA::Long max_samples,
00571                                                               DDS::InstanceHandle_t a_handle,
00572                                                               DDS::ReadCondition_ptr a_condition)
00573   {
00574     DDS::ReturnCode_t const precond =
00575       check_inputs("read_next_instance_w_condition", received_data, info_seq,
00576                    max_samples);
00577     if (DDS::RETCODE_OK != precond)
00578       {
00579         return precond;
00580       }
00581 
00582     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00583                       DDS::RETCODE_ERROR);
00584 
00585     if (!has_readcondition(a_condition))
00586       {
00587         return DDS::RETCODE_PRECONDITION_NOT_MET;
00588       }
00589 
00590 #ifndef OPENDDS_NO_QUERY_CONDITION
00591     DDS::QueryCondition_ptr query_condition =
00592         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00593 #endif
00594 
00595     return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00596                                 a_condition->get_sample_state_mask(),
00597                                 a_condition->get_view_state_mask(),
00598                                 a_condition->get_instance_state_mask(),
00599 #ifndef OPENDDS_NO_QUERY_CONDITION
00600                                 query_condition
00601 #else
00602                                 0
00603 #endif
00604                                 );
00605   }
00606 
00607   virtual DDS::ReturnCode_t take_next_instance_w_condition (
00608                                                               MessageSequenceType & received_data,
00609                                                               DDS::SampleInfoSeq & info_seq,
00610                                                               ::CORBA::Long max_samples,
00611                                                               DDS::InstanceHandle_t a_handle,
00612                                                               DDS::ReadCondition_ptr a_condition)
00613   {
00614     DDS::ReturnCode_t const precond =
00615       check_inputs("take_next_instance_w_condition", received_data, info_seq,
00616                    max_samples);
00617     if (DDS::RETCODE_OK != precond)
00618       {
00619         return precond;
00620       }
00621 
00622     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00623                       DDS::RETCODE_ERROR);
00624 
00625     if (!has_readcondition(a_condition))
00626       {
00627         return DDS::RETCODE_PRECONDITION_NOT_MET;
00628       }
00629 
00630 #ifndef OPENDDS_NO_QUERY_CONDITION
00631     DDS::QueryCondition_ptr query_condition =
00632         dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00633 #endif
00634 
00635     return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00636                                 a_condition->get_sample_state_mask(),
00637                                 a_condition->get_view_state_mask(),
00638                                 a_condition->get_instance_state_mask(),
00639 #ifndef OPENDDS_NO_QUERY_CONDITION
00640                                 query_condition
00641 #else
00642                                 0
00643 #endif
00644                                 );
00645   }
00646 
00647   virtual DDS::ReturnCode_t return_loan (
00648                                            MessageSequenceType & received_data,
00649                                            DDS::SampleInfoSeq & info_seq)
00650   {
00651     // Some incomplete tests to see that the data and info are from the
00652     // same read.
00653     if (received_data.length() != info_seq.length())
00654       {
00655         return DDS::RETCODE_PRECONDITION_NOT_MET;
00656       }
00657 
00658     if (received_data.release())
00659       {
00660         // nothing to do because this is not zero-copy data
00661         return DDS::RETCODE_OK;
00662       }
00663     else
00664       {
00665         info_seq.length(0);
00666         received_data.length(0);
00667       }
00668     return DDS::RETCODE_OK;
00669   }
00670 
00671   virtual DDS::ReturnCode_t get_key_value (
00672                                              MessageType & key_holder,
00673                                              DDS::InstanceHandle_t handle)
00674   {
00675     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00676                       guard,
00677                       this->sample_lock_,
00678                       DDS::RETCODE_ERROR);
00679 
00680     typename InstanceMap::iterator const the_end = instance_map_.end ();
00681     for (typename InstanceMap::iterator it = instance_map_.begin ();
00682          it != the_end;
00683          ++it)
00684       {
00685         if (it->second == handle)
00686           {
00687             key_holder = it->first;
00688             return DDS::RETCODE_OK;
00689           }
00690       }
00691 
00692     return DDS::RETCODE_BAD_PARAMETER;
00693   }
00694 
00695   virtual DDS::InstanceHandle_t lookup_instance (const MessageType & instance_data)
00696   {
00697     typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00698 
00699     if (it == instance_map_.end())
00700       {
00701         return DDS::HANDLE_NIL;
00702       }
00703     else
00704       {
00705         return it->second;
00706       }
00707   }
00708 
00709   virtual DDS::ReturnCode_t auto_return_loan(void* seq)
00710   {
00711     MessageSequenceType& received_data =
00712       *static_cast< MessageSequenceType*> (seq);
00713 
00714     if (!received_data.release())
00715       {
00716         // this->release_loan(received_data);
00717         received_data.length(0);
00718       }
00719     return DDS::RETCODE_OK;
00720   }
00721 
00722   void release_loan (MessageSequenceType & received_data)
00723   {
00724     received_data.length(0);
00725   }
00726 
00727 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00728   bool contains_sample_filtered(DDS::SampleStateMask sample_states,
00729                                 DDS::ViewStateMask view_states,
00730                                 DDS::InstanceStateMask instance_states,
00731                                 const OpenDDS::DCPS::FilterEvaluator& evaluator,
00732                                 const DDS::StringSeq& params)
00733   {
00734     using namespace OpenDDS::DCPS;
00735     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
00736     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
00737 
00738     for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
00739            end = instances_.end(); iter != end; ++iter) {
00740       SubscriptionInstance& inst = *iter->second;
00741 
00742       if ((inst.instance_state_.view_state() & view_states) &&
00743           (inst.instance_state_.instance_state() & instance_states)) {
00744         for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
00745              item = item->next_data_sample_) {
00746           if (item->sample_state_ & sample_states
00747 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00748               && !item->coherent_change_
00749 #endif
00750               && item->registered_data_) {
00751             if (evaluator.eval(*static_cast< MessageType* >(item->registered_data_), params)) {
00752               return true;
00753             }
00754           }
00755         }
00756       }
00757     }
00758 
00759     return false;
00760   }
00761 
00762   DDS::ReturnCode_t read_generic(
00763                                    OpenDDS::DCPS::DataReaderImpl::GenericBundle& gen,
00764                                    DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00765                                    DDS::InstanceStateMask instance_states,
00766                                    bool adjust_ref_count=false)
00767   {
00768 
00769     MessageSequenceType data;
00770     DDS::ReturnCode_t rc;
00771     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00772                       guard,
00773                       this->sample_lock_,
00774                       DDS::RETCODE_ERROR);
00775     {
00776       rc = read_i(data, gen.info_,
00777                   DDS::LENGTH_UNLIMITED,
00778                   sample_states, view_states, instance_states, 0);
00779       if (true == adjust_ref_count ) {
00780         data.increment_references();
00781       }
00782     }
00783     gen.samples_.reserve(data.length());
00784     for (CORBA::ULong i = 0; i < data.length(); ++i) {
00785       gen.samples_.push_back(&data[i]);
00786     }
00787     return rc;
00788 
00789   }
00790 
00791   DDS::InstanceHandle_t lookup_instance_generic(const void* data)
00792   {
00793     return lookup_instance(*static_cast<const MessageType*>(data));
00794   }
00795 
00796   virtual DDS::ReturnCode_t take(
00797                                  OpenDDS::DCPS::AbstractSamples& samples,
00798                                  DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00799                                  DDS::InstanceStateMask instance_states)
00800   {
00801 
00802     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00803                       guard,
00804                       this->sample_lock_,
00805                       DDS::RETCODE_ERROR);
00806 
00807     MessageSequenceType data;
00808     DDS::SampleInfoSeq infos;
00809     DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
00810                                   sample_states, view_states, instance_states, 0);
00811 
00812     samples.reserve(data.length());
00813 
00814     for (CORBA::ULong i = 0; i < data.length(); ++i) {
00815       samples.push_back(infos[i], &data[i]);
00816     }
00817 
00818     return rc;
00819   }
00820 
00821   DDS::ReturnCode_t read_instance_generic(void*& data,
00822                                           DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
00823                                           DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00824                                           DDS::InstanceStateMask instance_states)
00825   {
00826     MessageSequenceType dataseq;
00827     DDS::SampleInfoSeq infoseq;
00828     const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
00829                                                  DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
00830                                                  instance_states, 0);
00831     if (rc != DDS::RETCODE_NO_DATA)
00832       {
00833         const CORBA::ULong last = dataseq.length() - 1;
00834         data = new MessageType(dataseq[last]);
00835         info = infoseq[last];
00836       }
00837     return rc;
00838   }
00839 
00840   DDS::ReturnCode_t read_next_instance_generic(void*& data,
00841                                                DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
00842                                                DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00843                                                DDS::InstanceStateMask instance_states)
00844   {
00845     MessageSequenceType dataseq;
00846     DDS::SampleInfoSeq infoseq;
00847     const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
00848                                                       DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
00849                                                       instance_states, 0);
00850     if (rc != DDS::RETCODE_NO_DATA)
00851       {
00852         const CORBA::ULong last = dataseq.length() - 1;
00853         data = new MessageType(dataseq[last]);
00854         info = infoseq[last];
00855       }
00856     return rc;
00857   }
00858 
00859 #endif
00860 
00861   DDS::InstanceHandle_t store_synthetic_data(const MessageType& sample,
00862                                              DDS::ViewStateKind view)
00863   {
00864     using namespace OpenDDS::DCPS;
00865     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
00866                      DDS::HANDLE_NIL);
00867 
00868 #ifndef OPENDDS_NO_MULTI_TOPIC
00869     DDS::TopicDescription_var descr = get_topicdescription();
00870     if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
00871       if (!mt->filter(sample)) {
00872         return DDS::HANDLE_NIL;
00873       }
00874     }
00875 #endif
00876 
00877     get_subscriber_servant()->data_received(this);
00878 
00879     DDS::InstanceHandle_t inst = lookup_instance(sample);
00880     bool filtered = false;
00881     SubscriptionInstance_rch instance;
00882 
00883     // Call store_instance_data() once or twice, depending on if we need to
00884     // process the INSTANCE_REGISTRATION.  In either case, store_instance_data()
00885     // owns the memory for the sample and it must come from the correct allocator.
00886     for (int i = 0; i < 2; ++i) {
00887       if (i == 0 && inst != DDS::HANDLE_NIL) continue;
00888 
00889       DataSampleHeader header;
00890       const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
00891       header.message_id_ = static_cast<char>(msg);
00892       bool just_registered;
00893       unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
00894       store_instance_data(move(data), header, instance, just_registered, filtered);
00895       if (instance) inst = instance->instance_handle_;
00896     }
00897 
00898     if (!filtered) {
00899       if (view == DDS::NOT_NEW_VIEW_STATE) {
00900         if (instance) instance->instance_state_.accessed();
00901       }
00902       notify_read_conditions();
00903     }
00904     return inst;
00905   }
00906 
00907   void set_instance_state(DDS::InstanceHandle_t instance,
00908                           DDS::InstanceStateKind state)
00909   {
00910     using namespace OpenDDS::DCPS;
00911     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00912 
00913     SubscriptionInstance_rch si = get_handle_instance(instance);
00914     if (si && state != DDS::ALIVE_INSTANCE_STATE) {
00915       DataSampleHeader header;
00916       const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
00917         ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE;
00918       header.message_id_ = static_cast<char>(msg);
00919       bool just_registered, filtered;
00920       unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
00921       get_key_value(*data, instance);
00922       store_instance_data(move(data), header, si, just_registered, filtered);
00923       if (!filtered)
00924       {
00925         notify_read_conditions();
00926       }
00927     }
00928   }
00929 
00930   virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
00931                                OpenDDS::DCPS::SubscriptionInstance_rch& instance)
00932   {
00933     //!!! caller should already have the sample_lock_
00934 
00935     MessageType data;
00936 
00937     const bool cdr = sample.header_.cdr_encapsulation_;
00938 
00939     OpenDDS::DCPS::Serializer ser(
00940       sample.sample_.get(),
00941       sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
00942       cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00943           : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00944 
00945     if (cdr) {
00946       ACE_CDR::ULong header;
00947       if (!(ser >> header)) {
00948         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
00949                   ACE_TEXT("deserialization header failed.\n"),
00950                   TraitsType::type_name()));
00951         return;
00952       }
00953 
00954       if (Serializer::use_rti_serialization()) {
00955         // Start counting byte-offset AFTER header
00956         ser.reset_alignment();
00957       }
00958     }
00959 
00960     if (sample.header_.key_fields_only_) {
00961       ser >> OpenDDS::DCPS::KeyOnly< MessageType>(data);
00962     } else {
00963       ser >> data;
00964     }
00965 
00966     if (!ser.good_bit()) {
00967       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
00968                  ACE_TEXT("deserialization failed.\n"),
00969                  TraitsType::type_name()));
00970       return;
00971     }
00972 
00973     DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
00974     typename InstanceMap::const_iterator const it = instance_map_.find(data);
00975     if (it != instance_map_.end()) {
00976       handle = it->second;
00977     }
00978 
00979     if (handle == DDS::HANDLE_NIL) {
00980       instance.reset();
00981     } else {
00982       instance = get_handle_instance(handle);
00983     }
00984   }
00985 
00986   virtual void qos_change(const DDS::DataReaderQos& qos)
00987   {
00988     // reliability is not changeable, just time_based_filter
00989     if (qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00990       if (qos.time_based_filter.minimum_separation != qos_.time_based_filter.minimum_separation) {
00991         const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
00992         if (qos_.time_based_filter.minimum_separation != zero) {
00993           if (qos.time_based_filter.minimum_separation != zero) {
00994             const ACE_Time_Value new_interval = duration_to_time_value(qos.time_based_filter.minimum_separation);
00995             filter_delayed_handler_->reset_interval(new_interval);
00996           } else {
00997             filter_delayed_handler_->cancel();
00998           }
00999         }
01000         // else no existing timers to change/cancel
01001       }
01002       // else no qos change so nothing to change
01003     }
01004 
01005     DataReaderImpl::qos_change(qos);
01006   }
01007 
01008 protected:
01009 
01010   virtual void dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample& sample,
01011                              OpenDDS::DCPS::SubscriptionInstance_rch& instance,
01012                              bool & just_registered,
01013                              bool & filtered,
01014                              OpenDDS::DCPS::MarshalingType marshaling_type)
01015   {
01016     unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
01017     const bool cdr = sample.header_.cdr_encapsulation_;
01018 
01019     OpenDDS::DCPS::Serializer ser(
01020                                   sample.sample_.get(),
01021                                   sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
01022                                   cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR : OpenDDS::DCPS::Serializer::ALIGN_NONE);
01023 
01024     if (cdr) {
01025       ACE_CDR::ULong header;
01026       if (!(ser >> header)) {
01027         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
01028                   ACE_TEXT("deserialization header failed, dropping sample.\n"),
01029                   TraitsType::type_name()));
01030         return;
01031       }
01032 
01033       if (Serializer::use_rti_serialization()) {
01034         // Start counting byte-offset AFTER header
01035         ser.reset_alignment();
01036       }
01037     }
01038 
01039     if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
01040       ser >> OpenDDS::DCPS::KeyOnly< MessageType>(*data);
01041     } else {
01042       ser >> *data;
01043     }
01044 
01045     if (!ser.good_bit()) {
01046       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
01047                  ACE_TEXT("deserialization failed, dropping sample.\n"),
01048                  TraitsType::type_name()));
01049       return;
01050     }
01051 
01052 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01053     if (!sample.header_.content_filter_) { // if this is true, the writer has already filtered
01054       using OpenDDS::DCPS::ContentFilteredTopicImpl;
01055       if (content_filtered_topic_) {
01056         if (sample.header_.message_id_ == OpenDDS::DCPS::SAMPLE_DATA
01057             && !content_filtered_topic_->filter(static_cast<MessageType&>(*data))) {
01058           filtered = true;
01059           return;
01060         }
01061       }
01062     }
01063 #endif
01064 
01065     store_instance_data(move(data), sample.header_, instance, just_registered, filtered);
01066   }
01067 
01068   virtual void dispose_unregister(const OpenDDS::DCPS::ReceivedDataSample& sample,
01069                                   OpenDDS::DCPS::SubscriptionInstance_rch& instance)
01070   {
01071     //!!! caller should already have the sample_lock_
01072 
01073     // The data sample in this dispose message does not contain any valid data.
01074     // What it needs here is the key value to identify the instance to dispose.
01075     // The demarshal push this "sample" to received sample list so the user
01076     // can be notified the dispose event.
01077     bool just_registered = false;
01078     bool filtered = false;
01079     OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING;
01080     if (sample.header_.key_fields_only_) {
01081       marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING;
01082     }
01083     this->dds_demarshal(sample, instance, just_registered, filtered, marshaling);
01084   }
01085 
01086   virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
01087   {
01088     filter_delayed_handler_->drop_sample(instance->instance_handle_);
01089 
01090 
01091     instance->instance_state_.cancel_release();
01092 
01093     while (instance->rcvd_samples_.size_ > 0)
01094       {
01095         OpenDDS::DCPS::ReceivedDataElement* head =
01096           instance->rcvd_samples_.remove_head();
01097         head->dec_ref();
01098       }
01099   }
01100 
01101   virtual void release_instance_i (DDS::InstanceHandle_t handle)
01102   {
01103     typename InstanceMap::iterator const the_end = instance_map_.end ();
01104     typename InstanceMap::iterator it = instance_map_.begin ();
01105     while (it != the_end)
01106       {
01107         if (it->second == handle)
01108           {
01109             typename InstanceMap::iterator curIt = it;
01110             ++ it;
01111             instance_map_.erase (curIt);
01112           }
01113         else
01114           ++ it;
01115       }
01116   }
01117 
01118 private:
01119 
01120   DDS::ReturnCode_t read_i (
01121                               MessageSequenceType & received_data,
01122                               DDS::SampleInfoSeq & info_seq,
01123                               ::CORBA::Long max_samples,
01124                               DDS::SampleStateMask sample_states,
01125                               DDS::ViewStateMask view_states,
01126                               DDS::InstanceStateMask instance_states,
01127 #ifndef OPENDDS_NO_QUERY_CONDITION
01128                               DDS::QueryCondition_ptr a_condition)
01129 #else
01130   int ignored)
01131 #endif
01132 {
01133 #ifdef OPENDDS_NO_QUERY_CONDITION
01134   ACE_UNUSED_ARG(ignored);
01135 #endif
01136 
01137   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01138 
01139 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01140   if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01141       && ! this->coherent_) {
01142     return DDS::RETCODE_PRECONDITION_NOT_MET;
01143   }
01144 
01145   bool group_coherent_ordered
01146     = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01147     && this->subqos_.presentation.coherent_access
01148     && this->subqos_.presentation.ordered_access;
01149 
01150   if (group_coherent_ordered && this->coherent_) {
01151     max_samples = 1;
01152   }
01153 #endif
01154 
01155   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01156     results(this, received_data, info_seq, max_samples,
01157             this->subqos_.presentation,
01158 #ifndef OPENDDS_NO_QUERY_CONDITION
01159             a_condition,
01160 #endif
01161             OpenDDS::DCPS::DDS_OPERATION_READ);
01162 
01163 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01164   if (! group_coherent_ordered) {
01165 #endif
01166     for (typename InstanceMap::iterator it = instance_map_.begin(),
01167            the_end = instance_map_.end(); it != the_end; ++it)
01168       {
01169         DDS::InstanceHandle_t handle = it->second;
01170 
01171         OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle);
01172 
01173         if ((inst->instance_state_.view_state() & view_states) &&
01174             (inst->instance_state_.instance_state() & instance_states))
01175           {
01176             size_t i(0);
01177             for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01178                  item != 0; item = item->next_data_sample_)
01179               {
01180                 if (item->sample_state_ & sample_states
01181 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01182                     && !item->coherent_change_
01183 #endif
01184                     )
01185                   {
01186                     results.insert_sample(item, inst, ++i);
01187                   }
01188               }
01189           }
01190       }
01191 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01192   }
01193   else {
01194     OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01195     results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01196   }
01197 #endif
01198 
01199   results.copy_to_user();
01200 
01201   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01202   if (received_data.length())
01203     {
01204       ret = DDS::RETCODE_OK;
01205       if (received_data.maximum() == 0) //using ZeroCopy
01206         {
01207           received_data_p.set_loaner(this);
01208         }
01209     }
01210 
01211   post_read_or_take();
01212 
01213   return ret;
01214 }
01215 
01216 DDS::ReturnCode_t take_i (
01217                             MessageSequenceType & received_data,
01218                             DDS::SampleInfoSeq & info_seq,
01219                             ::CORBA::Long max_samples,
01220                             DDS::SampleStateMask sample_states,
01221                             DDS::ViewStateMask view_states,
01222                             DDS::InstanceStateMask instance_states,
01223 #ifndef OPENDDS_NO_QUERY_CONDITION
01224                             DDS::QueryCondition_ptr a_condition)
01225 #else
01226   int ignored)
01227 #endif
01228 {
01229 #ifdef OPENDDS_NO_QUERY_CONDITION
01230   ACE_UNUSED_ARG(ignored);
01231 #endif
01232 
01233   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01234 
01235 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01236   if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01237       && ! this->coherent_) {
01238     return DDS::RETCODE_PRECONDITION_NOT_MET;
01239   }
01240 
01241   bool group_coherent_ordered
01242     = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01243     && this->subqos_.presentation.coherent_access
01244     && this->subqos_.presentation.ordered_access;
01245 
01246   if (group_coherent_ordered && this->coherent_) {
01247     max_samples = 1;
01248   }
01249 #endif
01250 
01251   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01252     results(this, received_data, info_seq, max_samples,
01253             this->subqos_.presentation,
01254 #ifndef OPENDDS_NO_QUERY_CONDITION
01255             a_condition,
01256 #endif
01257             OpenDDS::DCPS::DDS_OPERATION_TAKE);
01258 
01259 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01260   if (! group_coherent_ordered) {
01261 #endif
01262 
01263     for (typename InstanceMap::iterator it = instance_map_.begin(),
01264            the_end = instance_map_.end(); it != the_end; ++it)
01265       {
01266         DDS::InstanceHandle_t handle = it->second;
01267 
01268         OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle);
01269 
01270         if ((inst->instance_state_.view_state() & view_states) &&
01271             (inst->instance_state_.instance_state() & instance_states))
01272           {
01273             size_t i(0);
01274             for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01275                  item != 0; item = item->next_data_sample_)
01276               {
01277                 if (item->sample_state_ & sample_states
01278 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01279                     && !item->coherent_change_
01280 #endif
01281                     )
01282                   {
01283                     results.insert_sample(item, inst, ++i);
01284                   }
01285               }
01286           }
01287       }
01288 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01289   }
01290   else {
01291     OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01292     results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01293   }
01294 #endif
01295 
01296   results.copy_to_user();
01297 
01298   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01299   if (received_data.length())
01300     {
01301       ret = DDS::RETCODE_OK;
01302       if (received_data.maximum() == 0) //using ZeroCopy
01303         {
01304           received_data_p.set_loaner(this);
01305         }
01306     }
01307 
01308   post_read_or_take();
01309   return ret;
01310 }
01311 
01312 DDS::ReturnCode_t read_instance_i (
01313                                      MessageSequenceType & received_data,
01314                                      DDS::SampleInfoSeq & info_seq,
01315                                      ::CORBA::Long max_samples,
01316                                      DDS::InstanceHandle_t a_handle,
01317                                      DDS::SampleStateMask sample_states,
01318                                      DDS::ViewStateMask view_states,
01319                                      DDS::InstanceStateMask instance_states,
01320 #ifndef OPENDDS_NO_QUERY_CONDITION
01321                                      DDS::QueryCondition_ptr a_condition)
01322 #else
01323 int ignored)
01324 #endif
01325 {
01326 #ifdef OPENDDS_NO_QUERY_CONDITION
01327   ACE_UNUSED_ARG(ignored);
01328 #endif
01329 
01330   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01331 
01332   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01333     results(this, received_data, info_seq, max_samples,
01334             this->subqos_.presentation,
01335 #ifndef OPENDDS_NO_QUERY_CONDITION
01336             a_condition,
01337 #endif
01338             OpenDDS::DCPS::DDS_OPERATION_READ);
01339 
01340   OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle);
01341   if (!inst) return DDS::RETCODE_BAD_PARAMETER;
01342 
01343   InstanceState& state_obj = inst->instance_state_;
01344   bool valid_view_state = state_obj.view_state() & view_states;
01345   bool valid_instance_state = state_obj.instance_state() & instance_states;
01346   if (valid_view_state && valid_instance_state)
01347     {
01348       size_t i(0);
01349       for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01350            item; item = item->next_data_sample_)
01351         {
01352           if (item->sample_state_ & sample_states
01353 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01354               && !item->coherent_change_
01355 #endif
01356               )
01357             {
01358               results.insert_sample(item, inst, ++i);
01359             }
01360         }
01361       }
01362     else
01363       {
01364         if (OpenDDS::DCPS::DCPS_debug_level >= 8) {
01365           OPENDDS_STRING msg;
01366           if (!valid_view_state) {
01367             msg += "view state is not valid";
01368             if (!valid_instance_state) {
01369               msg += " and ";
01370             }
01371           }
01372           if (!valid_instance_state) {
01373             msg = msg
01374               + "instance state is "
01375               + state_obj.instance_state_string()
01376               + " while the validity mask is "
01377               + InstanceState::instance_state_string(instance_states);
01378           }
01379           GuidConverter conv(get_subscription_id());
01380           ACE_DEBUG((LM_DEBUG,
01381             ACE_TEXT(
01382               "(%P|%t) DataReaderImpl_T::read_instance_i: "
01383               "will return no data reading sub %C because:\n  %C\n"
01384             ),
01385             OPENDDS_STRING(conv).c_str(), msg.c_str()
01386           ));
01387         }
01388       }
01389 
01390   results.copy_to_user();
01391 
01392   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01393   if (received_data.length())
01394     {
01395       ret = DDS::RETCODE_OK;
01396       if (received_data.maximum() == 0) //using ZeroCopy
01397         {
01398           received_data_p.set_loaner(this);
01399         }
01400     }
01401 
01402   post_read_or_take();
01403   return ret;
01404 }
01405 
01406 DDS::ReturnCode_t take_instance_i (
01407                                    MessageSequenceType & received_data,
01408                                    DDS::SampleInfoSeq & info_seq,
01409                                    ::CORBA::Long max_samples,
01410                                    DDS::InstanceHandle_t a_handle,
01411                                    DDS::SampleStateMask sample_states,
01412                                    DDS::ViewStateMask view_states,
01413                                    DDS::InstanceStateMask instance_states,
01414 #ifndef OPENDDS_NO_QUERY_CONDITION
01415                                    DDS::QueryCondition_ptr a_condition)
01416 #else
01417                                    int ignored)
01418 #endif
01419 {
01420 #ifdef OPENDDS_NO_QUERY_CONDITION
01421   ACE_UNUSED_ARG(ignored);
01422 #endif
01423 
01424   typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01425 
01426   OpenDDS::DCPS::RakeResults< MessageSequenceType >
01427     results(this, received_data, info_seq, max_samples,
01428             this->subqos_.presentation,
01429 #ifndef OPENDDS_NO_QUERY_CONDITION
01430             a_condition,
01431 #endif
01432             OpenDDS::DCPS::DDS_OPERATION_TAKE);
01433 
01434   OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle);
01435   if (!inst) return DDS::RETCODE_BAD_PARAMETER;
01436 
01437   if ((inst->instance_state_.view_state() & view_states) &&
01438       (inst->instance_state_.instance_state() & instance_states))
01439     {
01440       size_t i(0);
01441       for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01442            item; item = item->next_data_sample_)
01443         {
01444           if (item->sample_state_ & sample_states
01445 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01446               && !item->coherent_change_
01447 #endif
01448               )
01449             {
01450               results.insert_sample(item, inst, ++i);
01451             }
01452         }
01453     }
01454 
01455   results.copy_to_user();
01456 
01457   DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01458   if (received_data.length())
01459     {
01460       ret = DDS::RETCODE_OK;
01461       if (received_data.maximum() == 0) //using ZeroCopy
01462         {
01463           received_data_p.set_loaner(this);
01464         }
01465     }
01466 
01467   post_read_or_take();
01468   return ret;
01469 }
01470 
01471 DDS::ReturnCode_t read_next_instance_i (
01472                                         MessageSequenceType & received_data,
01473                                         DDS::SampleInfoSeq & info_seq,
01474                                         ::CORBA::Long max_samples,
01475                                         DDS::InstanceHandle_t a_handle,
01476                                         DDS::SampleStateMask sample_states,
01477                                         DDS::ViewStateMask view_states,
01478                                         DDS::InstanceStateMask instance_states,
01479 #ifndef OPENDDS_NO_QUERY_CONDITION
01480                                         DDS::QueryCondition_ptr a_condition)
01481 #else
01482                                         int ignored)
01483 #endif
01484 {
01485 #ifdef OPENDDS_NO_QUERY_CONDITION
01486   ACE_UNUSED_ARG(ignored);
01487 #endif
01488 
01489   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01490 
01491   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01492                     guard,
01493                     this->sample_lock_,
01494                     DDS::RETCODE_ERROR);
01495 
01496   typename InstanceMap::iterator it;
01497   typename InstanceMap::iterator const the_end = instance_map_.end ();
01498 
01499   if (a_handle == DDS::HANDLE_NIL)
01500     {
01501       it = instance_map_.begin ();
01502     }
01503   else
01504     {
01505       for (it = instance_map_.begin ();
01506            it != the_end;
01507            ++it)
01508         {
01509           if (a_handle == it->second)
01510             {
01511               ++it;
01512               break;
01513             }
01514         }
01515     }
01516 
01517   for (; it != the_end; ++it)
01518     {
01519       handle = it->second;
01520       DDS::ReturnCode_t const status =
01521           read_instance_i(received_data, info_seq, max_samples, handle,
01522                           sample_states, view_states, instance_states,
01523 #ifndef OPENDDS_NO_QUERY_CONDITION
01524                           a_condition);
01525 #else
01526       0);
01527 #endif
01528   if (status != DDS::RETCODE_NO_DATA)
01529     {
01530       post_read_or_take();
01531       return status;
01532     }
01533 }
01534 
01535 post_read_or_take();
01536 return DDS::RETCODE_NO_DATA;
01537 }
01538 
01539 DDS::ReturnCode_t take_next_instance_i (
01540                                         MessageSequenceType & received_data,
01541                                         DDS::SampleInfoSeq & info_seq,
01542                                         ::CORBA::Long max_samples,
01543                                         DDS::InstanceHandle_t a_handle,
01544                                         DDS::SampleStateMask sample_states,
01545                                         DDS::ViewStateMask view_states,
01546                                         DDS::InstanceStateMask instance_states,
01547 #ifndef OPENDDS_NO_QUERY_CONDITION
01548                                         DDS::QueryCondition_ptr a_condition)
01549 #else
01550                                         int ignored)
01551 #endif
01552 {
01553 #ifdef OPENDDS_NO_QUERY_CONDITION
01554   ACE_UNUSED_ARG(ignored);
01555 #endif
01556 
01557   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01558 
01559   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01560                     guard,
01561                     this->sample_lock_,
01562                     DDS::RETCODE_ERROR);
01563 
01564   typename InstanceMap::iterator it;
01565   typename InstanceMap::iterator const the_end = instance_map_.end ();
01566 
01567   if (a_handle == DDS::HANDLE_NIL)
01568     {
01569       it = instance_map_.begin ();
01570     }
01571   else
01572     {
01573       for (it = instance_map_.begin (); it != the_end; ++it)
01574         {
01575           if (a_handle == it->second)
01576             {
01577               ++it;
01578               break;
01579             }
01580         }
01581     }
01582 
01583   for (; it != the_end; ++it)
01584     {
01585       handle = it->second;
01586       DDS::ReturnCode_t const status =
01587           take_instance_i(received_data, info_seq, max_samples, handle,
01588                           sample_states, view_states, instance_states,
01589 #ifndef OPENDDS_NO_QUERY_CONDITION
01590                           a_condition);
01591 #else
01592       0);
01593 #endif
01594     if (status != DDS::RETCODE_NO_DATA)
01595       {
01596         total_samples();  // see if we are empty
01597         post_read_or_take();
01598         return status;
01599       }
01600   }
01601   post_read_or_take();
01602   return DDS::RETCODE_NO_DATA;
01603 }
01604 
01605 void store_instance_data(
01606                          unique_ptr<MessageTypeWithAllocator> instance_data,
01607                          const OpenDDS::DCPS::DataSampleHeader& header,
01608                          OpenDDS::DCPS::SubscriptionInstance_rch& instance_ptr,
01609                          bool & just_registered,
01610                          bool & filtered)
01611 {
01612   const bool is_dispose_msg =
01613     header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
01614     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01615   const bool is_unregister_msg =
01616     header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE ||
01617     header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01618 
01619   // not filtering any data, except what is specifically identified as filtered below
01620   filtered = false;
01621 
01622   DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01623 
01624   //!!! caller should already have the sample_lock_
01625   //We will unlock it before calling into listeners
01626 
01627   typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
01628 
01629   if ((is_dispose_msg || is_unregister_msg) && it == instance_map_.end())
01630   {
01631      return;
01632   }
01633 
01634 
01635   if (it == instance_map_.end())
01636   {
01637     std::size_t instances_size = 0;
01638     {
01639       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01640       instances_size = instances_.size();
01641     }
01642     if ((this->qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) &&
01643       ((::CORBA::Long) instances_size >= this->qos_.resource_limits.max_instances))
01644     {
01645       DDS::DataReaderListener_var listener
01646         = listener_for (DDS::SAMPLE_REJECTED_STATUS);
01647 
01648       set_status_changed_flag (DDS::SAMPLE_REJECTED_STATUS, true);
01649 
01650       sample_rejected_status_.last_reason = DDS::REJECTED_BY_INSTANCES_LIMIT;
01651       ++sample_rejected_status_.total_count;
01652       ++sample_rejected_status_.total_count_change;
01653       sample_rejected_status_.last_instance_handle = handle;
01654 
01655       if (!CORBA::is_nil(listener.in()))
01656       {
01657         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01658 
01659         listener->on_sample_rejected(this, sample_rejected_status_);
01660         sample_rejected_status_.total_count_change = 0;
01661       }  // do we want to do something if listener is nil???
01662       notify_status_condition_no_sample_lock();
01663 
01664       return;
01665     }
01666 
01667 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01668     SharedInstanceMap_rch inst;
01669     bool new_handle = true;
01670     if (this->is_exclusive_ownership_) {
01671       OwnershipManagerPtr owner_manager = this->ownership_manager();
01672 
01673       if (!owner_manager || owner_manager->instance_lock_acquire () != 0) {
01674         ACE_ERROR ((LM_ERROR,
01675                     ACE_TEXT("(%P|%t) ")
01676                     ACE_TEXT("%CDataReaderImpl::")
01677                     ACE_TEXT("store_instance_data, ")
01678                     ACE_TEXT("acquire instance_lock failed. \n"), TraitsType::type_name()));
01679         return;
01680       }
01681 
01682       inst = dynamic_rchandle_cast<SharedInstanceMap>(
01683         owner_manager->get_instance_map(this->topic_servant_->type_name(), this));
01684       if (inst != 0) {
01685         typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
01686         if (iter != inst->end ()) {
01687           handle = iter->second;
01688           new_handle = false;
01689         }
01690       }
01691     }
01692 #endif
01693 
01694     just_registered = true;
01695     DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
01696     handle = handle == DDS::HANDLE_NIL ? this->get_next_handle( key) : handle;
01697     OpenDDS::DCPS::SubscriptionInstance_rch instance =
01698       OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
01699         this,
01700         this->qos_,
01701         ref(this->instances_lock_),
01702         handle);
01703 
01704     instance->instance_handle_ = handle;
01705 
01706     {
01707       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01708       int ret = OpenDDS::DCPS::bind(instances_, handle, instance);
01709 
01710       if (ret != 0)
01711       {
01712         ACE_ERROR ((LM_ERROR,
01713                     ACE_TEXT("(%P|%t) ")
01714                     ACE_TEXT("%CDataReaderImpl::")
01715                     ACE_TEXT("store_instance_data, ")
01716                     ACE_TEXT("insert handle failed. \n"), TraitsType::type_name()));
01717         return;
01718       }
01719     }
01720 
01721 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01722     OwnershipManagerPtr owner_manager = this->ownership_manager();
01723 
01724     if (owner_manager) {
01725       if (!inst) {
01726         inst = make_rch<SharedInstanceMap>();
01727         owner_manager->set_instance_map(
01728           this->topic_servant_->type_name(),
01729           inst,
01730           this);
01731       }
01732 
01733       if (new_handle) {
01734         std::pair<typename InstanceMap::iterator, bool> bpair =
01735           inst->insert(typename InstanceMap::value_type(*instance_data,
01736             handle));
01737         if (bpair.second == false)
01738         {
01739           ACE_ERROR ((LM_ERROR,
01740                       ACE_TEXT("(%P|%t) ")
01741                       ACE_TEXT("%CDataReaderImpl::")
01742                       ACE_TEXT("store_instance_data, ")
01743                       ACE_TEXT("insert to participant scope %C failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01744           return;
01745         }
01746       }
01747 
01748       if (owner_manager->instance_lock_release () != 0) {
01749         ACE_ERROR ((LM_ERROR,
01750                     ACE_TEXT("(%P|%t) ")
01751                     ACE_TEXT("%CDataReaderImpl::")
01752                     ACE_TEXT("store_instance_data, ")
01753                     ACE_TEXT("release instance_lock failed. \n"), TraitsType::type_name()));
01754         return;
01755       }
01756     }
01757 #endif
01758 
01759     std::pair<typename InstanceMap::iterator, bool> bpair =
01760       instance_map_.insert(typename InstanceMap::value_type(*instance_data,
01761         handle));
01762     if (bpair.second == false)
01763     {
01764       ACE_ERROR ((LM_ERROR,
01765                   ACE_TEXT("(%P|%t) ")
01766                   ACE_TEXT("%CDataReaderImpl::")
01767                   ACE_TEXT("store_instance_data, ")
01768                   ACE_TEXT("insert %C failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01769       return;
01770     }
01771   }
01772   else
01773   {
01774     just_registered = false;
01775     handle = it->second;
01776   }
01777 
01778   if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION)
01779   {
01780     instance_ptr = get_handle_instance(handle);
01781 
01782     if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA)
01783     {
01784       filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
01785 
01786       ACE_Time_Value filter_time_expired;
01787       if (!filtered &&
01788           time_based_filter_instance(instance_ptr, filter_time_expired)) {
01789         filtered = true;
01790         if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01791           filter_delayed_handler_->delay_sample(handle, move(instance_data), header, just_registered, filter_time_expired);
01792 
01793         }
01794       } else {
01795         // nothing time based filtered now
01796         filter_delayed_handler_->clear_sample(handle);
01797 
01798       }
01799 
01800       if (filtered)
01801       {
01802         return;
01803       }
01804     }
01805 
01806     finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
01807   }
01808   else
01809   {
01810     instance_ptr = this->get_handle_instance(handle);
01811     instance_ptr->instance_state_.lively(header.publication_id_);
01812   }
01813 }
01814 
01815 void finish_store_instance_data(unique_ptr<MessageTypeWithAllocator> instance_data, const DataSampleHeader& header,
01816   SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg )
01817 {
01818   if ((this->qos_.resource_limits.max_samples_per_instance !=
01819         DDS::LENGTH_UNLIMITED) &&
01820       (instance_ptr->rcvd_samples_.size_ >=
01821         this->qos_.resource_limits.max_samples_per_instance)) {
01822 
01823     // According to spec 1.2, Samples that contain no data do not
01824     // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
01825     // so do not remove the oldest sample when unregister/dispose
01826     // message arrives.
01827 
01828     if (!is_dispose_msg && !is_unregister_msg
01829       && instance_ptr->rcvd_samples_.head_->sample_state_
01830       == DDS::NOT_READ_SAMPLE_STATE)
01831     {
01832       // for now the implemented QoS means that if the head sample
01833       // is NOT_READ then none are read.
01834       // TBD - in future we will reads may not read in order so
01835       //       just looking at the head will not be enough.
01836       DDS::DataReaderListener_var listener
01837         = listener_for(DDS::SAMPLE_REJECTED_STATUS);
01838 
01839       set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
01840 
01841       sample_rejected_status_.last_reason =
01842         DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT;
01843       ++sample_rejected_status_.total_count;
01844       ++sample_rejected_status_.total_count_change;
01845       sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
01846 
01847       if (!CORBA::is_nil(listener.in()))
01848       {
01849         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01850 
01851         listener->on_sample_rejected(this, sample_rejected_status_);
01852         sample_rejected_status_.total_count_change = 0;
01853       }  // do we want to do something if listener is nil???
01854       notify_status_condition_no_sample_lock();
01855       return;
01856     }
01857     else if (!is_dispose_msg && !is_unregister_msg)
01858     {
01859       // Discard the oldest previously-read sample
01860       OpenDDS::DCPS::ReceivedDataElement *item =
01861         instance_ptr->rcvd_samples_.head_;
01862       instance_ptr->rcvd_samples_.remove(item);
01863       item->dec_ref();
01864     }
01865   }
01866   else if (this->qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED)
01867   {
01868     CORBA::Long total_samples = 0;
01869     {
01870       ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01871       for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
01872         iter != instances_.end();
01873         ++iter) {
01874         OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
01875 
01876         total_samples += (CORBA::Long) ptr->rcvd_samples_.size_;
01877       }
01878     }
01879 
01880     if (total_samples >= this->qos_.resource_limits.max_samples)
01881     {
01882       // According to spec 1.2, Samples that contain no data do not
01883       // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
01884       // so do not remove the oldest sample when unregister/dispose
01885       // message arrives.
01886 
01887       if (!is_dispose_msg && !is_unregister_msg
01888         && instance_ptr->rcvd_samples_.head_->sample_state_
01889         == DDS::NOT_READ_SAMPLE_STATE)
01890       {
01891         // for now the implemented QoS means that if the head sample
01892         // is NOT_READ then none are read.
01893         // TBD - in future we will reads may not read in order so
01894         //       just looking at the head will not be enough.
01895         DDS::DataReaderListener_var listener
01896           = listener_for(DDS::SAMPLE_REJECTED_STATUS);
01897 
01898         set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
01899 
01900         sample_rejected_status_.last_reason =
01901           DDS::REJECTED_BY_SAMPLES_LIMIT;
01902         ++sample_rejected_status_.total_count;
01903         ++sample_rejected_status_.total_count_change;
01904         sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
01905         if (!CORBA::is_nil(listener.in()))
01906         {
01907           ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01908 
01909           listener->on_sample_rejected(this, sample_rejected_status_);
01910           sample_rejected_status_.total_count_change = 0;
01911         }  // do we want to do something if listener is nil???
01912         notify_status_condition_no_sample_lock();
01913 
01914         return;
01915       }
01916       else if (!is_dispose_msg && !is_unregister_msg)
01917       {
01918         // Discard the oldest previously-read sample
01919         OpenDDS::DCPS::ReceivedDataElement *item =
01920           instance_ptr->rcvd_samples_.head_;
01921         instance_ptr->rcvd_samples_.remove(item);
01922         item->dec_ref();
01923       }
01924     }
01925   }
01926 
01927   if (is_dispose_msg || is_unregister_msg)
01928   {
01929     instance_data.reset();
01930   }
01931 
01932   bool event_notify = false;
01933 
01934   if (is_dispose_msg) {
01935     event_notify = instance_ptr->instance_state_.dispose_was_received(header.publication_id_);
01936   }
01937 
01938   if (is_unregister_msg) {
01939     if (instance_ptr->instance_state_.unregister_was_received(header.publication_id_)) {
01940       event_notify = true;
01941     }
01942   }
01943 
01944   if (!is_dispose_msg && !is_unregister_msg) {
01945     event_notify = true;
01946     instance_ptr->instance_state_.data_was_received(header.publication_id_);
01947   }
01948 
01949   if (!event_notify) {
01950     return;
01951   }
01952 
01953   OpenDDS::DCPS::ReceivedDataElement *ptr =
01954     new (*rd_allocator_.get()) OpenDDS::DCPS::ReceivedDataElementWithType<MessageTypeWithAllocator>(header,instance_data.release(), &this->sample_lock_);
01955 
01956   ptr->disposed_generation_count_ =
01957     instance_ptr->instance_state_.disposed_generation_count();
01958   ptr->no_writers_generation_count_ =
01959     instance_ptr->instance_state_.no_writers_generation_count();
01960 
01961   instance_ptr->last_sequence_ = header.sequence_;
01962 
01963   instance_ptr->rcvd_strategy_->add(ptr);
01964 
01965   if (! is_dispose_msg  && ! is_unregister_msg
01966       && instance_ptr->rcvd_samples_.size_ > get_depth())
01967     {
01968       OpenDDS::DCPS::ReceivedDataElement* head_ptr =
01969         instance_ptr->rcvd_samples_.head_;
01970 
01971       instance_ptr->rcvd_samples_.remove(head_ptr);
01972 
01973       if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
01974         {
01975           DDS::DataReaderListener_var listener
01976             = listener_for (DDS::SAMPLE_LOST_STATUS);
01977 
01978           ++sample_lost_status_.total_count;
01979           ++sample_lost_status_.total_count_change;
01980 
01981           set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, true);
01982 
01983           if (!CORBA::is_nil(listener.in()))
01984             {
01985               ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01986 
01987               listener->on_sample_lost(this, sample_lost_status_);
01988 
01989               sample_lost_status_.total_count_change = 0;
01990             }
01991 
01992           notify_status_condition_no_sample_lock();
01993         }
01994 
01995       head_ptr->dec_ref();
01996     }
01997 
01998 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01999   if (! ptr->coherent_change_) {
02000 #endif
02001     RcHandle<OpenDDS::DCPS::SubscriberImpl> sub = get_subscriber_servant ();
02002     if (!sub)
02003       return;
02004 
02005     sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, true);
02006 
02007     set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, true);
02008 
02009     DDS::SubscriberListener_var sub_listener =
02010         sub->listener_for(DDS::DATA_ON_READERS_STATUS);
02011     if (!CORBA::is_nil(sub_listener.in()) && !this->coherent_)
02012       {
02013         ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02014 
02015         sub_listener->on_data_on_readers(sub.in());
02016         sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
02017       }
02018     else
02019       {
02020         sub->notify_status_condition();
02021 
02022         DDS::DataReaderListener_var listener =
02023             listener_for (DDS::DATA_AVAILABLE_STATUS);
02024 
02025         if (!CORBA::is_nil(listener.in()))
02026           {
02027             ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02028 
02029             listener->on_data_available(this);
02030             set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02031             sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
02032           }
02033         else
02034           {
02035             notify_status_condition_no_sample_lock();
02036           }
02037       }
02038 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02039   }
02040 #endif
02041 }
02042 
02043 /// Release sample_lock_ during status notifications in store_instance_data()
02044 /// as the lock is not needed and could cause deadlock condition.
02045 /// See comments in member function implementation for details.
02046 void notify_status_condition_no_sample_lock()
02047 {
02048   // This member function avoids a deadlock condition which otherwise
02049   // could occur as follows:
02050   // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
02051   // eventually DataReaderImpl::sample_lock_ to lock in call to
02052   // DataReaderImpl::contains_samples().
02053   // Thread2: Call to DataReaderImpl::data_received()
02054   // causes DataReaderImpl::sample_lock_ to lock and eventually
02055   // during notify of status condition a call to WaitSet::signal()
02056   // causes WaitSet::lock_ to lock.
02057   // Because the DataReaderImpl::sample_lock_ is not needed during
02058   // status notification this member function is used in
02059   // store_instance_data() to release sample_lock_ before making
02060   // the notification.
02061   ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02062   notify_status_condition();
02063 }
02064 
02065 
02066 /// Common input read* & take* input processing and precondition checks
02067 DDS::ReturnCode_t check_inputs (
02068                                 const char* method_name,
02069                                 MessageSequenceType & received_data,
02070                                 DDS::SampleInfoSeq & info_seq,
02071                                 ::CORBA::Long max_samples)
02072 {
02073   typename MessageSequenceType::PrivateMemberAccess received_data_p (received_data);
02074 
02075   // ---- start of preconditions common to read and take -----
02076   // SPEC ref v1.2 7.1.2.5.3.8 #1
02077   // NOTE: We can't check maximum() or release() here since those are
02078   //       implementation details of the sequences.  In general, the
02079   //       info_seq will have release() == true and maximum() == 0.
02080   //       If we're in zero-copy mode, the received_data will have
02081   //       release() == false and maximum() == 0.  If it's not
02082   //       zero-copy then received_data will have release == true()
02083   //       and maximum() == anything.
02084   if (received_data.length() != info_seq.length())
02085     {
02086       ACE_DEBUG((LM_DEBUG,
02087                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02088                  ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
02089                  ACE_TEXT("sequences do not match.\n"),
02090                  TraitsType::type_name(),
02091                  method_name ));
02092       return DDS::RETCODE_PRECONDITION_NOT_MET;
02093     }
02094 
02095   //SPEC ref v1.2 7.1.2.5.3.8 #4
02096   if ((received_data.maximum() > 0) && (received_data.release() == false))
02097     {
02098       ACE_DEBUG((LM_DEBUG,
02099                  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02100                  ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
02101                  ACE_TEXT("maximum %d and owns %d\n"),
02102                  TraitsType::type_name(),
02103                  method_name,
02104                  received_data.maximum(),
02105                  received_data.release() ));
02106 
02107       return DDS::RETCODE_PRECONDITION_NOT_MET;
02108     }
02109 
02110   if (received_data.maximum() == 0)
02111     {
02112       // not in SPEC but needed.
02113       if (max_samples == DDS::LENGTH_UNLIMITED)
02114         {
02115           max_samples =
02116             static_cast< ::CORBA::Long> (received_data_p.max_slots());
02117         }
02118     }
02119   else
02120     {
02121       if (max_samples == DDS::LENGTH_UNLIMITED)
02122         {
02123           //SPEC ref v1.2 7.1.2.5.3.8 #5a
02124           max_samples = received_data.maximum();
02125         }
02126       else if (
02127                max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
02128         {
02129           //SPEC ref v1.2 7.1.2.5.3.8 #5c
02130           ACE_DEBUG((LM_DEBUG,
02131                      ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02132                      ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
02133                      TraitsType::type_name(),
02134                      method_name,
02135                      max_samples,
02136                      received_data.maximum()));
02137           return DDS::RETCODE_PRECONDITION_NOT_MET;
02138         }
02139       //else
02140       //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
02141     }
02142 
02143   // The spec does not say what to do in this case but it appears to be a good thing.
02144   // Note: max_slots is the greater of the sequence's maximum and init_size.
02145   if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
02146     {
02147       max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
02148     }
02149   //---- end of preconditions common to read and take -----
02150 
02151   return DDS::RETCODE_OK;
02152 }
02153 
02154 class FilterDelayedHandler : public Watchdog {
02155 public:
02156   FilterDelayedHandler(DataReaderImpl_T<MessageType>& data_reader_impl)
02157   // Watchdog's interval_ only used for resetting current intervals
02158   : Watchdog(ACE_Time_Value(0))
02159   , data_reader_impl_(data_reader_impl)
02160   {
02161   }
02162 
02163   virtual ~FilterDelayedHandler()
02164   {
02165   }
02166 
02167   void cancel()
02168   {
02169     cancel_all();
02170     cleanup();
02171   }
02172 
02173   void delay_sample(DDS::InstanceHandle_t handle,
02174                     unique_ptr<MessageTypeWithAllocator> data,
02175                     const OpenDDS::DCPS::DataSampleHeader& header,
02176                     const bool just_registered,
02177                     const ACE_Time_Value& filter_time_expired)
02178   {
02179     // sample_lock_ should already be held
02180     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02181 
02182     if (!data_reader_impl) {
02183       return;
02184     }
02185 
02186     MessageTypeWithAllocator* instance_data = data.get();
02187 
02188     DataSampleHeader_ptr hdr(new OpenDDS::DCPS::DataSampleHeader(header));
02189 
02190     typename FilterDelayedSampleMap::iterator i = map_.find(handle);
02191     if (i == map_.end()) {
02192 
02193       // emplace()/insert() only if the sample is going to be
02194       // new (otherwise we call move(data) twice).
02195       std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
02196 #ifdef ACE_HAS_CPP11
02197       map_.emplace(std::piecewise_construct,
02198                    std::forward_as_tuple(handle),
02199                    std::forward_as_tuple(move(data), hdr, just_registered));
02200 #else
02201       map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
02202 #endif
02203       FilterDelayedSample& sample = result.first->second;
02204 
02205       const ACE_Time_Value interval = duration_to_time_value(
02206         data_reader_impl->qos_.time_based_filter.minimum_separation);
02207 
02208       const ACE_Time_Value filter_time_remaining = duration_to_time_value(
02209         data_reader_impl->qos_.time_based_filter.minimum_separation) - filter_time_expired;
02210 
02211       long timer_id = -1;
02212 
02213       {
02214         ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_);
02215         timer_id = schedule_timer(reinterpret_cast<const void*>(intptr_t(handle)),
02216           filter_time_remaining, interval);
02217       }
02218 
02219       // ensure that another sample has not replaced this while the lock was released
02220       if (instance_data == sample.message.get()) {
02221         sample.timer_id = timer_id;
02222       }
02223     } else {
02224       FilterDelayedSample& sample = i->second;
02225       // we only care about the most recently filtered sample, so clean up the last one
02226 
02227       sample.message = move(data);
02228       sample.header = hdr;
02229       sample.new_instance = just_registered;
02230       // already scheduled for timeout at the desired time
02231     }
02232   }
02233 
02234   void clear_sample(DDS::InstanceHandle_t handle)
02235   {
02236     // sample_lock_ should already be held
02237 
02238     typename FilterDelayedSampleMap::iterator sample = map_.find(handle);
02239     if (sample != map_.end()) {
02240       // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring
02241       sample->second.message.reset();
02242     }
02243   }
02244 
02245   void drop_sample(DDS::InstanceHandle_t handle)
02246   {
02247     // sample_lock_ should already be held
02248 
02249     typename FilterDelayedSampleMap::iterator sample = map_.find(handle);
02250     if (sample != map_.end()) {
02251       {
02252         RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02253         if (data_reader_impl) {
02254           ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_);
02255           cancel_timer(sample->second.timer_id);
02256         }
02257       }
02258 
02259       // use the handle to erase, since the sample lock was released
02260       map_.erase(handle);
02261     }
02262   }
02263 
02264 private:
02265 
02266 
02267 
02268   int handle_timeout(const ACE_Time_Value&, const void* act)
02269   {
02270     DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act));
02271 
02272     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02273     if (!data_reader_impl)
02274       return -1;
02275 
02276     SubscriptionInstance_rch instance = data_reader_impl->get_handle_instance(handle);
02277 
02278     if (!instance)
02279       return 0;
02280 
02281     long cancel_timer_id = -1;
02282 
02283     {
02284       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_, -1);
02285 
02286       typename FilterDelayedSampleMap::iterator data = map_.find(handle);
02287       if (data == map_.end()) {
02288         return 0;
02289       }
02290 
02291       if (data->second.message) {
02292         const bool NOT_DISPOSE_MSG = false;
02293         const bool NOT_UNREGISTER_MSG = false;
02294         // clear the message, since ownership is being transfered to finish_store_instance_data.
02295 
02296         instance->last_accepted_ = ACE_OS::gettimeofday();
02297         const DataSampleHeader_ptr header = data->second.header;
02298         const bool new_instance = data->second.new_instance;
02299 
02300         // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_
02301         data_reader_impl->finish_store_instance_data(
02302           move(data->second.message),
02303           *header,
02304           instance,
02305           NOT_DISPOSE_MSG,
02306           NOT_UNREGISTER_MSG);
02307 
02308         data_reader_impl->accept_sample_processing(instance, *header, new_instance);
02309       } else {
02310         // this check is performed to handle the corner case where store_instance_data received and delivered a sample, while this
02311         // method was waiting for the lock
02312         const ACE_Time_Value interval = duration_to_time_value(data_reader_impl->qos_.time_based_filter.minimum_separation);
02313         if (ACE_OS::gettimeofday() - instance->last_sample_tv_ >= interval) {
02314           // nothing to process, so unregister this handle for timeout
02315           cancel_timer_id = data->second.timer_id;
02316           // no new data to process, so remove from container
02317           map_.erase(data);
02318         }
02319       }
02320     }
02321 
02322     if (cancel_timer_id != -1) {
02323       cancel_timer(cancel_timer_id);
02324     }
02325     return 0;
02326   }
02327 
02328   virtual void reschedule_deadline()
02329   {
02330     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02331 
02332     if (data_reader_impl) {
02333       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_);
02334 
02335       for (typename FilterDelayedSampleMap::iterator sample = map_.begin(); sample != map_.end(); ++sample) {
02336         reset_timer_interval(sample->second.timer_id);
02337       }
02338     }
02339   }
02340 
02341   void cleanup()
02342   {
02343     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02344     if (data_reader_impl) {
02345       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_);
02346       // insure instance_ptrs get freed
02347       map_.clear();
02348     }
02349   }
02350 
02351   WeakRcHandle<DataReaderImpl_T<MessageType> > data_reader_impl_;
02352 
02353   typedef ACE_Strong_Bound_Ptr<const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex> DataSampleHeader_ptr;
02354 
02355   struct FilterDelayedSample {
02356 
02357     FilterDelayedSample(unique_ptr<MessageTypeWithAllocator> msg, DataSampleHeader_ptr hdr, bool new_inst)
02358     : message(move(msg))
02359     , header(hdr)
02360     , new_instance(new_inst)
02361     , timer_id(-1) {
02362     }
02363 
02364     container_supported_unique_ptr<MessageTypeWithAllocator> message;
02365     DataSampleHeader_ptr header;
02366     bool new_instance;
02367     long timer_id;
02368   };
02369 
02370 
02371   typedef OPENDDS_MAP(DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap;
02372 
02373   FilterDelayedSampleMap map_;
02374 public:
02375   typedef typename DataReaderImpl_T<MessageType>::DataAllocator DataAllocator;
02376   //We put the data_allocator_ inside FilterDelayedHandler because the reactor thread in FilterDelayedHandler may be still alive
02377   // after the containing DataReaderImpl is destroyed. This avoids access violation during cleanup.
02378   unique_ptr<DataAllocator> data_allocator_;
02379 };
02380 
02381 unique_ptr<DataAllocator>& data_allocator() { return filter_delayed_handler_->data_allocator_; }
02382 
02383 RcHandle<FilterDelayedHandler> filter_delayed_handler_;
02384 
02385 InstanceMap  instance_map_;
02386 };
02387 
02388 template <typename MessageType>
02389 void* DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator new(size_t , ACE_New_Allocator& pool)
02390 {
02391   typedef typename DataReaderImpl_T<MessageType>::MessageTypeMemoryBlock MessageTypeMemoryBlock;
02392   MessageTypeMemoryBlock* block =
02393     static_cast<MessageTypeMemoryBlock*>(pool.malloc(sizeof(MessageTypeMemoryBlock)));
02394   block->allocator_ = &pool;
02395   return block;
02396 }
02397 
02398 template <typename MessageType>
02399 void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory)
02400 {
02401   if (memory) {
02402     MessageTypeMemoryBlock* block = static_cast<MessageTypeMemoryBlock*>(memory);
02403     block->allocator_->free(block);
02404   }
02405 }
02406 
02407 template <typename MessageType>
02408 void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory, ACE_New_Allocator&)
02409 {
02410   operator delete(memory);
02411 }
02412 
02413 }
02414 }
02415 
02416 OPENDDS_END_VERSIONED_NAMESPACE_DECL
02417 
02418 #endif /* dds_DCPS_DataReaderImpl_T_h */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1