Definition at line 2154 of file DataReaderImpl_T.h.
typedef DataReaderImpl_T<MessageType>::DataAllocator OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::DataAllocator |
Definition at line 2375 of file DataReaderImpl_T.h.
typedef ACE_Strong_Bound_Ptr<const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::DataSampleHeader_ptr [private] |
Definition at line 2353 of file DataReaderImpl_T.h.
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedHandler | ( | DataReaderImpl_T< MessageType > & | data_reader_impl | ) | [inline] |
Definition at line 2156 of file DataReaderImpl_T.h.
02158 : Watchdog(ACE_Time_Value(0)) 02159 , data_reader_impl_(data_reader_impl) 02160 { 02161 }
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::~FilterDelayedHandler | ( | ) | [inline, virtual] |
Definition at line 2163 of file DataReaderImpl_T.h.
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::cancel | ( | void | ) | [inline] |
Definition at line 2167 of file DataReaderImpl_T.h.
References cleanup().
02168 { 02169 cancel_all(); 02170 cleanup(); 02171 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::cleanup | ( | void | ) | [inline, private] |
Definition at line 2341 of file DataReaderImpl_T.h.
References cleanup().
02342 { 02343 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock()); 02344 if (data_reader_impl) { 02345 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_); 02346 // insure instance_ptrs get freed 02347 map_.clear(); 02348 } 02349 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::clear_sample | ( | DDS::InstanceHandle_t | handle | ) | [inline] |
Definition at line 2234 of file DataReaderImpl_T.h.
02235 { 02236 // sample_lock_ should already be held 02237 02238 typename FilterDelayedSampleMap::iterator sample = map_.find(handle); 02239 if (sample != map_.end()) { 02240 // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring 02241 sample->second.message.reset(); 02242 } 02243 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::delay_sample | ( | DDS::InstanceHandle_t | handle, | |
unique_ptr< MessageTypeWithAllocator > | data, | |||
const OpenDDS::DCPS::DataSampleHeader & | header, | |||
const bool | just_registered, | |||
const ACE_Time_Value & | filter_time_expired | |||
) | [inline] |
Definition at line 2173 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::container_supported_unique_ptr< T >::get(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::header, OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::message, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::new_instance, and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSample::timer_id.
02178 { 02179 // sample_lock_ should already be held 02180 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock()); 02181 02182 if (!data_reader_impl) { 02183 return; 02184 } 02185 02186 MessageTypeWithAllocator* instance_data = data.get(); 02187 02188 DataSampleHeader_ptr hdr(new OpenDDS::DCPS::DataSampleHeader(header)); 02189 02190 typename FilterDelayedSampleMap::iterator i = map_.find(handle); 02191 if (i == map_.end()) { 02192 02193 // emplace()/insert() only if the sample is going to be 02194 // new (otherwise we call move(data) twice). 02195 std::pair<typename FilterDelayedSampleMap::iterator, bool> result = 02196 #ifdef ACE_HAS_CPP11 02197 map_.emplace(std::piecewise_construct, 02198 std::forward_as_tuple(handle), 02199 std::forward_as_tuple(move(data), hdr, just_registered)); 02200 #else 02201 map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered))); 02202 #endif 02203 FilterDelayedSample& sample = result.first->second; 02204 02205 const ACE_Time_Value interval = duration_to_time_value( 02206 data_reader_impl->qos_.time_based_filter.minimum_separation); 02207 02208 const ACE_Time_Value filter_time_remaining = duration_to_time_value( 02209 data_reader_impl->qos_.time_based_filter.minimum_separation) - filter_time_expired; 02210 02211 long timer_id = -1; 02212 02213 { 02214 ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_); 02215 timer_id = schedule_timer(reinterpret_cast<const void*>(intptr_t(handle)), 02216 filter_time_remaining, interval); 02217 } 02218 02219 // ensure that another sample has not replaced this while the lock was released 02220 if (instance_data == sample.message.get()) { 02221 sample.timer_id = timer_id; 02222 } 02223 } else { 02224 FilterDelayedSample& sample = i->second; 02225 // we only care about the most recently filtered sample, so clean up the last one 02226 02227 sample.message = move(data); 02228 sample.header = hdr; 02229 sample.new_instance = just_registered; 02230 // already scheduled for timeout at the desired time 02231 } 02232 }
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::drop_sample | ( | DDS::InstanceHandle_t | handle | ) | [inline] |
Definition at line 2245 of file DataReaderImpl_T.h.
02246 { 02247 // sample_lock_ should already be held 02248 02249 typename FilterDelayedSampleMap::iterator sample = map_.find(handle); 02250 if (sample != map_.end()) { 02251 { 02252 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock()); 02253 if (data_reader_impl) { 02254 ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_); 02255 cancel_timer(sample->second.timer_id); 02256 } 02257 } 02258 02259 // use the handle to erase, since the sample lock was released 02260 map_.erase(handle); 02261 } 02262 }
int OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::handle_timeout | ( | const ACE_Time_Value & | , | |
const void * | act | |||
) | [inline, private, virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 2268 of file DataReaderImpl_T.h.
References OpenDDS::DCPS::duration_to_time_value(), ACE_OS::gettimeofday(), header, and OpenDDS::DCPS::move().
02269 { 02270 DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act)); 02271 02272 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock()); 02273 if (!data_reader_impl) 02274 return -1; 02275 02276 SubscriptionInstance_rch instance = data_reader_impl->get_handle_instance(handle); 02277 02278 if (!instance) 02279 return 0; 02280 02281 long cancel_timer_id = -1; 02282 02283 { 02284 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_, -1); 02285 02286 typename FilterDelayedSampleMap::iterator data = map_.find(handle); 02287 if (data == map_.end()) { 02288 return 0; 02289 } 02290 02291 if (data->second.message) { 02292 const bool NOT_DISPOSE_MSG = false; 02293 const bool NOT_UNREGISTER_MSG = false; 02294 // clear the message, since ownership is being transferred to finish_store_instance_data. 02295 02296 instance->last_accepted_ = ACE_OS::gettimeofday(); 02297 const DataSampleHeader_ptr header = data->second.header; 02298 const bool new_instance = data->second.new_instance; 02299 02300 // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_ 02301 data_reader_impl->finish_store_instance_data( 02302 move(data->second.message), 02303 *header, 02304 instance, 02305 NOT_DISPOSE_MSG, 02306 NOT_UNREGISTER_MSG); 02307 02308 data_reader_impl->accept_sample_processing(instance, *header, new_instance); 02309 } else { 02310 // this check is performed to handle the corner case where store_instance_data received and delivered a sample, while this 02311 // method was waiting for the lock 02312 const ACE_Time_Value interval = duration_to_time_value(data_reader_impl->qos_.time_based_filter.minimum_separation); 02313 if (ACE_OS::gettimeofday() - instance->last_sample_tv_ >= interval) { 02314 // nothing to process, so unregister this handle for timeout 02315 cancel_timer_id = data->second.timer_id; 02316 // no new data to process, so remove from container 02317 map_.erase(data); 02318 } 02319 } 02320 } 02321 02322 if (cancel_timer_id != -1) { 02323 cancel_timer(cancel_timer_id); 02324 } 02325 return 0; 02326 }
typedef OPENDDS_MAP(DDS::InstanceHandle_t, FilterDelayedSample) OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::FilterDelayedSampleMap [private] |
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::reschedule_deadline | ( | ) | [inline, private, virtual] |
Re-schedule timer with new interval.
Implements OpenDDS::DCPS::Watchdog.
Definition at line 2328 of file DataReaderImpl_T.h.
02329 { 02330 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock()); 02331 02332 if (data_reader_impl) { 02333 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_); 02334 02335 for (typename FilterDelayedSampleMap::iterator sample = map_.begin(); sample != map_.end(); ++sample) { 02336 reset_timer_interval(sample->second.timer_id); 02337 } 02338 } 02339 }
unique_ptr<DataAllocator> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::data_allocator_ |
Definition at line 2378 of file DataReaderImpl_T.h.
WeakRcHandle<DataReaderImpl_T<MessageType> > OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::data_reader_impl_ [private] |
Definition at line 2351 of file DataReaderImpl_T.h.
FilterDelayedSampleMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::FilterDelayedHandler::map_ [private] |
Definition at line 2373 of file DataReaderImpl_T.h.