OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler Class Reference

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler:
Collaboration graph
[legend]

List of all members.

Classes

struct  FilterDelayedSample

Public Types

typedef DataReaderImpl_T
< MessageType >::DataAllocator 
DataAllocator

Public Member Functions

 FilterDelayedHandler (DataReaderImpl_T< MessageType > &data_reader_impl)
virtual ~FilterDelayedHandler ()
void cancel ()
void delay_sample (DDS::InstanceHandle_t handle, unique_ptr< MessageTypeWithAllocator > data, const OpenDDS::DCPS::DataSampleHeader &header, const bool just_registered, const ACE_Time_Value &filter_time_expired)
void clear_sample (DDS::InstanceHandle_t handle)
void drop_sample (DDS::InstanceHandle_t handle)

Public Attributes

unique_ptr< DataAllocatordata_allocator_

Private Types

typedef ACE_Strong_Bound_Ptr
< const
OpenDDS::DCPS::DataSampleHeader,
ACE_Null_Mutex
DataSampleHeader_ptr

Private Member Functions

int handle_timeout (const ACE_Time_Value &, const void *act)
virtual void reschedule_deadline ()
 Re-schedule timer with new interval.
void cleanup ()
typedef OPENDDS_MAP (DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap

Private Attributes

WeakRcHandle< DataReaderImpl_T
< MessageType > > 
data_reader_impl_
FilterDelayedSampleMap map_

Detailed Description

template<typename MessageType>
class OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler

Definition at line 2154 of file DataReaderImpl_T.h.


Member Typedef Documentation

template<typename MessageType>
typedef DataReaderImpl_T<MessageType>::DataAllocator OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::DataAllocator

Definition at line 2375 of file DataReaderImpl_T.h.

Definition at line 2353 of file DataReaderImpl_T.h.


Constructor & Destructor Documentation

template<typename MessageType>
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedHandler ( DataReaderImpl_T< MessageType > &  data_reader_impl  )  [inline]

Definition at line 2156 of file DataReaderImpl_T.h.

02158   : Watchdog(ACE_Time_Value(0))
02159   , data_reader_impl_(data_reader_impl)
02160   {
02161   }

template<typename MessageType>
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::~FilterDelayedHandler (  )  [inline, virtual]

Definition at line 2163 of file DataReaderImpl_T.h.

02164   {
02165   }


Member Function Documentation

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::cancel ( void   )  [inline]

Definition at line 2167 of file DataReaderImpl_T.h.

References cleanup().

02168   {
02169     cancel_all();
02170     cleanup();
02171   }

Here is the call graph for this function:

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::cleanup ( void   )  [inline, private]

Definition at line 2341 of file DataReaderImpl_T.h.

References cleanup().

02342   {
02343     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02344     if (data_reader_impl) {
02345       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_);
02346       // insure instance_ptrs get freed
02347       map_.clear();
02348     }
02349   }

Here is the call graph for this function:

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::clear_sample ( DDS::InstanceHandle_t  handle  )  [inline]

Definition at line 2234 of file DataReaderImpl_T.h.

02235   {
02236     // sample_lock_ should already be held
02237 
02238     typename FilterDelayedSampleMap::iterator sample = map_.find(handle);
02239     if (sample != map_.end()) {
02240       // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring
02241       sample->second.message.reset();
02242     }
02243   }

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::delay_sample ( DDS::InstanceHandle_t  handle,
unique_ptr< MessageTypeWithAllocator data,
const OpenDDS::DCPS::DataSampleHeader header,
const bool  just_registered,
const ACE_Time_Value filter_time_expired 
) [inline]

Definition at line 2173 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::container_supported_unique_ptr< T >::get(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::header, OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::message, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::new_instance, and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::timer_id.

02178   {
02179     // sample_lock_ should already be held
02180     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02181 
02182     if (!data_reader_impl) {
02183       return;
02184     }
02185 
02186     MessageTypeWithAllocator* instance_data = data.get();
02187 
02188     DataSampleHeader_ptr hdr(new OpenDDS::DCPS::DataSampleHeader(header));
02189 
02190     typename FilterDelayedSampleMap::iterator i = map_.find(handle);
02191     if (i == map_.end()) {
02192 
02193       // emplace()/insert() only if the sample is going to be
02194       // new (otherwise we call move(data) twice).
02195       std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
02196 #ifdef ACE_HAS_CPP11
02197       map_.emplace(std::piecewise_construct,
02198                    std::forward_as_tuple(handle),
02199                    std::forward_as_tuple(move(data), hdr, just_registered));
02200 #else
02201       map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
02202 #endif
02203       FilterDelayedSample& sample = result.first->second;
02204 
02205       const ACE_Time_Value interval = duration_to_time_value(
02206         data_reader_impl->qos_.time_based_filter.minimum_separation);
02207 
02208       const ACE_Time_Value filter_time_remaining = duration_to_time_value(
02209         data_reader_impl->qos_.time_based_filter.minimum_separation) - filter_time_expired;
02210 
02211       long timer_id = -1;
02212 
02213       {
02214         ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_);
02215         timer_id = schedule_timer(reinterpret_cast<const void*>(intptr_t(handle)),
02216           filter_time_remaining, interval);
02217       }
02218 
02219       // ensure that another sample has not replaced this while the lock was released
02220       if (instance_data == sample.message.get()) {
02221         sample.timer_id = timer_id;
02222       }
02223     } else {
02224       FilterDelayedSample& sample = i->second;
02225       // we only care about the most recently filtered sample, so clean up the last one
02226 
02227       sample.message = move(data);
02228       sample.header = hdr;
02229       sample.new_instance = just_registered;
02230       // already scheduled for timeout at the desired time
02231     }
02232   }

Here is the call graph for this function:

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::drop_sample ( DDS::InstanceHandle_t  handle  )  [inline]

Definition at line 2245 of file DataReaderImpl_T.h.

02246   {
02247     // sample_lock_ should already be held
02248 
02249     typename FilterDelayedSampleMap::iterator sample = map_.find(handle);
02250     if (sample != map_.end()) {
02251       {
02252         RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02253         if (data_reader_impl) {
02254           ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_);
02255           cancel_timer(sample->second.timer_id);
02256         }
02257       }
02258 
02259       // use the handle to erase, since the sample lock was released
02260       map_.erase(handle);
02261     }
02262   }

template<typename MessageType>
int OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::handle_timeout ( const ACE_Time_Value ,
const void *  act 
) [inline, private, virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 2268 of file DataReaderImpl_T.h.

References OpenDDS::DCPS::duration_to_time_value(), ACE_OS::gettimeofday(), header, and OpenDDS::DCPS::move().

02269   {
02270     DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act));
02271 
02272     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02273     if (!data_reader_impl)
02274       return -1;
02275 
02276     SubscriptionInstance_rch instance = data_reader_impl->get_handle_instance(handle);
02277 
02278     if (!instance)
02279       return 0;
02280 
02281     long cancel_timer_id = -1;
02282 
02283     {
02284       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_, -1);
02285 
02286       typename FilterDelayedSampleMap::iterator data = map_.find(handle);
02287       if (data == map_.end()) {
02288         return 0;
02289       }
02290 
02291       if (data->second.message) {
02292         const bool NOT_DISPOSE_MSG = false;
02293         const bool NOT_UNREGISTER_MSG = false;
02294         // clear the message, since ownership is being transfered to finish_store_instance_data.
02295 
02296         instance->last_accepted_ = ACE_OS::gettimeofday();
02297         const DataSampleHeader_ptr header = data->second.header;
02298         const bool new_instance = data->second.new_instance;
02299 
02300         // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_
02301         data_reader_impl->finish_store_instance_data(
02302           move(data->second.message),
02303           *header,
02304           instance,
02305           NOT_DISPOSE_MSG,
02306           NOT_UNREGISTER_MSG);
02307 
02308         data_reader_impl->accept_sample_processing(instance, *header, new_instance);
02309       } else {
02310         // this check is performed to handle the corner case where store_instance_data received and delivered a sample, while this
02311         // method was waiting for the lock
02312         const ACE_Time_Value interval = duration_to_time_value(data_reader_impl->qos_.time_based_filter.minimum_separation);
02313         if (ACE_OS::gettimeofday() - instance->last_sample_tv_ >= interval) {
02314           // nothing to process, so unregister this handle for timeout
02315           cancel_timer_id = data->second.timer_id;
02316           // no new data to process, so remove from container
02317           map_.erase(data);
02318         }
02319       }
02320     }
02321 
02322     if (cancel_timer_id != -1) {
02323       cancel_timer(cancel_timer_id);
02324     }
02325     return 0;
02326   }

Here is the call graph for this function:

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
FilterDelayedSample   
) [private]
template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::reschedule_deadline (  )  [inline, private, virtual]

Re-schedule timer with new interval.

Implements OpenDDS::DCPS::Watchdog.

Definition at line 2328 of file DataReaderImpl_T.h.

02329   {
02330     RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02331 
02332     if (data_reader_impl) {
02333       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_);
02334 
02335       for (typename FilterDelayedSampleMap::iterator sample = map_.begin(); sample != map_.end(); ++sample) {
02336         reset_timer_interval(sample->second.timer_id);
02337       }
02338     }
02339   }


Member Data Documentation

Definition at line 2378 of file DataReaderImpl_T.h.

template<typename MessageType>
WeakRcHandle<DataReaderImpl_T<MessageType> > OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::data_reader_impl_ [private]

Definition at line 2351 of file DataReaderImpl_T.h.

template<typename MessageType>
FilterDelayedSampleMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::map_ [private]

Definition at line 2373 of file DataReaderImpl_T.h.


The documentation for this class was generated from the following file:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1