00001 #ifndef dds_DCPS_DataReaderImpl_T_h
00002 #define dds_DCPS_DataReaderImpl_T_h
00003 #include "dds/DCPS/MultiTopicImpl.h"
00004 #include "dds/DCPS/RakeResults_T.h"
00005 #include "dds/DCPS/SubscriberImpl.h"
00006 #include "dds/DCPS/BuiltInTopicUtils.h"
00007 #include "dds/DCPS/Util.h"
00008 #include "dds/DCPS/TypeSupportImpl.h"
00009 #include "dds/DCPS/Watchdog.h"
00010 #include "dcps_export.h"
00011 #include "dds/DCPS/GuidConverter.h"
00012
00013 #include "ace/Bound_Ptr.h"
00014 #include "ace/Time_Value.h"
00015
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021
00022
00023
00024
00025
00026
00027 template <typename MessageType>
00028 class
00029 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
00030 OpenDDS_Dcps_Export
00031 #endif
00032 DataReaderImpl_T
00033 : public virtual OpenDDS::DCPS::LocalObject<typename DDSTraits<MessageType>::DataReaderType>,
00034 public virtual OpenDDS::DCPS::DataReaderImpl
00035 {
00036 public:
00037 typedef DDSTraits<MessageType> TraitsType;
00038 typedef typename TraitsType::MessageSequenceType MessageSequenceType;
00039
00040 typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t,
00041 typename TraitsType::LessThanType) InstanceMap;
00042
00043 class SharedInstanceMap
00044 : public RcObject
00045 , public InstanceMap
00046 {
00047 };
00048
00049 typedef RcHandle<SharedInstanceMap> SharedInstanceMap_rch;
00050
00051 class MessageTypeWithAllocator
00052 : public MessageType
00053 , public EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>
00054 {
00055 public:
00056 void* operator new(size_t size, ACE_New_Allocator& pool);
00057 void operator delete(void* memory, ACE_New_Allocator& pool);
00058 void operator delete(void* memory);
00059
00060 MessageTypeWithAllocator(){}
00061 MessageTypeWithAllocator(const MessageType& other)
00062 : MessageType(other)
00063 {
00064 }
00065 };
00066
00067 struct MessageTypeMemoryBlock {
00068 MessageTypeWithAllocator element_;
00069 ACE_New_Allocator* allocator_;
00070 };
00071
00072 typedef OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageTypeMemoryBlock, ACE_Null_Mutex> DataAllocator;
00073
00074 typedef typename TraitsType::DataReaderType Interface;
00075
00076 DataReaderImpl_T (void)
00077 : filter_delayed_handler_(make_rch<FilterDelayedHandler>(ref(*this)))
00078 {
00079 }
00080
00081 virtual ~DataReaderImpl_T (void)
00082 {
00083 for (typename InstanceMap::iterator it = instance_map_.begin();
00084 it != instance_map_.end(); ++it)
00085 {
00086 OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(it->second);
00087 this->purge_data(ptr);
00088 }
00089
00090 }
00091
00092
00093
00094
00095
00096 virtual DDS::ReturnCode_t enable_specific ()
00097 {
00098 data_allocator().reset(new DataAllocator(get_n_chunks ()));
00099 if (OpenDDS::DCPS::DCPS_debug_level >= 2)
00100 ACE_DEBUG((LM_DEBUG,
00101 ACE_TEXT("(%P|%t) %CDataReaderImpl::")
00102 ACE_TEXT("enable_specific-data")
00103 ACE_TEXT(" Cached_Allocator_With_Overflow ")
00104 ACE_TEXT("%x with %d chunks\n"),
00105 TraitsType::type_name(),
00106 data_allocator().get(),
00107 this->get_n_chunks ()));
00108
00109 return DDS::RETCODE_OK;
00110 }
00111
00112 virtual DDS::ReturnCode_t read (
00113 MessageSequenceType & received_data,
00114 DDS::SampleInfoSeq & info_seq,
00115 ::CORBA::Long max_samples,
00116 DDS::SampleStateMask sample_states,
00117 DDS::ViewStateMask view_states,
00118 DDS::InstanceStateMask instance_states)
00119 {
00120 DDS::ReturnCode_t const precond =
00121 check_inputs("read", received_data, info_seq, max_samples);
00122 if (DDS::RETCODE_OK != precond)
00123 {
00124 return precond;
00125 }
00126
00127 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00128 guard,
00129 this->sample_lock_,
00130 DDS::RETCODE_ERROR);
00131
00132 return read_i(received_data, info_seq, max_samples, sample_states,
00133 view_states, instance_states, 0);
00134 }
00135
00136 virtual DDS::ReturnCode_t take (
00137 MessageSequenceType & received_data,
00138 DDS::SampleInfoSeq & info_seq,
00139 ::CORBA::Long max_samples,
00140 DDS::SampleStateMask sample_states,
00141 DDS::ViewStateMask view_states,
00142 DDS::InstanceStateMask instance_states)
00143 {
00144 DDS::ReturnCode_t const precond =
00145 check_inputs("take", received_data, info_seq, max_samples);
00146 if (DDS::RETCODE_OK != precond)
00147 {
00148 return precond;
00149 }
00150
00151 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00152 guard,
00153 this->sample_lock_,
00154 DDS::RETCODE_ERROR);
00155
00156 return take_i(received_data, info_seq, max_samples, sample_states,
00157 view_states, instance_states, 0);
00158 }
00159
00160 virtual DDS::ReturnCode_t read_w_condition (
00161 MessageSequenceType & received_data,
00162 DDS::SampleInfoSeq & sample_info,
00163 ::CORBA::Long max_samples,
00164 DDS::ReadCondition_ptr a_condition)
00165 {
00166 DDS::ReturnCode_t const precond =
00167 check_inputs("read_w_condition", received_data, sample_info, max_samples);
00168 if (DDS::RETCODE_OK != precond)
00169 {
00170 return precond;
00171 }
00172
00173 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00174 DDS::RETCODE_ERROR);
00175
00176 if (!has_readcondition(a_condition))
00177 {
00178 return DDS::RETCODE_PRECONDITION_NOT_MET;
00179 }
00180
00181 return read_i(received_data, sample_info, max_samples,
00182 a_condition->get_sample_state_mask(),
00183 a_condition->get_view_state_mask(),
00184 a_condition->get_instance_state_mask(),
00185 #ifndef OPENDDS_NO_QUERY_CONDITION
00186 dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
00187 #else
00188 0);
00189 #endif
00190 }
00191
00192 virtual DDS::ReturnCode_t take_w_condition (
00193 MessageSequenceType & received_data,
00194 DDS::SampleInfoSeq & sample_info,
00195 ::CORBA::Long max_samples,
00196 DDS::ReadCondition_ptr a_condition)
00197 {
00198 DDS::ReturnCode_t const precond =
00199 check_inputs("take_w_condition", received_data, sample_info, max_samples);
00200 if (DDS::RETCODE_OK != precond)
00201 {
00202 return precond;
00203 }
00204
00205 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00206 DDS::RETCODE_ERROR);
00207
00208 if (!has_readcondition(a_condition))
00209 {
00210 return DDS::RETCODE_PRECONDITION_NOT_MET;
00211 }
00212
00213 return take_i(received_data, sample_info, max_samples,
00214 a_condition->get_sample_state_mask(),
00215 a_condition->get_view_state_mask(),
00216 a_condition->get_instance_state_mask(),
00217 #ifndef OPENDDS_NO_QUERY_CONDITION
00218 dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
00219 #else
00220 0
00221 #endif
00222 );
00223 }
00224
00225 virtual DDS::ReturnCode_t read_next_sample (
00226 MessageType & received_data,
00227 DDS::SampleInfo & sample_info)
00228 {
00229
00230 bool found_data = false;
00231
00232 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00233 guard,
00234 this->sample_lock_,
00235 DDS::RETCODE_ERROR);
00236
00237 typename InstanceMap::iterator const the_end = instance_map_.end ();
00238 for (typename InstanceMap::iterator it = instance_map_.begin ();
00239 it != the_end;
00240 ++it)
00241 {
00242 DDS::InstanceHandle_t handle = it->second;
00243 OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle);
00244
00245 bool mrg = false;
00246
00247 if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) &&
00248 (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE))
00249 {
00250 for (OpenDDS::DCPS::ReceivedDataElement* item = ptr->rcvd_samples_.head_;
00251 item != 0;
00252 item = item->next_data_sample_)
00253 {
00254 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00255 if (item->coherent_change_) continue;
00256 #endif
00257
00258 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE)
00259 {
00260 if (item->registered_data_ != 0)
00261 {
00262 received_data =
00263 *static_cast< MessageType *> (item->registered_data_);
00264 }
00265 ptr->instance_state_.sample_info(sample_info, item);
00266
00267 item->sample_state_ = DDS::READ_SAMPLE_STATE;
00268
00269
00270 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00271
00272 found_data = true;
00273 }
00274 if (found_data)
00275 {
00276 break;
00277 }
00278 }
00279 }
00280
00281 if (found_data)
00282 {
00283 if (mrg) ptr->instance_state_.accessed();
00284
00285
00286
00287 this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00288
00289 break;
00290 }
00291 }
00292 post_read_or_take();
00293 return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
00294 }
00295
00296 virtual DDS::ReturnCode_t take_next_sample (
00297 MessageType & received_data,
00298 DDS::SampleInfo & sample_info)
00299 {
00300 bool found_data = false;
00301
00302
00303 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00304 guard,
00305 this->sample_lock_,
00306 DDS::RETCODE_ERROR);
00307
00308 typename InstanceMap::iterator const the_end = instance_map_.end ();
00309 for (typename InstanceMap::iterator it = instance_map_.begin ();
00310 it != the_end;
00311 ++it)
00312 {
00313 DDS::InstanceHandle_t handle = it->second;
00314 OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(handle);
00315
00316 bool mrg = false;
00317
00318 OpenDDS::DCPS::ReceivedDataElement *tail = 0;
00319 if ((ptr->instance_state_.view_state() & DDS::ANY_VIEW_STATE) &&
00320 (ptr->instance_state_.instance_state() & DDS::ANY_INSTANCE_STATE))
00321 {
00322
00323 OpenDDS::DCPS::ReceivedDataElement *next;
00324 tail = 0;
00325 OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
00326 while (item)
00327 {
00328 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00329 if (item->coherent_change_)
00330 {
00331 item = item->next_data_sample_;
00332 continue;
00333 }
00334 #endif
00335 if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE)
00336 {
00337 if (item->registered_data_ != 0)
00338 {
00339 received_data =
00340 *static_cast< MessageType *> (item->registered_data_);
00341 }
00342 ptr->instance_state_.sample_info(sample_info, item);
00343
00344 item->sample_state_ = DDS::READ_SAMPLE_STATE;
00345
00346 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00347
00348 if (item == ptr->rcvd_samples_.tail_)
00349 {
00350 tail = ptr->rcvd_samples_.tail_;
00351 item = item->next_data_sample_;
00352 }
00353 else
00354 {
00355 next = item->next_data_sample_;
00356
00357 ptr->rcvd_samples_.remove(item);
00358 item->dec_ref();
00359
00360 item = next;
00361 }
00362
00363 found_data = true;
00364 }
00365 if (found_data)
00366 {
00367 break;
00368 }
00369 }
00370 }
00371
00372 if (found_data)
00373 {
00374 if (mrg) ptr->instance_state_.accessed();
00375
00376
00377
00378
00379
00380 if (tail)
00381 {
00382 this->sample_info(sample_info, tail);
00383
00384 ptr->rcvd_samples_.remove(tail);
00385 tail->dec_ref();
00386 }
00387 else
00388 {
00389 this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00390 }
00391
00392 break;
00393 }
00394 }
00395 post_read_or_take();
00396 return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
00397 }
00398
00399 virtual DDS::ReturnCode_t read_instance (
00400 MessageSequenceType & received_data,
00401 DDS::SampleInfoSeq & info_seq,
00402 ::CORBA::Long max_samples,
00403 DDS::InstanceHandle_t a_handle,
00404 DDS::SampleStateMask sample_states,
00405 DDS::ViewStateMask view_states,
00406 DDS::InstanceStateMask instance_states)
00407 {
00408 DDS::ReturnCode_t const precond =
00409 check_inputs("read_instance", received_data, info_seq, max_samples);
00410 if (DDS::RETCODE_OK != precond)
00411 {
00412 return precond;
00413 }
00414
00415 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00416 guard,
00417 this->sample_lock_,
00418 DDS::RETCODE_ERROR);
00419 return read_instance_i(received_data, info_seq, max_samples, a_handle,
00420 sample_states, view_states, instance_states, 0);
00421 }
00422
00423 virtual DDS::ReturnCode_t take_instance (
00424 MessageSequenceType & received_data,
00425 DDS::SampleInfoSeq & info_seq,
00426 ::CORBA::Long max_samples,
00427 DDS::InstanceHandle_t a_handle,
00428 DDS::SampleStateMask sample_states,
00429 DDS::ViewStateMask view_states,
00430 DDS::InstanceStateMask instance_states)
00431 {
00432 DDS::ReturnCode_t const precond =
00433 check_inputs("take_instance", received_data, info_seq, max_samples);
00434 if (DDS::RETCODE_OK != precond)
00435 {
00436 return precond;
00437 }
00438
00439 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00440 guard,
00441 this->sample_lock_,
00442 DDS::RETCODE_ERROR);
00443 return take_instance_i(received_data, info_seq, max_samples, a_handle,
00444 sample_states, view_states, instance_states, 0);
00445 }
00446
00447 virtual DDS::ReturnCode_t read_instance_w_condition (
00448 MessageSequenceType & received_data,
00449 DDS::SampleInfoSeq & info_seq,
00450 ::CORBA::Long max_samples,
00451 DDS::InstanceHandle_t a_handle,
00452 DDS::ReadCondition_ptr a_condition)
00453 {
00454 DDS::ReturnCode_t const precond =
00455 check_inputs("read_instance_w_condition", received_data, info_seq,
00456 max_samples);
00457 if (DDS::RETCODE_OK != precond)
00458 {
00459 return precond;
00460 }
00461
00462 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00463 DDS::RETCODE_ERROR);
00464
00465 if (!has_readcondition(a_condition))
00466 {
00467 return DDS::RETCODE_PRECONDITION_NOT_MET;
00468 }
00469
00470 #ifndef OPENDDS_NO_QUERY_CONDITION
00471 DDS::QueryCondition_ptr query_condition =
00472 dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00473 #endif
00474
00475 return read_instance_i(received_data, info_seq, max_samples, a_handle,
00476 a_condition->get_sample_state_mask(),
00477 a_condition->get_view_state_mask(),
00478 a_condition->get_instance_state_mask(),
00479 #ifndef OPENDDS_NO_QUERY_CONDITION
00480 query_condition
00481 #else
00482 0
00483 #endif
00484 );
00485 }
00486
00487 virtual DDS::ReturnCode_t take_instance_w_condition (
00488 MessageSequenceType & received_data,
00489 DDS::SampleInfoSeq & info_seq,
00490 ::CORBA::Long max_samples,
00491 DDS::InstanceHandle_t a_handle,
00492 DDS::ReadCondition_ptr a_condition)
00493 {
00494 DDS::ReturnCode_t const precond =
00495 check_inputs("take_instance_w_condition", received_data, info_seq,
00496 max_samples);
00497 if (DDS::RETCODE_OK != precond)
00498 {
00499 return precond;
00500 }
00501
00502 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00503 DDS::RETCODE_ERROR);
00504
00505 if (!has_readcondition(a_condition))
00506 {
00507 return DDS::RETCODE_PRECONDITION_NOT_MET;
00508 }
00509
00510 #ifndef OPENDDS_NO_QUERY_CONDITION
00511 DDS::QueryCondition_ptr query_condition =
00512 dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00513 #endif
00514
00515 return take_instance_i(received_data, info_seq, max_samples, a_handle,
00516 a_condition->get_sample_state_mask(),
00517 a_condition->get_view_state_mask(),
00518 a_condition->get_instance_state_mask(),
00519 #ifndef OPENDDS_NO_QUERY_CONDITION
00520 query_condition
00521 #else
00522 0
00523 #endif
00524 );
00525 }
00526
00527 virtual DDS::ReturnCode_t read_next_instance (
00528 MessageSequenceType & received_data,
00529 DDS::SampleInfoSeq & info_seq,
00530 ::CORBA::Long max_samples,
00531 DDS::InstanceHandle_t a_handle,
00532 DDS::SampleStateMask sample_states,
00533 DDS::ViewStateMask view_states,
00534 DDS::InstanceStateMask instance_states)
00535 {
00536 DDS::ReturnCode_t const precond =
00537 check_inputs("read_next_instance", received_data, info_seq, max_samples);
00538 if (DDS::RETCODE_OK != precond)
00539 {
00540 return precond;
00541 }
00542
00543 return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00544 sample_states, view_states, instance_states, 0);
00545 }
00546
00547 virtual DDS::ReturnCode_t take_next_instance (
00548 MessageSequenceType & received_data,
00549 DDS::SampleInfoSeq & info_seq,
00550 ::CORBA::Long max_samples,
00551 DDS::InstanceHandle_t a_handle,
00552 DDS::SampleStateMask sample_states,
00553 DDS::ViewStateMask view_states,
00554 DDS::InstanceStateMask instance_states)
00555 {
00556 DDS::ReturnCode_t const precond =
00557 check_inputs("take_next_instance", received_data, info_seq, max_samples);
00558 if (DDS::RETCODE_OK != precond)
00559 {
00560 return precond;
00561 }
00562
00563 return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00564 sample_states, view_states, instance_states, 0);
00565 }
00566
00567 virtual DDS::ReturnCode_t read_next_instance_w_condition (
00568 MessageSequenceType & received_data,
00569 DDS::SampleInfoSeq & info_seq,
00570 ::CORBA::Long max_samples,
00571 DDS::InstanceHandle_t a_handle,
00572 DDS::ReadCondition_ptr a_condition)
00573 {
00574 DDS::ReturnCode_t const precond =
00575 check_inputs("read_next_instance_w_condition", received_data, info_seq,
00576 max_samples);
00577 if (DDS::RETCODE_OK != precond)
00578 {
00579 return precond;
00580 }
00581
00582 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00583 DDS::RETCODE_ERROR);
00584
00585 if (!has_readcondition(a_condition))
00586 {
00587 return DDS::RETCODE_PRECONDITION_NOT_MET;
00588 }
00589
00590 #ifndef OPENDDS_NO_QUERY_CONDITION
00591 DDS::QueryCondition_ptr query_condition =
00592 dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00593 #endif
00594
00595 return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00596 a_condition->get_sample_state_mask(),
00597 a_condition->get_view_state_mask(),
00598 a_condition->get_instance_state_mask(),
00599 #ifndef OPENDDS_NO_QUERY_CONDITION
00600 query_condition
00601 #else
00602 0
00603 #endif
00604 );
00605 }
00606
00607 virtual DDS::ReturnCode_t take_next_instance_w_condition (
00608 MessageSequenceType & received_data,
00609 DDS::SampleInfoSeq & info_seq,
00610 ::CORBA::Long max_samples,
00611 DDS::InstanceHandle_t a_handle,
00612 DDS::ReadCondition_ptr a_condition)
00613 {
00614 DDS::ReturnCode_t const precond =
00615 check_inputs("take_next_instance_w_condition", received_data, info_seq,
00616 max_samples);
00617 if (DDS::RETCODE_OK != precond)
00618 {
00619 return precond;
00620 }
00621
00622 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00623 DDS::RETCODE_ERROR);
00624
00625 if (!has_readcondition(a_condition))
00626 {
00627 return DDS::RETCODE_PRECONDITION_NOT_MET;
00628 }
00629
00630 #ifndef OPENDDS_NO_QUERY_CONDITION
00631 DDS::QueryCondition_ptr query_condition =
00632 dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
00633 #endif
00634
00635 return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00636 a_condition->get_sample_state_mask(),
00637 a_condition->get_view_state_mask(),
00638 a_condition->get_instance_state_mask(),
00639 #ifndef OPENDDS_NO_QUERY_CONDITION
00640 query_condition
00641 #else
00642 0
00643 #endif
00644 );
00645 }
00646
00647 virtual DDS::ReturnCode_t return_loan (
00648 MessageSequenceType & received_data,
00649 DDS::SampleInfoSeq & info_seq)
00650 {
00651
00652
00653 if (received_data.length() != info_seq.length())
00654 {
00655 return DDS::RETCODE_PRECONDITION_NOT_MET;
00656 }
00657
00658 if (received_data.release())
00659 {
00660
00661 return DDS::RETCODE_OK;
00662 }
00663 else
00664 {
00665 info_seq.length(0);
00666 received_data.length(0);
00667 }
00668 return DDS::RETCODE_OK;
00669 }
00670
00671 virtual DDS::ReturnCode_t get_key_value (
00672 MessageType & key_holder,
00673 DDS::InstanceHandle_t handle)
00674 {
00675 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00676 guard,
00677 this->sample_lock_,
00678 DDS::RETCODE_ERROR);
00679
00680 typename InstanceMap::iterator const the_end = instance_map_.end ();
00681 for (typename InstanceMap::iterator it = instance_map_.begin ();
00682 it != the_end;
00683 ++it)
00684 {
00685 if (it->second == handle)
00686 {
00687 key_holder = it->first;
00688 return DDS::RETCODE_OK;
00689 }
00690 }
00691
00692 return DDS::RETCODE_BAD_PARAMETER;
00693 }
00694
00695 virtual DDS::InstanceHandle_t lookup_instance (const MessageType & instance_data)
00696 {
00697 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00698
00699 if (it == instance_map_.end())
00700 {
00701 return DDS::HANDLE_NIL;
00702 }
00703 else
00704 {
00705 return it->second;
00706 }
00707 }
00708
00709 virtual DDS::ReturnCode_t auto_return_loan(void* seq)
00710 {
00711 MessageSequenceType& received_data =
00712 *static_cast< MessageSequenceType*> (seq);
00713
00714 if (!received_data.release())
00715 {
00716
00717 received_data.length(0);
00718 }
00719 return DDS::RETCODE_OK;
00720 }
00721
00722 void release_loan (MessageSequenceType & received_data)
00723 {
00724 received_data.length(0);
00725 }
00726
00727 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00728 bool contains_sample_filtered(DDS::SampleStateMask sample_states,
00729 DDS::ViewStateMask view_states,
00730 DDS::InstanceStateMask instance_states,
00731 const OpenDDS::DCPS::FilterEvaluator& evaluator,
00732 const DDS::StringSeq& params)
00733 {
00734 using namespace OpenDDS::DCPS;
00735 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
00736 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
00737
00738 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
00739 end = instances_.end(); iter != end; ++iter) {
00740 SubscriptionInstance& inst = *iter->second;
00741
00742 if ((inst.instance_state_.view_state() & view_states) &&
00743 (inst.instance_state_.instance_state() & instance_states)) {
00744 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
00745 item = item->next_data_sample_) {
00746 if (item->sample_state_ & sample_states
00747 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00748 && !item->coherent_change_
00749 #endif
00750 && item->registered_data_) {
00751 if (evaluator.eval(*static_cast< MessageType* >(item->registered_data_), params)) {
00752 return true;
00753 }
00754 }
00755 }
00756 }
00757 }
00758
00759 return false;
00760 }
00761
00762 DDS::ReturnCode_t read_generic(
00763 OpenDDS::DCPS::DataReaderImpl::GenericBundle& gen,
00764 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00765 DDS::InstanceStateMask instance_states,
00766 bool adjust_ref_count=false)
00767 {
00768
00769 MessageSequenceType data;
00770 DDS::ReturnCode_t rc;
00771 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00772 guard,
00773 this->sample_lock_,
00774 DDS::RETCODE_ERROR);
00775 {
00776 rc = read_i(data, gen.info_,
00777 DDS::LENGTH_UNLIMITED,
00778 sample_states, view_states, instance_states, 0);
00779 if (true == adjust_ref_count ) {
00780 data.increment_references();
00781 }
00782 }
00783 gen.samples_.reserve(data.length());
00784 for (CORBA::ULong i = 0; i < data.length(); ++i) {
00785 gen.samples_.push_back(&data[i]);
00786 }
00787 return rc;
00788
00789 }
00790
00791 DDS::InstanceHandle_t lookup_instance_generic(const void* data)
00792 {
00793 return lookup_instance(*static_cast<const MessageType*>(data));
00794 }
00795
00796 virtual DDS::ReturnCode_t take(
00797 OpenDDS::DCPS::AbstractSamples& samples,
00798 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00799 DDS::InstanceStateMask instance_states)
00800 {
00801
00802 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00803 guard,
00804 this->sample_lock_,
00805 DDS::RETCODE_ERROR);
00806
00807 MessageSequenceType data;
00808 DDS::SampleInfoSeq infos;
00809 DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
00810 sample_states, view_states, instance_states, 0);
00811
00812 samples.reserve(data.length());
00813
00814 for (CORBA::ULong i = 0; i < data.length(); ++i) {
00815 samples.push_back(infos[i], &data[i]);
00816 }
00817
00818 return rc;
00819 }
00820
00821 DDS::ReturnCode_t read_instance_generic(void*& data,
00822 DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
00823 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00824 DDS::InstanceStateMask instance_states)
00825 {
00826 MessageSequenceType dataseq;
00827 DDS::SampleInfoSeq infoseq;
00828 const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
00829 DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
00830 instance_states, 0);
00831 if (rc != DDS::RETCODE_NO_DATA)
00832 {
00833 const CORBA::ULong last = dataseq.length() - 1;
00834 data = new MessageType(dataseq[last]);
00835 info = infoseq[last];
00836 }
00837 return rc;
00838 }
00839
00840 DDS::ReturnCode_t read_next_instance_generic(void*& data,
00841 DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
00842 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00843 DDS::InstanceStateMask instance_states)
00844 {
00845 MessageSequenceType dataseq;
00846 DDS::SampleInfoSeq infoseq;
00847 const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
00848 DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
00849 instance_states, 0);
00850 if (rc != DDS::RETCODE_NO_DATA)
00851 {
00852 const CORBA::ULong last = dataseq.length() - 1;
00853 data = new MessageType(dataseq[last]);
00854 info = infoseq[last];
00855 }
00856 return rc;
00857 }
00858
00859 #endif
00860
00861 DDS::InstanceHandle_t store_synthetic_data(const MessageType& sample,
00862 DDS::ViewStateKind view)
00863 {
00864 using namespace OpenDDS::DCPS;
00865 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
00866 DDS::HANDLE_NIL);
00867
00868 #ifndef OPENDDS_NO_MULTI_TOPIC
00869 DDS::TopicDescription_var descr = get_topicdescription();
00870 if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
00871 if (!mt->filter(sample)) {
00872 return DDS::HANDLE_NIL;
00873 }
00874 }
00875 #endif
00876
00877 get_subscriber_servant()->data_received(this);
00878
00879 DDS::InstanceHandle_t inst = lookup_instance(sample);
00880 bool filtered = false;
00881 SubscriptionInstance_rch instance;
00882
00883
00884
00885
00886 for (int i = 0; i < 2; ++i) {
00887 if (i == 0 && inst != DDS::HANDLE_NIL) continue;
00888
00889 DataSampleHeader header;
00890 const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
00891 header.message_id_ = static_cast<char>(msg);
00892 bool just_registered;
00893 unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
00894 store_instance_data(move(data), header, instance, just_registered, filtered);
00895 if (instance) inst = instance->instance_handle_;
00896 }
00897
00898 if (!filtered) {
00899 if (view == DDS::NOT_NEW_VIEW_STATE) {
00900 if (instance) instance->instance_state_.accessed();
00901 }
00902 notify_read_conditions();
00903 }
00904 return inst;
00905 }
00906
00907 void set_instance_state(DDS::InstanceHandle_t instance,
00908 DDS::InstanceStateKind state)
00909 {
00910 using namespace OpenDDS::DCPS;
00911 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00912
00913 SubscriptionInstance_rch si = get_handle_instance(instance);
00914 if (si && state != DDS::ALIVE_INSTANCE_STATE) {
00915 DataSampleHeader header;
00916 const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
00917 ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE;
00918 header.message_id_ = static_cast<char>(msg);
00919 bool just_registered, filtered;
00920 unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
00921 get_key_value(*data, instance);
00922 store_instance_data(move(data), header, si, just_registered, filtered);
00923 if (!filtered)
00924 {
00925 notify_read_conditions();
00926 }
00927 }
00928 }
00929
00930 virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
00931 OpenDDS::DCPS::SubscriptionInstance_rch& instance)
00932 {
00933
00934
00935 MessageType data;
00936
00937 const bool cdr = sample.header_.cdr_encapsulation_;
00938
00939 OpenDDS::DCPS::Serializer ser(
00940 sample.sample_.get(),
00941 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
00942 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00943 : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00944
00945 if (cdr) {
00946 ACE_CDR::ULong header;
00947 if (!(ser >> header)) {
00948 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
00949 ACE_TEXT("deserialization header failed.\n"),
00950 TraitsType::type_name()));
00951 return;
00952 }
00953
00954 if (Serializer::use_rti_serialization()) {
00955
00956 ser.reset_alignment();
00957 }
00958 }
00959
00960 if (sample.header_.key_fields_only_) {
00961 ser >> OpenDDS::DCPS::KeyOnly< MessageType>(data);
00962 } else {
00963 ser >> data;
00964 }
00965
00966 if (!ser.good_bit()) {
00967 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
00968 ACE_TEXT("deserialization failed.\n"),
00969 TraitsType::type_name()));
00970 return;
00971 }
00972
00973 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
00974 typename InstanceMap::const_iterator const it = instance_map_.find(data);
00975 if (it != instance_map_.end()) {
00976 handle = it->second;
00977 }
00978
00979 if (handle == DDS::HANDLE_NIL) {
00980 instance.reset();
00981 } else {
00982 instance = get_handle_instance(handle);
00983 }
00984 }
00985
00986 virtual void qos_change(const DDS::DataReaderQos& qos)
00987 {
00988
00989 if (qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00990 if (qos.time_based_filter.minimum_separation != qos_.time_based_filter.minimum_separation) {
00991 const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
00992 if (qos_.time_based_filter.minimum_separation != zero) {
00993 if (qos.time_based_filter.minimum_separation != zero) {
00994 const ACE_Time_Value new_interval = duration_to_time_value(qos.time_based_filter.minimum_separation);
00995 filter_delayed_handler_->reset_interval(new_interval);
00996 } else {
00997 filter_delayed_handler_->cancel();
00998 }
00999 }
01000
01001 }
01002
01003 }
01004
01005 DataReaderImpl::qos_change(qos);
01006 }
01007
01008 protected:
01009
01010 virtual void dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample& sample,
01011 OpenDDS::DCPS::SubscriptionInstance_rch& instance,
01012 bool & just_registered,
01013 bool & filtered,
01014 OpenDDS::DCPS::MarshalingType marshaling_type)
01015 {
01016 unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
01017 const bool cdr = sample.header_.cdr_encapsulation_;
01018
01019 OpenDDS::DCPS::Serializer ser(
01020 sample.sample_.get(),
01021 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
01022 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR : OpenDDS::DCPS::Serializer::ALIGN_NONE);
01023
01024 if (cdr) {
01025 ACE_CDR::ULong header;
01026 if (!(ser >> header)) {
01027 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
01028 ACE_TEXT("deserialization header failed, dropping sample.\n"),
01029 TraitsType::type_name()));
01030 return;
01031 }
01032
01033 if (Serializer::use_rti_serialization()) {
01034
01035 ser.reset_alignment();
01036 }
01037 }
01038
01039 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
01040 ser >> OpenDDS::DCPS::KeyOnly< MessageType>(*data);
01041 } else {
01042 ser >> *data;
01043 }
01044
01045 if (!ser.good_bit()) {
01046 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
01047 ACE_TEXT("deserialization failed, dropping sample.\n"),
01048 TraitsType::type_name()));
01049 return;
01050 }
01051
01052 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01053 if (!sample.header_.content_filter_) {
01054 using OpenDDS::DCPS::ContentFilteredTopicImpl;
01055 if (content_filtered_topic_) {
01056 if (sample.header_.message_id_ == OpenDDS::DCPS::SAMPLE_DATA
01057 && !content_filtered_topic_->filter(static_cast<MessageType&>(*data))) {
01058 filtered = true;
01059 return;
01060 }
01061 }
01062 }
01063 #endif
01064
01065 store_instance_data(move(data), sample.header_, instance, just_registered, filtered);
01066 }
01067
01068 virtual void dispose_unregister(const OpenDDS::DCPS::ReceivedDataSample& sample,
01069 OpenDDS::DCPS::SubscriptionInstance_rch& instance)
01070 {
01071
01072
01073
01074
01075
01076
01077 bool just_registered = false;
01078 bool filtered = false;
01079 OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING;
01080 if (sample.header_.key_fields_only_) {
01081 marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING;
01082 }
01083 this->dds_demarshal(sample, instance, just_registered, filtered, marshaling);
01084 }
01085
01086 virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
01087 {
01088 filter_delayed_handler_->drop_sample(instance->instance_handle_);
01089
01090
01091 instance->instance_state_.cancel_release();
01092
01093 while (instance->rcvd_samples_.size_ > 0)
01094 {
01095 OpenDDS::DCPS::ReceivedDataElement* head =
01096 instance->rcvd_samples_.remove_head();
01097 head->dec_ref();
01098 }
01099 }
01100
01101 virtual void release_instance_i (DDS::InstanceHandle_t handle)
01102 {
01103 typename InstanceMap::iterator const the_end = instance_map_.end ();
01104 typename InstanceMap::iterator it = instance_map_.begin ();
01105 while (it != the_end)
01106 {
01107 if (it->second == handle)
01108 {
01109 typename InstanceMap::iterator curIt = it;
01110 ++ it;
01111 instance_map_.erase (curIt);
01112 }
01113 else
01114 ++ it;
01115 }
01116 }
01117
01118 private:
01119
01120 DDS::ReturnCode_t read_i (
01121 MessageSequenceType & received_data,
01122 DDS::SampleInfoSeq & info_seq,
01123 ::CORBA::Long max_samples,
01124 DDS::SampleStateMask sample_states,
01125 DDS::ViewStateMask view_states,
01126 DDS::InstanceStateMask instance_states,
01127 #ifndef OPENDDS_NO_QUERY_CONDITION
01128 DDS::QueryCondition_ptr a_condition)
01129 #else
01130 int ignored)
01131 #endif
01132 {
01133 #ifdef OPENDDS_NO_QUERY_CONDITION
01134 ACE_UNUSED_ARG(ignored);
01135 #endif
01136
01137 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01138
01139 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01140 if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01141 && ! this->coherent_) {
01142 return DDS::RETCODE_PRECONDITION_NOT_MET;
01143 }
01144
01145 bool group_coherent_ordered
01146 = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01147 && this->subqos_.presentation.coherent_access
01148 && this->subqos_.presentation.ordered_access;
01149
01150 if (group_coherent_ordered && this->coherent_) {
01151 max_samples = 1;
01152 }
01153 #endif
01154
01155 OpenDDS::DCPS::RakeResults< MessageSequenceType >
01156 results(this, received_data, info_seq, max_samples,
01157 this->subqos_.presentation,
01158 #ifndef OPENDDS_NO_QUERY_CONDITION
01159 a_condition,
01160 #endif
01161 OpenDDS::DCPS::DDS_OPERATION_READ);
01162
01163 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01164 if (! group_coherent_ordered) {
01165 #endif
01166 for (typename InstanceMap::iterator it = instance_map_.begin(),
01167 the_end = instance_map_.end(); it != the_end; ++it)
01168 {
01169 DDS::InstanceHandle_t handle = it->second;
01170
01171 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle);
01172
01173 if ((inst->instance_state_.view_state() & view_states) &&
01174 (inst->instance_state_.instance_state() & instance_states))
01175 {
01176 size_t i(0);
01177 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01178 item != 0; item = item->next_data_sample_)
01179 {
01180 if (item->sample_state_ & sample_states
01181 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01182 && !item->coherent_change_
01183 #endif
01184 )
01185 {
01186 results.insert_sample(item, inst, ++i);
01187 }
01188 }
01189 }
01190 }
01191 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01192 }
01193 else {
01194 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01195 results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01196 }
01197 #endif
01198
01199 results.copy_to_user();
01200
01201 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01202 if (received_data.length())
01203 {
01204 ret = DDS::RETCODE_OK;
01205 if (received_data.maximum() == 0)
01206 {
01207 received_data_p.set_loaner(this);
01208 }
01209 }
01210
01211 post_read_or_take();
01212
01213 return ret;
01214 }
01215
01216 DDS::ReturnCode_t take_i (
01217 MessageSequenceType & received_data,
01218 DDS::SampleInfoSeq & info_seq,
01219 ::CORBA::Long max_samples,
01220 DDS::SampleStateMask sample_states,
01221 DDS::ViewStateMask view_states,
01222 DDS::InstanceStateMask instance_states,
01223 #ifndef OPENDDS_NO_QUERY_CONDITION
01224 DDS::QueryCondition_ptr a_condition)
01225 #else
01226 int ignored)
01227 #endif
01228 {
01229 #ifdef OPENDDS_NO_QUERY_CONDITION
01230 ACE_UNUSED_ARG(ignored);
01231 #endif
01232
01233 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01234
01235 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01236 if (this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01237 && ! this->coherent_) {
01238 return DDS::RETCODE_PRECONDITION_NOT_MET;
01239 }
01240
01241 bool group_coherent_ordered
01242 = this->subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
01243 && this->subqos_.presentation.coherent_access
01244 && this->subqos_.presentation.ordered_access;
01245
01246 if (group_coherent_ordered && this->coherent_) {
01247 max_samples = 1;
01248 }
01249 #endif
01250
01251 OpenDDS::DCPS::RakeResults< MessageSequenceType >
01252 results(this, received_data, info_seq, max_samples,
01253 this->subqos_.presentation,
01254 #ifndef OPENDDS_NO_QUERY_CONDITION
01255 a_condition,
01256 #endif
01257 OpenDDS::DCPS::DDS_OPERATION_TAKE);
01258
01259 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01260 if (! group_coherent_ordered) {
01261 #endif
01262
01263 for (typename InstanceMap::iterator it = instance_map_.begin(),
01264 the_end = instance_map_.end(); it != the_end; ++it)
01265 {
01266 DDS::InstanceHandle_t handle = it->second;
01267
01268 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(handle);
01269
01270 if ((inst->instance_state_.view_state() & view_states) &&
01271 (inst->instance_state_.instance_state() & instance_states))
01272 {
01273 size_t i(0);
01274 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01275 item != 0; item = item->next_data_sample_)
01276 {
01277 if (item->sample_state_ & sample_states
01278 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01279 && !item->coherent_change_
01280 #endif
01281 )
01282 {
01283 results.insert_sample(item, inst, ++i);
01284 }
01285 }
01286 }
01287 }
01288 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01289 }
01290 else {
01291 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01292 results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01293 }
01294 #endif
01295
01296 results.copy_to_user();
01297
01298 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01299 if (received_data.length())
01300 {
01301 ret = DDS::RETCODE_OK;
01302 if (received_data.maximum() == 0)
01303 {
01304 received_data_p.set_loaner(this);
01305 }
01306 }
01307
01308 post_read_or_take();
01309 return ret;
01310 }
01311
01312 DDS::ReturnCode_t read_instance_i (
01313 MessageSequenceType & received_data,
01314 DDS::SampleInfoSeq & info_seq,
01315 ::CORBA::Long max_samples,
01316 DDS::InstanceHandle_t a_handle,
01317 DDS::SampleStateMask sample_states,
01318 DDS::ViewStateMask view_states,
01319 DDS::InstanceStateMask instance_states,
01320 #ifndef OPENDDS_NO_QUERY_CONDITION
01321 DDS::QueryCondition_ptr a_condition)
01322 #else
01323 int ignored)
01324 #endif
01325 {
01326 #ifdef OPENDDS_NO_QUERY_CONDITION
01327 ACE_UNUSED_ARG(ignored);
01328 #endif
01329
01330 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01331
01332 OpenDDS::DCPS::RakeResults< MessageSequenceType >
01333 results(this, received_data, info_seq, max_samples,
01334 this->subqos_.presentation,
01335 #ifndef OPENDDS_NO_QUERY_CONDITION
01336 a_condition,
01337 #endif
01338 OpenDDS::DCPS::DDS_OPERATION_READ);
01339
01340 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle);
01341 if (!inst) return DDS::RETCODE_BAD_PARAMETER;
01342
01343 InstanceState& state_obj = inst->instance_state_;
01344 bool valid_view_state = state_obj.view_state() & view_states;
01345 bool valid_instance_state = state_obj.instance_state() & instance_states;
01346 if (valid_view_state && valid_instance_state)
01347 {
01348 size_t i(0);
01349 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01350 item; item = item->next_data_sample_)
01351 {
01352 if (item->sample_state_ & sample_states
01353 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01354 && !item->coherent_change_
01355 #endif
01356 )
01357 {
01358 results.insert_sample(item, inst, ++i);
01359 }
01360 }
01361 }
01362 else
01363 {
01364 if (OpenDDS::DCPS::DCPS_debug_level >= 8) {
01365 OPENDDS_STRING msg;
01366 if (!valid_view_state) {
01367 msg += "view state is not valid";
01368 if (!valid_instance_state) {
01369 msg += " and ";
01370 }
01371 }
01372 if (!valid_instance_state) {
01373 msg = msg
01374 + "instance state is "
01375 + state_obj.instance_state_string()
01376 + " while the validity mask is "
01377 + InstanceState::instance_state_string(instance_states);
01378 }
01379 GuidConverter conv(get_subscription_id());
01380 ACE_DEBUG((LM_DEBUG,
01381 ACE_TEXT(
01382 "(%P|%t) DataReaderImpl_T::read_instance_i: "
01383 "will return no data reading sub %C because:\n %C\n"
01384 ),
01385 OPENDDS_STRING(conv).c_str(), msg.c_str()
01386 ));
01387 }
01388 }
01389
01390 results.copy_to_user();
01391
01392 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01393 if (received_data.length())
01394 {
01395 ret = DDS::RETCODE_OK;
01396 if (received_data.maximum() == 0)
01397 {
01398 received_data_p.set_loaner(this);
01399 }
01400 }
01401
01402 post_read_or_take();
01403 return ret;
01404 }
01405
01406 DDS::ReturnCode_t take_instance_i (
01407 MessageSequenceType & received_data,
01408 DDS::SampleInfoSeq & info_seq,
01409 ::CORBA::Long max_samples,
01410 DDS::InstanceHandle_t a_handle,
01411 DDS::SampleStateMask sample_states,
01412 DDS::ViewStateMask view_states,
01413 DDS::InstanceStateMask instance_states,
01414 #ifndef OPENDDS_NO_QUERY_CONDITION
01415 DDS::QueryCondition_ptr a_condition)
01416 #else
01417 int ignored)
01418 #endif
01419 {
01420 #ifdef OPENDDS_NO_QUERY_CONDITION
01421 ACE_UNUSED_ARG(ignored);
01422 #endif
01423
01424 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01425
01426 OpenDDS::DCPS::RakeResults< MessageSequenceType >
01427 results(this, received_data, info_seq, max_samples,
01428 this->subqos_.presentation,
01429 #ifndef OPENDDS_NO_QUERY_CONDITION
01430 a_condition,
01431 #endif
01432 OpenDDS::DCPS::DDS_OPERATION_TAKE);
01433
01434 OpenDDS::DCPS::SubscriptionInstance_rch inst = get_handle_instance(a_handle);
01435 if (!inst) return DDS::RETCODE_BAD_PARAMETER;
01436
01437 if ((inst->instance_state_.view_state() & view_states) &&
01438 (inst->instance_state_.instance_state() & instance_states))
01439 {
01440 size_t i(0);
01441 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01442 item; item = item->next_data_sample_)
01443 {
01444 if (item->sample_state_ & sample_states
01445 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01446 && !item->coherent_change_
01447 #endif
01448 )
01449 {
01450 results.insert_sample(item, inst, ++i);
01451 }
01452 }
01453 }
01454
01455 results.copy_to_user();
01456
01457 DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
01458 if (received_data.length())
01459 {
01460 ret = DDS::RETCODE_OK;
01461 if (received_data.maximum() == 0)
01462 {
01463 received_data_p.set_loaner(this);
01464 }
01465 }
01466
01467 post_read_or_take();
01468 return ret;
01469 }
01470
01471 DDS::ReturnCode_t read_next_instance_i (
01472 MessageSequenceType & received_data,
01473 DDS::SampleInfoSeq & info_seq,
01474 ::CORBA::Long max_samples,
01475 DDS::InstanceHandle_t a_handle,
01476 DDS::SampleStateMask sample_states,
01477 DDS::ViewStateMask view_states,
01478 DDS::InstanceStateMask instance_states,
01479 #ifndef OPENDDS_NO_QUERY_CONDITION
01480 DDS::QueryCondition_ptr a_condition)
01481 #else
01482 int ignored)
01483 #endif
01484 {
01485 #ifdef OPENDDS_NO_QUERY_CONDITION
01486 ACE_UNUSED_ARG(ignored);
01487 #endif
01488
01489 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01490
01491 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01492 guard,
01493 this->sample_lock_,
01494 DDS::RETCODE_ERROR);
01495
01496 typename InstanceMap::iterator it;
01497 typename InstanceMap::iterator const the_end = instance_map_.end ();
01498
01499 if (a_handle == DDS::HANDLE_NIL)
01500 {
01501 it = instance_map_.begin ();
01502 }
01503 else
01504 {
01505 for (it = instance_map_.begin ();
01506 it != the_end;
01507 ++it)
01508 {
01509 if (a_handle == it->second)
01510 {
01511 ++it;
01512 break;
01513 }
01514 }
01515 }
01516
01517 for (; it != the_end; ++it)
01518 {
01519 handle = it->second;
01520 DDS::ReturnCode_t const status =
01521 read_instance_i(received_data, info_seq, max_samples, handle,
01522 sample_states, view_states, instance_states,
01523 #ifndef OPENDDS_NO_QUERY_CONDITION
01524 a_condition);
01525 #else
01526 0);
01527 #endif
01528 if (status != DDS::RETCODE_NO_DATA)
01529 {
01530 post_read_or_take();
01531 return status;
01532 }
01533 }
01534
01535 post_read_or_take();
01536 return DDS::RETCODE_NO_DATA;
01537 }
01538
01539 DDS::ReturnCode_t take_next_instance_i (
01540 MessageSequenceType & received_data,
01541 DDS::SampleInfoSeq & info_seq,
01542 ::CORBA::Long max_samples,
01543 DDS::InstanceHandle_t a_handle,
01544 DDS::SampleStateMask sample_states,
01545 DDS::ViewStateMask view_states,
01546 DDS::InstanceStateMask instance_states,
01547 #ifndef OPENDDS_NO_QUERY_CONDITION
01548 DDS::QueryCondition_ptr a_condition)
01549 #else
01550 int ignored)
01551 #endif
01552 {
01553 #ifdef OPENDDS_NO_QUERY_CONDITION
01554 ACE_UNUSED_ARG(ignored);
01555 #endif
01556
01557 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01558
01559 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01560 guard,
01561 this->sample_lock_,
01562 DDS::RETCODE_ERROR);
01563
01564 typename InstanceMap::iterator it;
01565 typename InstanceMap::iterator const the_end = instance_map_.end ();
01566
01567 if (a_handle == DDS::HANDLE_NIL)
01568 {
01569 it = instance_map_.begin ();
01570 }
01571 else
01572 {
01573 for (it = instance_map_.begin (); it != the_end; ++it)
01574 {
01575 if (a_handle == it->second)
01576 {
01577 ++it;
01578 break;
01579 }
01580 }
01581 }
01582
01583 for (; it != the_end; ++it)
01584 {
01585 handle = it->second;
01586 DDS::ReturnCode_t const status =
01587 take_instance_i(received_data, info_seq, max_samples, handle,
01588 sample_states, view_states, instance_states,
01589 #ifndef OPENDDS_NO_QUERY_CONDITION
01590 a_condition);
01591 #else
01592 0);
01593 #endif
01594 if (status != DDS::RETCODE_NO_DATA)
01595 {
01596 total_samples();
01597 post_read_or_take();
01598 return status;
01599 }
01600 }
01601 post_read_or_take();
01602 return DDS::RETCODE_NO_DATA;
01603 }
01604
01605 void store_instance_data(
01606 unique_ptr<MessageTypeWithAllocator> instance_data,
01607 const OpenDDS::DCPS::DataSampleHeader& header,
01608 OpenDDS::DCPS::SubscriptionInstance_rch& instance_ptr,
01609 bool & just_registered,
01610 bool & filtered)
01611 {
01612 const bool is_dispose_msg =
01613 header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
01614 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01615 const bool is_unregister_msg =
01616 header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE ||
01617 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01618
01619
01620 filtered = false;
01621
01622 DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
01623
01624
01625
01626
01627 typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
01628
01629 if ((is_dispose_msg || is_unregister_msg) && it == instance_map_.end())
01630 {
01631 return;
01632 }
01633
01634
01635 if (it == instance_map_.end())
01636 {
01637 std::size_t instances_size = 0;
01638 {
01639 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01640 instances_size = instances_.size();
01641 }
01642 if ((this->qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) &&
01643 ((::CORBA::Long) instances_size >= this->qos_.resource_limits.max_instances))
01644 {
01645 DDS::DataReaderListener_var listener
01646 = listener_for (DDS::SAMPLE_REJECTED_STATUS);
01647
01648 set_status_changed_flag (DDS::SAMPLE_REJECTED_STATUS, true);
01649
01650 sample_rejected_status_.last_reason = DDS::REJECTED_BY_INSTANCES_LIMIT;
01651 ++sample_rejected_status_.total_count;
01652 ++sample_rejected_status_.total_count_change;
01653 sample_rejected_status_.last_instance_handle = handle;
01654
01655 if (!CORBA::is_nil(listener.in()))
01656 {
01657 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01658
01659 listener->on_sample_rejected(this, sample_rejected_status_);
01660 sample_rejected_status_.total_count_change = 0;
01661 }
01662 notify_status_condition_no_sample_lock();
01663
01664 return;
01665 }
01666
01667 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01668 SharedInstanceMap_rch inst;
01669 bool new_handle = true;
01670 if (this->is_exclusive_ownership_) {
01671 OwnershipManagerPtr owner_manager = this->ownership_manager();
01672
01673 if (!owner_manager || owner_manager->instance_lock_acquire () != 0) {
01674 ACE_ERROR ((LM_ERROR,
01675 ACE_TEXT("(%P|%t) ")
01676 ACE_TEXT("%CDataReaderImpl::")
01677 ACE_TEXT("store_instance_data, ")
01678 ACE_TEXT("acquire instance_lock failed. \n"), TraitsType::type_name()));
01679 return;
01680 }
01681
01682 inst = dynamic_rchandle_cast<SharedInstanceMap>(
01683 owner_manager->get_instance_map(this->topic_servant_->type_name(), this));
01684 if (inst != 0) {
01685 typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
01686 if (iter != inst->end ()) {
01687 handle = iter->second;
01688 new_handle = false;
01689 }
01690 }
01691 }
01692 #endif
01693
01694 just_registered = true;
01695 DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
01696 handle = handle == DDS::HANDLE_NIL ? this->get_next_handle( key) : handle;
01697 OpenDDS::DCPS::SubscriptionInstance_rch instance =
01698 OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
01699 this,
01700 this->qos_,
01701 ref(this->instances_lock_),
01702 handle);
01703
01704 instance->instance_handle_ = handle;
01705
01706 {
01707 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
01708 int ret = OpenDDS::DCPS::bind(instances_, handle, instance);
01709
01710 if (ret != 0)
01711 {
01712 ACE_ERROR ((LM_ERROR,
01713 ACE_TEXT("(%P|%t) ")
01714 ACE_TEXT("%CDataReaderImpl::")
01715 ACE_TEXT("store_instance_data, ")
01716 ACE_TEXT("insert handle failed. \n"), TraitsType::type_name()));
01717 return;
01718 }
01719 }
01720
01721 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01722 OwnershipManagerPtr owner_manager = this->ownership_manager();
01723
01724 if (owner_manager) {
01725 if (!inst) {
01726 inst = make_rch<SharedInstanceMap>();
01727 owner_manager->set_instance_map(
01728 this->topic_servant_->type_name(),
01729 inst,
01730 this);
01731 }
01732
01733 if (new_handle) {
01734 std::pair<typename InstanceMap::iterator, bool> bpair =
01735 inst->insert(typename InstanceMap::value_type(*instance_data,
01736 handle));
01737 if (bpair.second == false)
01738 {
01739 ACE_ERROR ((LM_ERROR,
01740 ACE_TEXT("(%P|%t) ")
01741 ACE_TEXT("%CDataReaderImpl::")
01742 ACE_TEXT("store_instance_data, ")
01743 ACE_TEXT("insert to participant scope %C failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01744 return;
01745 }
01746 }
01747
01748 if (owner_manager->instance_lock_release () != 0) {
01749 ACE_ERROR ((LM_ERROR,
01750 ACE_TEXT("(%P|%t) ")
01751 ACE_TEXT("%CDataReaderImpl::")
01752 ACE_TEXT("store_instance_data, ")
01753 ACE_TEXT("release instance_lock failed. \n"), TraitsType::type_name()));
01754 return;
01755 }
01756 }
01757 #endif
01758
01759 std::pair<typename InstanceMap::iterator, bool> bpair =
01760 instance_map_.insert(typename InstanceMap::value_type(*instance_data,
01761 handle));
01762 if (bpair.second == false)
01763 {
01764 ACE_ERROR ((LM_ERROR,
01765 ACE_TEXT("(%P|%t) ")
01766 ACE_TEXT("%CDataReaderImpl::")
01767 ACE_TEXT("store_instance_data, ")
01768 ACE_TEXT("insert %C failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01769 return;
01770 }
01771 }
01772 else
01773 {
01774 just_registered = false;
01775 handle = it->second;
01776 }
01777
01778 if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION)
01779 {
01780 instance_ptr = get_handle_instance(handle);
01781
01782 if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA)
01783 {
01784 filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
01785
01786 ACE_Time_Value filter_time_expired;
01787 if (!filtered &&
01788 time_based_filter_instance(instance_ptr, filter_time_expired)) {
01789 filtered = true;
01790 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01791 filter_delayed_handler_->delay_sample(handle, move(instance_data), header, just_registered, filter_time_expired);
01792
01793 }
01794 } else {
01795
01796 filter_delayed_handler_->clear_sample(handle);
01797
01798 }
01799
01800 if (filtered)
01801 {
01802 return;
01803 }
01804 }
01805
01806 finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
01807 }
01808 else
01809 {
01810 instance_ptr = this->get_handle_instance(handle);
01811 instance_ptr->instance_state_.lively(header.publication_id_);
01812 }
01813 }
01814
01815 void finish_store_instance_data(unique_ptr<MessageTypeWithAllocator> instance_data, const DataSampleHeader& header,
01816 SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg )
01817 {
01818 if ((this->qos_.resource_limits.max_samples_per_instance !=
01819 DDS::LENGTH_UNLIMITED) &&
01820 (instance_ptr->rcvd_samples_.size_ >=
01821 this->qos_.resource_limits.max_samples_per_instance)) {
01822
01823
01824
01825
01826
01827
01828 if (!is_dispose_msg && !is_unregister_msg
01829 && instance_ptr->rcvd_samples_.head_->sample_state_
01830 == DDS::NOT_READ_SAMPLE_STATE)
01831 {
01832
01833
01834
01835
01836 DDS::DataReaderListener_var listener
01837 = listener_for(DDS::SAMPLE_REJECTED_STATUS);
01838
01839 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
01840
01841 sample_rejected_status_.last_reason =
01842 DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT;
01843 ++sample_rejected_status_.total_count;
01844 ++sample_rejected_status_.total_count_change;
01845 sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
01846
01847 if (!CORBA::is_nil(listener.in()))
01848 {
01849 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01850
01851 listener->on_sample_rejected(this, sample_rejected_status_);
01852 sample_rejected_status_.total_count_change = 0;
01853 }
01854 notify_status_condition_no_sample_lock();
01855 return;
01856 }
01857 else if (!is_dispose_msg && !is_unregister_msg)
01858 {
01859
01860 OpenDDS::DCPS::ReceivedDataElement *item =
01861 instance_ptr->rcvd_samples_.head_;
01862 instance_ptr->rcvd_samples_.remove(item);
01863 item->dec_ref();
01864 }
01865 }
01866 else if (this->qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED)
01867 {
01868 CORBA::Long total_samples = 0;
01869 {
01870 ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01871 for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
01872 iter != instances_.end();
01873 ++iter) {
01874 OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
01875
01876 total_samples += (CORBA::Long) ptr->rcvd_samples_.size_;
01877 }
01878 }
01879
01880 if (total_samples >= this->qos_.resource_limits.max_samples)
01881 {
01882
01883
01884
01885
01886
01887 if (!is_dispose_msg && !is_unregister_msg
01888 && instance_ptr->rcvd_samples_.head_->sample_state_
01889 == DDS::NOT_READ_SAMPLE_STATE)
01890 {
01891
01892
01893
01894
01895 DDS::DataReaderListener_var listener
01896 = listener_for(DDS::SAMPLE_REJECTED_STATUS);
01897
01898 set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
01899
01900 sample_rejected_status_.last_reason =
01901 DDS::REJECTED_BY_SAMPLES_LIMIT;
01902 ++sample_rejected_status_.total_count;
01903 ++sample_rejected_status_.total_count_change;
01904 sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
01905 if (!CORBA::is_nil(listener.in()))
01906 {
01907 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01908
01909 listener->on_sample_rejected(this, sample_rejected_status_);
01910 sample_rejected_status_.total_count_change = 0;
01911 }
01912 notify_status_condition_no_sample_lock();
01913
01914 return;
01915 }
01916 else if (!is_dispose_msg && !is_unregister_msg)
01917 {
01918
01919 OpenDDS::DCPS::ReceivedDataElement *item =
01920 instance_ptr->rcvd_samples_.head_;
01921 instance_ptr->rcvd_samples_.remove(item);
01922 item->dec_ref();
01923 }
01924 }
01925 }
01926
01927 if (is_dispose_msg || is_unregister_msg)
01928 {
01929 instance_data.reset();
01930 }
01931
01932 bool event_notify = false;
01933
01934 if (is_dispose_msg) {
01935 event_notify = instance_ptr->instance_state_.dispose_was_received(header.publication_id_);
01936 }
01937
01938 if (is_unregister_msg) {
01939 if (instance_ptr->instance_state_.unregister_was_received(header.publication_id_)) {
01940 event_notify = true;
01941 }
01942 }
01943
01944 if (!is_dispose_msg && !is_unregister_msg) {
01945 event_notify = true;
01946 instance_ptr->instance_state_.data_was_received(header.publication_id_);
01947 }
01948
01949 if (!event_notify) {
01950 return;
01951 }
01952
01953 OpenDDS::DCPS::ReceivedDataElement *ptr =
01954 new (*rd_allocator_.get()) OpenDDS::DCPS::ReceivedDataElementWithType<MessageTypeWithAllocator>(header,instance_data.release(), &this->sample_lock_);
01955
01956 ptr->disposed_generation_count_ =
01957 instance_ptr->instance_state_.disposed_generation_count();
01958 ptr->no_writers_generation_count_ =
01959 instance_ptr->instance_state_.no_writers_generation_count();
01960
01961 instance_ptr->last_sequence_ = header.sequence_;
01962
01963 instance_ptr->rcvd_strategy_->add(ptr);
01964
01965 if (! is_dispose_msg && ! is_unregister_msg
01966 && instance_ptr->rcvd_samples_.size_ > get_depth())
01967 {
01968 OpenDDS::DCPS::ReceivedDataElement* head_ptr =
01969 instance_ptr->rcvd_samples_.head_;
01970
01971 instance_ptr->rcvd_samples_.remove(head_ptr);
01972
01973 if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
01974 {
01975 DDS::DataReaderListener_var listener
01976 = listener_for (DDS::SAMPLE_LOST_STATUS);
01977
01978 ++sample_lost_status_.total_count;
01979 ++sample_lost_status_.total_count_change;
01980
01981 set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, true);
01982
01983 if (!CORBA::is_nil(listener.in()))
01984 {
01985 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01986
01987 listener->on_sample_lost(this, sample_lost_status_);
01988
01989 sample_lost_status_.total_count_change = 0;
01990 }
01991
01992 notify_status_condition_no_sample_lock();
01993 }
01994
01995 head_ptr->dec_ref();
01996 }
01997
01998 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01999 if (! ptr->coherent_change_) {
02000 #endif
02001 RcHandle<OpenDDS::DCPS::SubscriberImpl> sub = get_subscriber_servant ();
02002 if (!sub)
02003 return;
02004
02005 sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, true);
02006
02007 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, true);
02008
02009 DDS::SubscriberListener_var sub_listener =
02010 sub->listener_for(DDS::DATA_ON_READERS_STATUS);
02011 if (!CORBA::is_nil(sub_listener.in()) && !this->coherent_)
02012 {
02013 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02014
02015 sub_listener->on_data_on_readers(sub.in());
02016 sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
02017 }
02018 else
02019 {
02020 sub->notify_status_condition();
02021
02022 DDS::DataReaderListener_var listener =
02023 listener_for (DDS::DATA_AVAILABLE_STATUS);
02024
02025 if (!CORBA::is_nil(listener.in()))
02026 {
02027 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02028
02029 listener->on_data_available(this);
02030 set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
02031 sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
02032 }
02033 else
02034 {
02035 notify_status_condition_no_sample_lock();
02036 }
02037 }
02038 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02039 }
02040 #endif
02041 }
02042
02043
02044
02045
02046 void notify_status_condition_no_sample_lock()
02047 {
02048
02049
02050
02051
02052
02053
02054
02055
02056
02057
02058
02059
02060
02061 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
02062 notify_status_condition();
02063 }
02064
02065
02066
02067 DDS::ReturnCode_t check_inputs (
02068 const char* method_name,
02069 MessageSequenceType & received_data,
02070 DDS::SampleInfoSeq & info_seq,
02071 ::CORBA::Long max_samples)
02072 {
02073 typename MessageSequenceType::PrivateMemberAccess received_data_p (received_data);
02074
02075
02076
02077
02078
02079
02080
02081
02082
02083
02084 if (received_data.length() != info_seq.length())
02085 {
02086 ACE_DEBUG((LM_DEBUG,
02087 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02088 ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
02089 ACE_TEXT("sequences do not match.\n"),
02090 TraitsType::type_name(),
02091 method_name ));
02092 return DDS::RETCODE_PRECONDITION_NOT_MET;
02093 }
02094
02095
02096 if ((received_data.maximum() > 0) && (received_data.release() == false))
02097 {
02098 ACE_DEBUG((LM_DEBUG,
02099 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02100 ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
02101 ACE_TEXT("maximum %d and owns %d\n"),
02102 TraitsType::type_name(),
02103 method_name,
02104 received_data.maximum(),
02105 received_data.release() ));
02106
02107 return DDS::RETCODE_PRECONDITION_NOT_MET;
02108 }
02109
02110 if (received_data.maximum() == 0)
02111 {
02112
02113 if (max_samples == DDS::LENGTH_UNLIMITED)
02114 {
02115 max_samples =
02116 static_cast< ::CORBA::Long> (received_data_p.max_slots());
02117 }
02118 }
02119 else
02120 {
02121 if (max_samples == DDS::LENGTH_UNLIMITED)
02122 {
02123
02124 max_samples = received_data.maximum();
02125 }
02126 else if (
02127 max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
02128 {
02129
02130 ACE_DEBUG((LM_DEBUG,
02131 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
02132 ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
02133 TraitsType::type_name(),
02134 method_name,
02135 max_samples,
02136 received_data.maximum()));
02137 return DDS::RETCODE_PRECONDITION_NOT_MET;
02138 }
02139
02140
02141 }
02142
02143
02144
02145 if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
02146 {
02147 max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
02148 }
02149
02150
02151 return DDS::RETCODE_OK;
02152 }
02153
02154 class FilterDelayedHandler : public Watchdog {
02155 public:
02156 FilterDelayedHandler(DataReaderImpl_T<MessageType>& data_reader_impl)
02157
02158 : Watchdog(ACE_Time_Value(0))
02159 , data_reader_impl_(data_reader_impl)
02160 {
02161 }
02162
02163 virtual ~FilterDelayedHandler()
02164 {
02165 }
02166
02167 void cancel()
02168 {
02169 cancel_all();
02170 cleanup();
02171 }
02172
02173 void delay_sample(DDS::InstanceHandle_t handle,
02174 unique_ptr<MessageTypeWithAllocator> data,
02175 const OpenDDS::DCPS::DataSampleHeader& header,
02176 const bool just_registered,
02177 const ACE_Time_Value& filter_time_expired)
02178 {
02179
02180 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02181
02182 if (!data_reader_impl) {
02183 return;
02184 }
02185
02186 MessageTypeWithAllocator* instance_data = data.get();
02187
02188 DataSampleHeader_ptr hdr(new OpenDDS::DCPS::DataSampleHeader(header));
02189
02190 typename FilterDelayedSampleMap::iterator i = map_.find(handle);
02191 if (i == map_.end()) {
02192
02193
02194
02195 std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
02196 #ifdef ACE_HAS_CPP11
02197 map_.emplace(std::piecewise_construct,
02198 std::forward_as_tuple(handle),
02199 std::forward_as_tuple(move(data), hdr, just_registered));
02200 #else
02201 map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
02202 #endif
02203 FilterDelayedSample& sample = result.first->second;
02204
02205 const ACE_Time_Value interval = duration_to_time_value(
02206 data_reader_impl->qos_.time_based_filter.minimum_separation);
02207
02208 const ACE_Time_Value filter_time_remaining = duration_to_time_value(
02209 data_reader_impl->qos_.time_based_filter.minimum_separation) - filter_time_expired;
02210
02211 long timer_id = -1;
02212
02213 {
02214 ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_);
02215 timer_id = schedule_timer(reinterpret_cast<const void*>(intptr_t(handle)),
02216 filter_time_remaining, interval);
02217 }
02218
02219
02220 if (instance_data == sample.message.get()) {
02221 sample.timer_id = timer_id;
02222 }
02223 } else {
02224 FilterDelayedSample& sample = i->second;
02225
02226
02227 sample.message = move(data);
02228 sample.header = hdr;
02229 sample.new_instance = just_registered;
02230
02231 }
02232 }
02233
02234 void clear_sample(DDS::InstanceHandle_t handle)
02235 {
02236
02237
02238 typename FilterDelayedSampleMap::iterator sample = map_.find(handle);
02239 if (sample != map_.end()) {
02240
02241 sample->second.message.reset();
02242 }
02243 }
02244
02245 void drop_sample(DDS::InstanceHandle_t handle)
02246 {
02247
02248
02249 typename FilterDelayedSampleMap::iterator sample = map_.find(handle);
02250 if (sample != map_.end()) {
02251 {
02252 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02253 if (data_reader_impl) {
02254 ACE_GUARD(Reverse_Lock_t, unlock_guard, data_reader_impl->reverse_sample_lock_);
02255 cancel_timer(sample->second.timer_id);
02256 }
02257 }
02258
02259
02260 map_.erase(handle);
02261 }
02262 }
02263
02264 private:
02265
02266
02267
02268 int handle_timeout(const ACE_Time_Value&, const void* act)
02269 {
02270 DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act));
02271
02272 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02273 if (!data_reader_impl)
02274 return -1;
02275
02276 SubscriptionInstance_rch instance = data_reader_impl->get_handle_instance(handle);
02277
02278 if (!instance)
02279 return 0;
02280
02281 long cancel_timer_id = -1;
02282
02283 {
02284 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_, -1);
02285
02286 typename FilterDelayedSampleMap::iterator data = map_.find(handle);
02287 if (data == map_.end()) {
02288 return 0;
02289 }
02290
02291 if (data->second.message) {
02292 const bool NOT_DISPOSE_MSG = false;
02293 const bool NOT_UNREGISTER_MSG = false;
02294
02295
02296 instance->last_accepted_ = ACE_OS::gettimeofday();
02297 const DataSampleHeader_ptr header = data->second.header;
02298 const bool new_instance = data->second.new_instance;
02299
02300
02301 data_reader_impl->finish_store_instance_data(
02302 move(data->second.message),
02303 *header,
02304 instance,
02305 NOT_DISPOSE_MSG,
02306 NOT_UNREGISTER_MSG);
02307
02308 data_reader_impl->accept_sample_processing(instance, *header, new_instance);
02309 } else {
02310
02311
02312 const ACE_Time_Value interval = duration_to_time_value(data_reader_impl->qos_.time_based_filter.minimum_separation);
02313 if (ACE_OS::gettimeofday() - instance->last_sample_tv_ >= interval) {
02314
02315 cancel_timer_id = data->second.timer_id;
02316
02317 map_.erase(data);
02318 }
02319 }
02320 }
02321
02322 if (cancel_timer_id != -1) {
02323 cancel_timer(cancel_timer_id);
02324 }
02325 return 0;
02326 }
02327
02328 virtual void reschedule_deadline()
02329 {
02330 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02331
02332 if (data_reader_impl) {
02333 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_);
02334
02335 for (typename FilterDelayedSampleMap::iterator sample = map_.begin(); sample != map_.end(); ++sample) {
02336 reset_timer_interval(sample->second.timer_id);
02337 }
02338 }
02339 }
02340
02341 void cleanup()
02342 {
02343 RcHandle<DataReaderImpl_T<MessageType> > data_reader_impl(data_reader_impl_.lock());
02344 if (data_reader_impl) {
02345 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, data_reader_impl->sample_lock_);
02346
02347 map_.clear();
02348 }
02349 }
02350
02351 WeakRcHandle<DataReaderImpl_T<MessageType> > data_reader_impl_;
02352
02353 typedef ACE_Strong_Bound_Ptr<const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex> DataSampleHeader_ptr;
02354
02355 struct FilterDelayedSample {
02356
02357 FilterDelayedSample(unique_ptr<MessageTypeWithAllocator> msg, DataSampleHeader_ptr hdr, bool new_inst)
02358 : message(move(msg))
02359 , header(hdr)
02360 , new_instance(new_inst)
02361 , timer_id(-1) {
02362 }
02363
02364 container_supported_unique_ptr<MessageTypeWithAllocator> message;
02365 DataSampleHeader_ptr header;
02366 bool new_instance;
02367 long timer_id;
02368 };
02369
02370
02371 typedef OPENDDS_MAP(DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap;
02372
02373 FilterDelayedSampleMap map_;
02374 public:
02375 typedef typename DataReaderImpl_T<MessageType>::DataAllocator DataAllocator;
02376
02377
02378 unique_ptr<DataAllocator> data_allocator_;
02379 };
02380
02381 unique_ptr<DataAllocator>& data_allocator() { return filter_delayed_handler_->data_allocator_; }
02382
/// Reference-counted handler for samples held back by the time-based
/// filter; it also carries the allocator returned by data_allocator().
RcHandle<FilterDelayedHandler> filter_delayed_handler_;

/// Maps MessageType values to DDS::InstanceHandle_t, ordered by
/// TraitsType::LessThanType (see the InstanceMap typedef).
InstanceMap instance_map_;
02386 };
02387
02388 template <typename MessageType>
02389 void* DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator new(size_t , ACE_New_Allocator& pool)
02390 {
02391 typedef typename DataReaderImpl_T<MessageType>::MessageTypeMemoryBlock MessageTypeMemoryBlock;
02392 MessageTypeMemoryBlock* block =
02393 static_cast<MessageTypeMemoryBlock*>(pool.malloc(sizeof(MessageTypeMemoryBlock)));
02394 block->allocator_ = &pool;
02395 return block;
02396 }
02397
02398 template <typename MessageType>
02399 void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory)
02400 {
02401 if (memory) {
02402 MessageTypeMemoryBlock* block = static_cast<MessageTypeMemoryBlock*>(memory);
02403 block->allocator_->free(block);
02404 }
02405 }
02406
02407 template <typename MessageType>
02408 void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory, ACE_New_Allocator&)
02409 {
02410 operator delete(memory);
02411 }
02412
02413 }
02414 }
02415
02416 OPENDDS_END_VERSIONED_NAMESPACE_DECL
02417
02418 #endif