00001 #ifndef dds_DCPS_DataReaderImpl_T_h
00002 #define dds_DCPS_DataReaderImpl_T_h
00003
00004 #include "dds/DCPS/MultiTopicImpl.h"
00005 #include "dds/DCPS/RakeResults_T.h"
00006 #include "dds/DCPS/SubscriberImpl.h"
00007 #include "dds/DCPS/BuiltInTopicUtils.h"
00008 #include "dds/DCPS/Util.h"
00009 #include "dds/DCPS/TypeSupportImpl.h"
00010 #include "dcps_export.h"
00011
00012 namespace OpenDDS {
00013 namespace DCPS {
00014
00015
00016
00017
00018
00019
00020
00021 template <typename MessageType>
00022 class
00023 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
00024 OpenDDS_Dcps_Export
00025 #endif
00026 DataReaderImpl_T
00027 : public virtual OpenDDS::DCPS::LocalObject<typename DDSTraits<MessageType>::DataReaderType>,
00028 public virtual OpenDDS::DCPS::DataReaderImpl
00029 {
00030 public:
00031 typedef DDSTraits<MessageType> TraitsType;
00032 typedef typename TraitsType::MessageSequenceType MessageSequenceType;
00033
00034 typedef OPENDDS_MAP_CMP(MessageType, ::DDS::InstanceHandle_t,
00035 typename TraitsType::LessThanType) InstanceMap;
00036 typedef ::OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageType, ACE_Null_Mutex> DataAllocator;
00037
00038 typedef typename TraitsType::DataReaderType Interface;
00039
00040
00041 DataReaderImpl_T (void)
00042 : data_allocator_ (0)
00043 {
00044 }
00045
00046
00047 virtual ~DataReaderImpl_T (void)
00048 {
00049 for (typename InstanceMap::iterator it = instance_map_.begin();
00050 it != instance_map_.end(); ++it)
00051 {
00052 OpenDDS::DCPS::SubscriptionInstance* ptr =
00053 get_handle_instance(it->second);
00054 this->purge_data(ptr);
00055 }
00056
00057 delete data_allocator_;
00058
00059 }
00060
00061
00062
00063
00064
00065 virtual ::DDS::ReturnCode_t enable_specific ()
00066 {
00067 data_allocator_ = new DataAllocator(get_n_chunks ());
00068 if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00069 ACE_DEBUG((LM_DEBUG,
00070 ACE_TEXT("(%P|%t) %CDataReaderImpl::")
00071 ACE_TEXT("enable_specific-data")
00072 ACE_TEXT(" Cached_Allocator_With_Overflow ")
00073 ACE_TEXT("%x with %d chunks\n"),
00074 TraitsType::type_name(),
00075 data_allocator_,
00076 this->get_n_chunks ()));
00077
00078 return ::DDS::RETCODE_OK;
00079 }
00080
00081 virtual ::DDS::ReturnCode_t read (
00082 MessageSequenceType & received_data,
00083 ::DDS::SampleInfoSeq & info_seq,
00084 ::CORBA::Long max_samples,
00085 ::DDS::SampleStateMask sample_states,
00086 ::DDS::ViewStateMask view_states,
00087 ::DDS::InstanceStateMask instance_states)
00088 {
00089 ::DDS::ReturnCode_t const precond =
00090 check_inputs("read", received_data, info_seq, max_samples);
00091 if (::DDS::RETCODE_OK != precond)
00092 {
00093 return precond;
00094 }
00095
00096 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00097 guard,
00098 this->sample_lock_,
00099 ::DDS::RETCODE_ERROR);
00100
00101 return read_i(received_data, info_seq, max_samples, sample_states,
00102 view_states, instance_states, 0);
00103 }
00104
00105 virtual ::DDS::ReturnCode_t take (
00106 MessageSequenceType & received_data,
00107 ::DDS::SampleInfoSeq & info_seq,
00108 ::CORBA::Long max_samples,
00109 ::DDS::SampleStateMask sample_states,
00110 ::DDS::ViewStateMask view_states,
00111 ::DDS::InstanceStateMask instance_states)
00112 {
00113 ::DDS::ReturnCode_t const precond =
00114 check_inputs("take", received_data, info_seq, max_samples);
00115 if (::DDS::RETCODE_OK != precond)
00116 {
00117 return precond;
00118 }
00119
00120 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00121 guard,
00122 this->sample_lock_,
00123 ::DDS::RETCODE_ERROR);
00124
00125 return take_i(received_data, info_seq, max_samples, sample_states,
00126 view_states, instance_states, 0);
00127 }
00128
00129 virtual ::DDS::ReturnCode_t read_w_condition (
00130 MessageSequenceType & received_data,
00131 ::DDS::SampleInfoSeq & sample_info,
00132 ::CORBA::Long max_samples,
00133 ::DDS::ReadCondition_ptr a_condition)
00134 {
00135 ::DDS::ReturnCode_t const precond =
00136 check_inputs("read_w_condition", received_data, sample_info, max_samples);
00137 if (::DDS::RETCODE_OK != precond)
00138 {
00139 return precond;
00140 }
00141
00142 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00143 ::DDS::RETCODE_ERROR);
00144
00145 if (!has_readcondition(a_condition))
00146 {
00147 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00148 }
00149
00150 return read_i(received_data, sample_info, max_samples,
00151 a_condition->get_sample_state_mask(),
00152 a_condition->get_view_state_mask(),
00153 a_condition->get_instance_state_mask(),
00154 #ifndef OPENDDS_NO_QUERY_CONDITION
00155 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition));
00156 #else
00157 0);
00158 #endif
00159 }
00160
00161 virtual ::DDS::ReturnCode_t take_w_condition (
00162 MessageSequenceType & received_data,
00163 ::DDS::SampleInfoSeq & sample_info,
00164 ::CORBA::Long max_samples,
00165 ::DDS::ReadCondition_ptr a_condition)
00166 {
00167 ::DDS::ReturnCode_t const precond =
00168 check_inputs("take_w_condition", received_data, sample_info, max_samples);
00169 if (::DDS::RETCODE_OK != precond)
00170 {
00171 return precond;
00172 }
00173
00174 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00175 ::DDS::RETCODE_ERROR);
00176
00177 if (!has_readcondition(a_condition))
00178 {
00179 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00180 }
00181
00182 return take_i(received_data, sample_info, max_samples,
00183 a_condition->get_sample_state_mask(),
00184 a_condition->get_view_state_mask(),
00185 a_condition->get_instance_state_mask(),
00186 #ifndef OPENDDS_NO_QUERY_CONDITION
00187 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition)
00188 #else
00189 0
00190 #endif
00191 );
00192 }
00193
00194 virtual ::DDS::ReturnCode_t read_next_sample (
00195 MessageType & received_data,
00196 ::DDS::SampleInfo & sample_info)
00197 {
00198
00199 bool found_data = false;
00200
00201 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00202 guard,
00203 this->sample_lock_,
00204 ::DDS::RETCODE_ERROR);
00205
00206 typename InstanceMap::iterator const the_end = instance_map_.end ();
00207 for (typename InstanceMap::iterator it = instance_map_.begin ();
00208 it != the_end;
00209 ++it)
00210 {
00211 ::DDS::InstanceHandle_t handle = it->second;
00212 OpenDDS::DCPS::SubscriptionInstance* ptr = get_handle_instance(handle);
00213
00214 bool mrg = false;
00215
00216 if ((ptr->instance_state_.view_state() & ::DDS::ANY_VIEW_STATE) &&
00217 (ptr->instance_state_.instance_state() & ::DDS::ANY_INSTANCE_STATE))
00218 {
00219 for (OpenDDS::DCPS::ReceivedDataElement* item = ptr->rcvd_samples_.head_;
00220 item != 0;
00221 item = item->next_data_sample_)
00222 {
00223 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00224 if (item->coherent_change_) continue;
00225 #endif
00226
00227 if (item->sample_state_ & ::DDS::NOT_READ_SAMPLE_STATE)
00228 {
00229 if (item->registered_data_ != 0)
00230 {
00231 received_data =
00232 *static_cast< MessageType *> (item->registered_data_);
00233 }
00234 ptr->instance_state_.sample_info(sample_info, item);
00235
00236 item->sample_state_ = ::DDS::READ_SAMPLE_STATE;
00237
00238
00239 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00240
00241 found_data = true;
00242 }
00243 if (found_data)
00244 {
00245 break;
00246 }
00247 }
00248 }
00249
00250 if (found_data)
00251 {
00252 if (mrg) ptr->instance_state_.accessed();
00253
00254
00255
00256 this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00257
00258 break;
00259 }
00260 }
00261 post_read_or_take();
00262 return found_data ? ::DDS::RETCODE_OK : ::DDS::RETCODE_NO_DATA;
00263 }
00264
00265 virtual ::DDS::ReturnCode_t take_next_sample (
00266 MessageType & received_data,
00267 ::DDS::SampleInfo & sample_info)
00268 {
00269 bool found_data = false;
00270
00271
00272 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00273 guard,
00274 this->sample_lock_,
00275 ::DDS::RETCODE_ERROR);
00276
00277 typename InstanceMap::iterator const the_end = instance_map_.end ();
00278 for (typename InstanceMap::iterator it = instance_map_.begin ();
00279 it != the_end;
00280 ++it)
00281 {
00282 ::DDS::InstanceHandle_t handle = it->second;
00283 OpenDDS::DCPS::SubscriptionInstance* ptr = get_handle_instance(handle);
00284
00285 bool mrg = false;
00286
00287 OpenDDS::DCPS::ReceivedDataElement *tail = 0;
00288 if ((ptr->instance_state_.view_state() & ::DDS::ANY_VIEW_STATE) &&
00289 (ptr->instance_state_.instance_state() & ::DDS::ANY_INSTANCE_STATE))
00290 {
00291
00292 OpenDDS::DCPS::ReceivedDataElement *next;
00293 tail = 0;
00294 OpenDDS::DCPS::ReceivedDataElement *item = ptr->rcvd_samples_.head_;
00295 while (item)
00296 {
00297 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00298 if (item->coherent_change_)
00299 {
00300 item = item->next_data_sample_;
00301 continue;
00302 }
00303 #endif
00304 if (item->sample_state_ & ::DDS::NOT_READ_SAMPLE_STATE)
00305 {
00306 if (item->registered_data_ != 0)
00307 {
00308 received_data =
00309 *static_cast< MessageType *> (item->registered_data_);
00310 }
00311 ptr->instance_state_.sample_info(sample_info, item);
00312
00313 item->sample_state_ = ::DDS::READ_SAMPLE_STATE;
00314
00315 if (!mrg) mrg = ptr->instance_state_.most_recent_generation(item);
00316
00317 if (item == ptr->rcvd_samples_.tail_)
00318 {
00319 tail = ptr->rcvd_samples_.tail_;
00320 item = item->next_data_sample_;
00321 }
00322 else
00323 {
00324 next = item->next_data_sample_;
00325
00326 ptr->rcvd_samples_.remove(item);
00327 dec_ref_data_element(item);
00328
00329 item = next;
00330 }
00331
00332 found_data = true;
00333 }
00334 if (found_data)
00335 {
00336 break;
00337 }
00338 }
00339 }
00340
00341 if (found_data)
00342 {
00343 if (mrg) ptr->instance_state_.accessed();
00344
00345
00346
00347
00348
00349 if (tail)
00350 {
00351 this->sample_info(sample_info, tail);
00352
00353 ptr->rcvd_samples_.remove(tail);
00354 dec_ref_data_element(tail);
00355 }
00356 else
00357 {
00358 this->sample_info(sample_info, ptr->rcvd_samples_.tail_);
00359 }
00360
00361 break;
00362 }
00363 }
00364 post_read_or_take();
00365 return found_data ? ::DDS::RETCODE_OK : ::DDS::RETCODE_NO_DATA;
00366 }
00367
00368 virtual ::DDS::ReturnCode_t read_instance (
00369 MessageSequenceType & received_data,
00370 ::DDS::SampleInfoSeq & info_seq,
00371 ::CORBA::Long max_samples,
00372 ::DDS::InstanceHandle_t a_handle,
00373 ::DDS::SampleStateMask sample_states,
00374 ::DDS::ViewStateMask view_states,
00375 ::DDS::InstanceStateMask instance_states)
00376 {
00377 ::DDS::ReturnCode_t const precond =
00378 check_inputs("read_instance", received_data, info_seq, max_samples);
00379 if (::DDS::RETCODE_OK != precond)
00380 {
00381 return precond;
00382 }
00383
00384 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00385 guard,
00386 this->sample_lock_,
00387 ::DDS::RETCODE_ERROR);
00388 return read_instance_i(received_data, info_seq, max_samples, a_handle,
00389 sample_states, view_states, instance_states, 0);
00390 }
00391
00392 virtual ::DDS::ReturnCode_t take_instance (
00393 MessageSequenceType & received_data,
00394 ::DDS::SampleInfoSeq & info_seq,
00395 ::CORBA::Long max_samples,
00396 ::DDS::InstanceHandle_t a_handle,
00397 ::DDS::SampleStateMask sample_states,
00398 ::DDS::ViewStateMask view_states,
00399 ::DDS::InstanceStateMask instance_states)
00400 {
00401 ::DDS::ReturnCode_t const precond =
00402 check_inputs("take_instance", received_data, info_seq, max_samples);
00403 if (::DDS::RETCODE_OK != precond)
00404 {
00405 return precond;
00406 }
00407
00408 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00409 guard,
00410 this->sample_lock_,
00411 ::DDS::RETCODE_ERROR);
00412 return take_instance_i(received_data, info_seq, max_samples, a_handle,
00413 sample_states, view_states, instance_states, 0);
00414 }
00415
00416 virtual ::DDS::ReturnCode_t read_next_instance (
00417 MessageSequenceType & received_data,
00418 ::DDS::SampleInfoSeq & info_seq,
00419 ::CORBA::Long max_samples,
00420 ::DDS::InstanceHandle_t a_handle,
00421 ::DDS::SampleStateMask sample_states,
00422 ::DDS::ViewStateMask view_states,
00423 ::DDS::InstanceStateMask instance_states)
00424 {
00425 ::DDS::ReturnCode_t const precond =
00426 check_inputs("read_next_instance", received_data, info_seq, max_samples);
00427 if (::DDS::RETCODE_OK != precond)
00428 {
00429 return precond;
00430 }
00431
00432 return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00433 sample_states, view_states, instance_states, 0);
00434 }
00435
00436 virtual ::DDS::ReturnCode_t take_next_instance (
00437 MessageSequenceType & received_data,
00438 ::DDS::SampleInfoSeq & info_seq,
00439 ::CORBA::Long max_samples,
00440 ::DDS::InstanceHandle_t a_handle,
00441 ::DDS::SampleStateMask sample_states,
00442 ::DDS::ViewStateMask view_states,
00443 ::DDS::InstanceStateMask instance_states)
00444 {
00445 ::DDS::ReturnCode_t const precond =
00446 check_inputs("take_next_instance", received_data, info_seq, max_samples);
00447 if (::DDS::RETCODE_OK != precond)
00448 {
00449 return precond;
00450 }
00451
00452 return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00453 sample_states, view_states, instance_states, 0);
00454 }
00455
00456 virtual ::DDS::ReturnCode_t read_next_instance_w_condition (
00457 MessageSequenceType & received_data,
00458 ::DDS::SampleInfoSeq & info_seq,
00459 ::CORBA::Long max_samples,
00460 ::DDS::InstanceHandle_t a_handle,
00461 ::DDS::ReadCondition_ptr a_condition)
00462 {
00463 ::DDS::ReturnCode_t const precond =
00464 check_inputs("read_next_instance_w_condition", received_data, info_seq,
00465 max_samples);
00466 if (::DDS::RETCODE_OK != precond)
00467 {
00468 return precond;
00469 }
00470
00471 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00472 ::DDS::RETCODE_ERROR);
00473
00474 if (!has_readcondition(a_condition))
00475 {
00476 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00477 }
00478
00479 #ifndef OPENDDS_NO_QUERY_CONDITION
00480 ::DDS::QueryCondition_ptr query_condition =
00481 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition);
00482 #endif
00483
00484 return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
00485 a_condition->get_sample_state_mask(),
00486 a_condition->get_view_state_mask(),
00487 a_condition->get_instance_state_mask(),
00488 #ifndef OPENDDS_NO_QUERY_CONDITION
00489 query_condition
00490 #else
00491 0
00492 #endif
00493 );
00494 }
00495
00496 virtual ::DDS::ReturnCode_t take_next_instance_w_condition (
00497 MessageSequenceType & received_data,
00498 ::DDS::SampleInfoSeq & info_seq,
00499 ::CORBA::Long max_samples,
00500 ::DDS::InstanceHandle_t a_handle,
00501 ::DDS::ReadCondition_ptr a_condition)
00502 {
00503 ::DDS::ReturnCode_t const precond =
00504 check_inputs("take_next_instance_w_condition", received_data, info_seq,
00505 max_samples);
00506 if (::DDS::RETCODE_OK != precond)
00507 {
00508 return precond;
00509 }
00510
00511 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
00512 ::DDS::RETCODE_ERROR);
00513
00514 if (!has_readcondition(a_condition))
00515 {
00516 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00517 }
00518
00519 #ifndef OPENDDS_NO_QUERY_CONDITION
00520 ::DDS::QueryCondition_ptr query_condition =
00521 dynamic_cast< ::DDS::QueryCondition_ptr >(a_condition);
00522 #endif
00523
00524 return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
00525 a_condition->get_sample_state_mask(),
00526 a_condition->get_view_state_mask(),
00527 a_condition->get_instance_state_mask(),
00528 #ifndef OPENDDS_NO_QUERY_CONDITION
00529 query_condition
00530 #else
00531 0
00532 #endif
00533 );
00534 }
00535
00536 virtual ::DDS::ReturnCode_t return_loan (
00537 MessageSequenceType & received_data,
00538 ::DDS::SampleInfoSeq & info_seq)
00539 {
00540
00541
00542 if (received_data.length() != info_seq.length())
00543 {
00544 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00545 }
00546
00547 if (received_data.release())
00548 {
00549
00550 return ::DDS::RETCODE_OK;
00551 }
00552 else
00553 {
00554 info_seq.length(0);
00555 received_data.length(0);
00556 }
00557 return ::DDS::RETCODE_OK;
00558 }
00559
00560 virtual ::DDS::ReturnCode_t get_key_value (
00561 MessageType & key_holder,
00562 ::DDS::InstanceHandle_t handle)
00563 {
00564 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00565 guard,
00566 this->sample_lock_,
00567 ::DDS::RETCODE_ERROR);
00568
00569 typename InstanceMap::iterator const the_end = instance_map_.end ();
00570 for (typename InstanceMap::iterator it = instance_map_.begin ();
00571 it != the_end;
00572 ++it)
00573 {
00574 if (it->second == handle)
00575 {
00576 key_holder = it->first;
00577 return ::DDS::RETCODE_OK;
00578 }
00579 }
00580
00581 return ::DDS::RETCODE_ERROR;
00582 }
00583
00584 virtual ::DDS::InstanceHandle_t lookup_instance (
00585 const MessageType & instance_data)
00586 {
00587 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00588
00589 if (it == instance_map_.end())
00590 {
00591 return ::DDS::HANDLE_NIL;
00592 }
00593 else
00594 {
00595 return it->second;
00596 }
00597 }
00598
00599 virtual ::DDS::ReturnCode_t auto_return_loan(void* seq)
00600 {
00601 MessageSequenceType& received_data =
00602 *static_cast< MessageSequenceType*> (seq);
00603
00604 if (!received_data.release())
00605 {
00606
00607 received_data.length(0);
00608 }
00609 return ::DDS::RETCODE_OK;
00610 }
00611
00612 void release_loan (MessageSequenceType & received_data)
00613 {
00614 received_data.length(0);
00615 }
00616
00617 void dec_ref_data_element(::OpenDDS::DCPS::ReceivedDataElement* item)
00618 {
00619 using ::OpenDDS::DCPS::ReceivedDataElement;
00620
00621 if (0 == item->dec_ref())
00622 {
00623 if (item->registered_data_ != 0)
00624 {
00625 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00626 guard,
00627 this->sample_lock_);
00628
00629 MessageType* const ptr
00630 = static_cast< MessageType* >(item->registered_data_);
00631 ACE_DES_FREE (ptr,
00632 data_allocator_->free,
00633 MessageType );
00634 }
00635
00636 ACE_DES_FREE (item,
00637 rd_allocator_->free,
00638 ReceivedDataElement);
00639 }
00640 }
00641
00642 virtual void delete_instance_map (void* map)
00643 {
00644 InstanceMap* instances = reinterpret_cast <InstanceMap* > (map);
00645 delete instances;
00646 }
00647
00648 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00649 bool contains_sample_filtered(::DDS::SampleStateMask sample_states,
00650 ::DDS::ViewStateMask view_states,
00651 ::DDS::InstanceStateMask instance_states,
00652 const OpenDDS::DCPS::FilterEvaluator& evaluator,
00653 const ::DDS::StringSeq& params)
00654 {
00655 using namespace OpenDDS::DCPS;
00656 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
00657 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, false);
00658
00659 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
00660 end = instances_.end(); iter != end; ++iter) {
00661 SubscriptionInstance& inst = *iter->second;
00662
00663 if ((inst.instance_state_.view_state() & view_states) &&
00664 (inst.instance_state_.instance_state() & instance_states)) {
00665 for (ReceivedDataElement* item = inst.rcvd_samples_.head_; item != 0;
00666 item = item->next_data_sample_) {
00667 if (item->sample_state_ & sample_states
00668 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00669 && !item->coherent_change_
00670 #endif
00671 ) {
00672 if (evaluator.eval(*static_cast< MessageType* >(item->registered_data_), params)) {
00673 return true;
00674 }
00675 }
00676 }
00677 }
00678 }
00679
00680 return false;
00681 }
00682
00683 ::DDS::ReturnCode_t read_generic(
00684 OpenDDS::DCPS::DataReaderImpl::GenericBundle& gen,
00685 ::DDS::SampleStateMask sample_states, ::DDS::ViewStateMask view_states,
00686 ::DDS::InstanceStateMask instance_states,
00687 bool adjust_ref_count=false)
00688 {
00689
00690 MessageSequenceType data;
00691 ::DDS::ReturnCode_t rc;
00692 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00693 guard,
00694 this->sample_lock_,
00695 ::DDS::RETCODE_ERROR);
00696 {
00697 rc = read_i(data, gen.info_,
00698 ::DDS::LENGTH_UNLIMITED,
00699 sample_states, view_states, instance_states, 0);
00700 if (true == adjust_ref_count ) {
00701 data.increment_references();
00702 }
00703 }
00704 gen.samples_.reserve(data.length());
00705 for (CORBA::ULong i = 0; i < data.length(); ++i) {
00706 gen.samples_.push_back(&data[i]);
00707 }
00708 return rc;
00709
00710 }
00711
00712 ::DDS::InstanceHandle_t lookup_instance_generic(const void* data)
00713 {
00714 return lookup_instance(*static_cast<const MessageType*>(data));
00715 }
00716
00717 virtual ::DDS::ReturnCode_t take(
00718 OpenDDS::DCPS::AbstractSamples& samples,
00719 ::DDS::SampleStateMask sample_states, ::DDS::ViewStateMask view_states,
00720 ::DDS::InstanceStateMask instance_states)
00721 {
00722
00723 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00724 guard,
00725 this->sample_lock_,
00726 ::DDS::RETCODE_ERROR);
00727
00728
00729 MessageSequenceType data;
00730 ::DDS::SampleInfoSeq infos;
00731 ::DDS::ReturnCode_t rc = take_i(data, infos, ::DDS::LENGTH_UNLIMITED,
00732 sample_states, view_states, instance_states, 0);
00733
00734 samples.reserve(data.length());
00735
00736 for (CORBA::ULong i = 0; i < data.length(); ++i) {
00737 samples.push_back(infos[i], &data[i]);
00738 }
00739
00740 return rc;
00741 }
00742
00743 ::DDS::ReturnCode_t read_instance_generic(void*& data,
00744 ::DDS::SampleInfo& info, ::DDS::InstanceHandle_t instance,
00745 ::DDS::SampleStateMask sample_states, ::DDS::ViewStateMask view_states,
00746 ::DDS::InstanceStateMask instance_states)
00747 {
00748 MessageSequenceType dataseq;
00749 ::DDS::SampleInfoSeq infoseq;
00750 ::DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
00751 ::DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
00752 instance_states, 0);
00753 if (rc == ::DDS::RETCODE_NO_DATA) return rc;
00754 const CORBA::ULong last = dataseq.length() - 1;
00755 data = new MessageType(dataseq[last]);
00756 info = infoseq[last];
00757 return rc;
00758 }
00759
00760 ::DDS::ReturnCode_t read_next_instance_generic(void*& data,
00761 ::DDS::SampleInfo& info, ::DDS::InstanceHandle_t previous_instance,
00762 ::DDS::SampleStateMask sample_states, ::DDS::ViewStateMask view_states,
00763 ::DDS::InstanceStateMask instance_states)
00764 {
00765 MessageSequenceType dataseq;
00766 ::DDS::SampleInfoSeq infoseq;
00767 ::DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
00768 ::DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
00769 instance_states, 0);
00770 if (rc == ::DDS::RETCODE_NO_DATA) return rc;
00771 const CORBA::ULong last = dataseq.length() - 1;
00772 data = new MessageType(dataseq[last]);
00773 info = infoseq[last];
00774 return rc;
00775 }
00776
00777 #endif
00778
00779 ::DDS::InstanceHandle_t store_synthetic_data(const MessageType& sample,
00780 ::DDS::ViewStateKind view)
00781 {
00782 using namespace OpenDDS::DCPS;
00783 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
00784 ::DDS::HANDLE_NIL);
00785
00786 #ifndef OPENDDS_NO_MULTI_TOPIC
00787 ::DDS::TopicDescription_var descr = get_topicdescription();
00788 if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
00789 if (!mt->filter(sample)) {
00790 return ::DDS::HANDLE_NIL;
00791 }
00792 }
00793 #endif
00794
00795 get_subscriber_servant()->data_received(this);
00796
00797 ::DDS::InstanceHandle_t inst = lookup_instance(sample);
00798 bool filtered;
00799 SubscriptionInstance* instance = 0;
00800
00801
00802
00803
00804 for (int i = 0; i < 2; ++i) {
00805 if (i == 0 && inst != ::DDS::HANDLE_NIL) continue;
00806
00807 DataSampleHeader header;
00808 header.message_id_ = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
00809 bool just_registered;
00810 MessageType* data;
00811 ACE_NEW_MALLOC_NORETURN(data,
00812 static_cast< MessageType*>(data_allocator_->malloc(sizeof(MessageType))),
00813 MessageType(sample));
00814 store_instance_data(data, header, instance, just_registered, filtered);
00815 if (instance) inst = instance->instance_handle_;
00816 }
00817
00818 if (!filtered) {
00819 if (view == ::DDS::NOT_NEW_VIEW_STATE) {
00820 instance->instance_state_.accessed();
00821 }
00822 notify_read_conditions();
00823 }
00824 return inst;
00825 }
00826
00827 void set_instance_state(::DDS::InstanceHandle_t instance,
00828 ::DDS::InstanceStateKind state)
00829 {
00830 using namespace OpenDDS::DCPS;
00831 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
00832
00833 SubscriptionInstance* si = get_handle_instance(instance);
00834 if (si && state != ::DDS::ALIVE_INSTANCE_STATE) {
00835 DataSampleHeader header;
00836 header.message_id_ = (state == ::DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
00837 ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE;
00838 bool just_registered, filtered;
00839 MessageType* data;
00840 ACE_NEW_MALLOC_NORETURN(data,
00841 static_cast< MessageType*>(data_allocator_->malloc(sizeof(MessageType))),
00842 MessageType);
00843 get_key_value(*data, instance);
00844 store_instance_data(data, header, si, just_registered, filtered);
00845 notify_read_conditions();
00846 }
00847 }
00848
00849 virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
00850 OpenDDS::DCPS::SubscriptionInstance*& instance)
00851 {
00852
00853
00854 MessageType data;
00855
00856 const bool cdr = sample.header_.cdr_encapsulation_;
00857
00858 ::OpenDDS::DCPS::Serializer ser(
00859 sample.sample_,
00860 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
00861 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00862 : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00863
00864 if (cdr) {
00865 ACE_CDR::ULong header;
00866 ser >> header;
00867 }
00868
00869 if (cdr && Serializer::use_rti_serialization()) {
00870
00871 ser.reset_alignment();
00872 }
00873 if (sample.header_.key_fields_only_) {
00874 ser >> ::OpenDDS::DCPS::KeyOnly< MessageType>(data);
00875 } else {
00876 ser >> data;
00877 }
00878
00879
00880 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
00881 typename InstanceMap::const_iterator const it = instance_map_.find(data);
00882 if (it != instance_map_.end()) {
00883 handle = it->second;
00884 }
00885
00886 if (handle == ::DDS::HANDLE_NIL) {
00887 instance = 0;
00888 } else {
00889 instance = get_handle_instance(handle);
00890 }
00891 }
00892
00893 protected:
00894
00895 virtual void dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample& sample,
00896 OpenDDS::DCPS::SubscriptionInstance*& instance,
00897 bool & just_registered,
00898 bool & filtered,
00899 OpenDDS::DCPS::MarshalingType marshaling_type)
00900 {
00901 MessageType* data;
00902
00903 ACE_NEW_MALLOC_NORETURN(data,
00904 static_cast< MessageType *>(
00905 data_allocator_->malloc(sizeof(MessageType))),
00906 MessageType);
00907
00908 const bool cdr = sample.header_.cdr_encapsulation_;
00909
00910 OpenDDS::DCPS::Serializer ser(
00911 sample.sample_,
00912 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
00913 cdr ? OpenDDS::DCPS::Serializer::ALIGN_CDR : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00914
00915 if (cdr) {
00916 ACE_CDR::ULong header;
00917 ser >> header;
00918 }
00919
00920 if (cdr && Serializer::use_rti_serialization()) {
00921
00922 ser.reset_alignment();
00923 }
00924 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
00925 ser >> ::OpenDDS::DCPS::KeyOnly< MessageType>(*data);
00926 } else {
00927 ser >> *data;
00928 }
00929
00930
00931 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00932 if (!sample.header_.content_filter_) {
00933 using OpenDDS::DCPS::ContentFilteredTopicImpl;
00934 if (ContentFilteredTopicImpl* cft =
00935 dynamic_cast<ContentFilteredTopicImpl*>(content_filtered_topic_.in())) {
00936 if (sample.header_.message_id_ == OpenDDS::DCPS::SAMPLE_DATA
00937 && !cft->filter(*data)) {
00938 filtered = true;
00939 return;
00940 }
00941 }
00942 }
00943 #endif
00944
00945 store_instance_data(data, sample.header_, instance, just_registered, filtered);
00946 }
00947
00948 virtual void dispose_unregister(const OpenDDS::DCPS::ReceivedDataSample& sample,
00949 OpenDDS::DCPS::SubscriptionInstance*& instance)
00950 {
00951
00952
00953
00954
00955
00956
00957 bool just_registered = false;
00958 bool filtered = false;
00959 OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING;
00960 if (sample.header_.key_fields_only_) {
00961 marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING;
00962 }
00963 this->dds_demarshal(sample, instance, just_registered, filtered, marshaling);
00964 }
00965
00966 virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance* instance)
00967 {
00968 instance->instance_state_.cancel_release();
00969
00970 while (instance->rcvd_samples_.size_ > 0)
00971 {
00972 OpenDDS::DCPS::ReceivedDataElement* head =
00973 instance->rcvd_samples_.remove_head();
00974 dec_ref_data_element(head);
00975 }
00976
00977 delete instance;
00978 }
00979
00980 virtual void release_instance_i (::DDS::InstanceHandle_t handle)
00981 {
00982 typename InstanceMap::iterator const the_end = instance_map_.end ();
00983 typename InstanceMap::iterator it = instance_map_.begin ();
00984 while (it != the_end)
00985 {
00986 if (it->second == handle)
00987 {
00988 typename InstanceMap::iterator curIt = it;
00989 ++ it;
00990 instance_map_.erase (curIt);
00991 }
00992 else
00993 ++ it;
00994 }
00995 }
00996
00997 private:
00998
00999 ::DDS::ReturnCode_t read_i (
01000 MessageSequenceType & received_data,
01001 ::DDS::SampleInfoSeq & info_seq,
01002 ::CORBA::Long max_samples,
01003 ::DDS::SampleStateMask sample_states,
01004 ::DDS::ViewStateMask view_states,
01005 ::DDS::InstanceStateMask instance_states,
01006 #ifndef OPENDDS_NO_QUERY_CONDITION
01007 ::DDS::QueryCondition_ptr a_condition)
01008 #else
01009 int ignored)
01010 #endif
01011 {
01012 #ifdef OPENDDS_NO_QUERY_CONDITION
01013 ACE_UNUSED_ARG(ignored);
01014 #endif
01015
01016 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01017
01018 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01019 if (this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01020 && ! this->coherent_) {
01021 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01022 }
01023
01024 bool group_coherent_ordered
01025 = this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01026 && this->subqos_.presentation.coherent_access
01027 && this->subqos_.presentation.ordered_access;
01028
01029 if (group_coherent_ordered && this->coherent_) {
01030 max_samples = 1;
01031 }
01032 #endif
01033
01034 ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01035 results(this, received_data, info_seq, max_samples,
01036 this->subqos_.presentation,
01037 #ifndef OPENDDS_NO_QUERY_CONDITION
01038 a_condition,
01039 #endif
01040 ::OpenDDS::DCPS::DDS_OPERATION_READ);
01041
01042 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01043 if (! group_coherent_ordered) {
01044 #endif
01045 for (typename InstanceMap::iterator it = instance_map_.begin(),
01046 the_end = instance_map_.end(); it != the_end; ++it)
01047 {
01048 ::DDS::InstanceHandle_t handle = it->second;
01049
01050 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(handle);
01051
01052 if ((inst->instance_state_.view_state() & view_states) &&
01053 (inst->instance_state_.instance_state() & instance_states))
01054 {
01055 size_t i(0);
01056 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01057 item != 0; item = item->next_data_sample_)
01058 {
01059 if (item->sample_state_ & sample_states
01060 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01061 && !item->coherent_change_
01062 #endif
01063 )
01064 {
01065 results.insert_sample(item, inst, ++i);
01066 }
01067 }
01068 }
01069 }
01070 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01071 }
01072 else {
01073 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01074 results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01075 }
01076 #endif
01077
01078 results.copy_to_user();
01079
01080 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01081 if (received_data.length())
01082 {
01083 ret = ::DDS::RETCODE_OK;
01084 if (received_data.maximum() == 0)
01085 {
01086 received_data_p.set_loaner(this);
01087 }
01088 }
01089
01090 post_read_or_take();
01091
01092 return ret;
01093 }
01094
01095 ::DDS::ReturnCode_t take_i (
01096 MessageSequenceType & received_data,
01097 ::DDS::SampleInfoSeq & info_seq,
01098 ::CORBA::Long max_samples,
01099 ::DDS::SampleStateMask sample_states,
01100 ::DDS::ViewStateMask view_states,
01101 ::DDS::InstanceStateMask instance_states,
01102 #ifndef OPENDDS_NO_QUERY_CONDITION
01103 ::DDS::QueryCondition_ptr a_condition)
01104 #else
01105 int ignored)
01106 #endif
01107 {
01108 #ifdef OPENDDS_NO_QUERY_CONDITION
01109 ACE_UNUSED_ARG(ignored);
01110 #endif
01111
01112 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01113
01114 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01115 if (this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01116 && ! this->coherent_) {
01117 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01118 }
01119
01120 bool group_coherent_ordered
01121 = this->subqos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS
01122 && this->subqos_.presentation.coherent_access
01123 && this->subqos_.presentation.ordered_access;
01124
01125 if (group_coherent_ordered && this->coherent_) {
01126 max_samples = 1;
01127 }
01128 #endif
01129
01130 ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01131 results(this, received_data, info_seq, max_samples,
01132 this->subqos_.presentation,
01133 #ifndef OPENDDS_NO_QUERY_CONDITION
01134 a_condition,
01135 #endif
01136 ::OpenDDS::DCPS::DDS_OPERATION_TAKE);
01137
01138 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01139 if (! group_coherent_ordered) {
01140 #endif
01141
01142 for (typename InstanceMap::iterator it = instance_map_.begin(),
01143 the_end = instance_map_.end(); it != the_end; ++it)
01144 {
01145 ::DDS::InstanceHandle_t handle = it->second;
01146
01147 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(handle);
01148
01149 if ((inst->instance_state_.view_state() & view_states) &&
01150 (inst->instance_state_.instance_state() & instance_states))
01151 {
01152 size_t i(0);
01153 for (OpenDDS::DCPS::ReceivedDataElement *item = inst->rcvd_samples_.head_;
01154 item != 0; item = item->next_data_sample_)
01155 {
01156 if (item->sample_state_ & sample_states
01157 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01158 && !item->coherent_change_
01159 #endif
01160 )
01161 {
01162 results.insert_sample(item, inst, ++i);
01163 }
01164 }
01165 }
01166 }
01167 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01168 }
01169 else {
01170 OpenDDS::DCPS::RakeData item = this->group_coherent_ordered_data_.get_data();
01171 results.insert_sample(item.rde_, item.si_, item.index_in_instance_);
01172 }
01173 #endif
01174
01175 results.copy_to_user();
01176
01177 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01178 if (received_data.length())
01179 {
01180 ret = ::DDS::RETCODE_OK;
01181 if (received_data.maximum() == 0)
01182 {
01183 received_data_p.set_loaner(this);
01184 }
01185 }
01186
01187 post_read_or_take();
01188 return ret;
01189 }
01190
01191 ::DDS::ReturnCode_t read_instance_i (
01192 MessageSequenceType & received_data,
01193 ::DDS::SampleInfoSeq & info_seq,
01194 ::CORBA::Long max_samples,
01195 ::DDS::InstanceHandle_t a_handle,
01196 ::DDS::SampleStateMask sample_states,
01197 ::DDS::ViewStateMask view_states,
01198 ::DDS::InstanceStateMask instance_states,
01199 #ifndef OPENDDS_NO_QUERY_CONDITION
01200 ::DDS::QueryCondition_ptr a_condition)
01201 #else
01202 int ignored)
01203 #endif
01204 {
01205 #ifdef OPENDDS_NO_QUERY_CONDITION
01206 ACE_UNUSED_ARG(ignored);
01207 #endif
01208
01209 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01210
01211 ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01212 results(this, received_data, info_seq, max_samples,
01213 this->subqos_.presentation,
01214 #ifndef OPENDDS_NO_QUERY_CONDITION
01215 a_condition,
01216 #endif
01217 ::OpenDDS::DCPS::DDS_OPERATION_READ);
01218
01219 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(a_handle);
01220 if (inst == 0) return ::DDS::RETCODE_BAD_PARAMETER;
01221
01222 if ((inst->instance_state_.view_state() & view_states) &&
01223 (inst->instance_state_.instance_state() & instance_states))
01224 {
01225 size_t i(0);
01226 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01227 item; item = item->next_data_sample_)
01228 {
01229 if (item->sample_state_ & sample_states
01230 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01231 && !item->coherent_change_
01232 #endif
01233 )
01234 {
01235 results.insert_sample(item, inst, ++i);
01236 }
01237 }
01238 }
01239
01240 results.copy_to_user();
01241
01242 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01243 if (received_data.length())
01244 {
01245 ret = ::DDS::RETCODE_OK;
01246 if (received_data.maximum() == 0)
01247 {
01248 received_data_p.set_loaner(this);
01249 }
01250 }
01251
01252 post_read_or_take();
01253 return ret;
01254 }
01255
01256 ::DDS::ReturnCode_t take_instance_i (
01257 MessageSequenceType & received_data,
01258 ::DDS::SampleInfoSeq & info_seq,
01259 ::CORBA::Long max_samples,
01260 ::DDS::InstanceHandle_t a_handle,
01261 ::DDS::SampleStateMask sample_states,
01262 ::DDS::ViewStateMask view_states,
01263 ::DDS::InstanceStateMask instance_states,
01264 #ifndef OPENDDS_NO_QUERY_CONDITION
01265 ::DDS::QueryCondition_ptr a_condition)
01266 #else
01267 int ignored)
01268 #endif
01269 {
01270 #ifdef OPENDDS_NO_QUERY_CONDITION
01271 ACE_UNUSED_ARG(ignored);
01272 #endif
01273
01274 typename MessageSequenceType::PrivateMemberAccess received_data_p(received_data);
01275
01276 ::OpenDDS::DCPS::RakeResults< MessageSequenceType >
01277 results(this, received_data, info_seq, max_samples,
01278 this->subqos_.presentation,
01279 #ifndef OPENDDS_NO_QUERY_CONDITION
01280 a_condition,
01281 #endif
01282 ::OpenDDS::DCPS::DDS_OPERATION_TAKE);
01283
01284 OpenDDS::DCPS::SubscriptionInstance* inst = get_handle_instance(a_handle);
01285
01286 if ((inst->instance_state_.view_state() & view_states) &&
01287 (inst->instance_state_.instance_state() & instance_states))
01288 {
01289 size_t i(0);
01290 for (OpenDDS::DCPS::ReceivedDataElement* item = inst->rcvd_samples_.head_;
01291 item; item = item->next_data_sample_)
01292 {
01293 if (item->sample_state_ & sample_states
01294 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01295 && !item->coherent_change_
01296 #endif
01297 )
01298 {
01299 results.insert_sample(item, inst, ++i);
01300 }
01301 }
01302 }
01303
01304 results.copy_to_user();
01305
01306 ::DDS::ReturnCode_t ret = ::DDS::RETCODE_NO_DATA;
01307 if (received_data.length())
01308 {
01309 ret = ::DDS::RETCODE_OK;
01310 if (received_data.maximum() == 0)
01311 {
01312 received_data_p.set_loaner(this);
01313 }
01314 }
01315
01316 post_read_or_take();
01317 return ret;
01318 }
01319
01320 ::DDS::ReturnCode_t read_next_instance_i (
01321 MessageSequenceType & received_data,
01322 ::DDS::SampleInfoSeq & info_seq,
01323 ::CORBA::Long max_samples,
01324 ::DDS::InstanceHandle_t a_handle,
01325 ::DDS::SampleStateMask sample_states,
01326 ::DDS::ViewStateMask view_states,
01327 ::DDS::InstanceStateMask instance_states,
01328 #ifndef OPENDDS_NO_QUERY_CONDITION
01329 ::DDS::QueryCondition_ptr a_condition)
01330 #else
01331 int ignored)
01332 #endif
01333 {
01334 #ifdef OPENDDS_NO_QUERY_CONDITION
01335 ACE_UNUSED_ARG(ignored);
01336 #endif
01337
01338 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
01339
01340 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01341 guard,
01342 this->sample_lock_,
01343 ::DDS::RETCODE_ERROR);
01344
01345 typename InstanceMap::iterator it;
01346 typename InstanceMap::iterator const the_end = instance_map_.end ();
01347
01348 if (a_handle == ::DDS::HANDLE_NIL)
01349 {
01350 it = instance_map_.begin ();
01351 }
01352 else
01353 {
01354 for (it = instance_map_.begin ();
01355 it != the_end;
01356 ++it)
01357 {
01358 if (a_handle == it->second)
01359 {
01360 ++it;
01361 break;
01362 }
01363 }
01364 }
01365
01366 for (; it != the_end; ++it)
01367 {
01368 handle = it->second;
01369 ::DDS::ReturnCode_t const status =
01370 read_instance_i(received_data, info_seq, max_samples, handle,
01371 sample_states, view_states, instance_states,
01372 #ifndef OPENDDS_NO_QUERY_CONDITION
01373 a_condition);
01374 #else
01375 0);
01376 #endif
01377 if (status != ::DDS::RETCODE_NO_DATA)
01378 {
01379 post_read_or_take();
01380 return status;
01381 }
01382 }
01383
01384 post_read_or_take();
01385 return ::DDS::RETCODE_NO_DATA;
01386 }
01387
01388 ::DDS::ReturnCode_t take_next_instance_i (
01389 MessageSequenceType & received_data,
01390 ::DDS::SampleInfoSeq & info_seq,
01391 ::CORBA::Long max_samples,
01392 ::DDS::InstanceHandle_t a_handle,
01393 ::DDS::SampleStateMask sample_states,
01394 ::DDS::ViewStateMask view_states,
01395 ::DDS::InstanceStateMask instance_states,
01396 #ifndef OPENDDS_NO_QUERY_CONDITION
01397 ::DDS::QueryCondition_ptr a_condition)
01398 #else
01399 int ignored)
01400 #endif
01401 {
01402 #ifdef OPENDDS_NO_QUERY_CONDITION
01403 ACE_UNUSED_ARG(ignored);
01404 #endif
01405
01406 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
01407
01408 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01409 guard,
01410 this->sample_lock_,
01411 ::DDS::RETCODE_ERROR);
01412
01413 typename InstanceMap::iterator it;
01414 typename InstanceMap::iterator const the_end = instance_map_.end ();
01415
01416 if (a_handle == ::DDS::HANDLE_NIL)
01417 {
01418 it = instance_map_.begin ();
01419 }
01420 else
01421 {
01422 for (it = instance_map_.begin (); it != the_end; ++it)
01423 {
01424 if (a_handle == it->second)
01425 {
01426 ++it;
01427 break;
01428 }
01429 }
01430 }
01431
01432 for (; it != the_end; ++it)
01433 {
01434 handle = it->second;
01435 ::DDS::ReturnCode_t const status =
01436 take_instance_i(received_data, info_seq, max_samples, handle,
01437 sample_states, view_states, instance_states,
01438 #ifndef OPENDDS_NO_QUERY_CONDITION
01439 a_condition);
01440 #else
01441 0);
01442 #endif
01443 if (status != ::DDS::RETCODE_NO_DATA)
01444 {
01445 total_samples();
01446 post_read_or_take();
01447 return status;
01448 }
01449 }
01450 post_read_or_take();
01451 return ::DDS::RETCODE_NO_DATA;
01452 }
01453
01454 void store_instance_data(
01455 MessageType *instance_data,
01456 const OpenDDS::DCPS::DataSampleHeader& header,
01457 OpenDDS::DCPS::SubscriptionInstance*& instance_ptr,
01458 bool & just_registered,
01459 bool & filtered)
01460 {
01461 const bool is_dispose_msg =
01462 header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
01463 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01464 const bool is_unregister_msg =
01465 header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE ||
01466 header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
01467
01468 ::DDS::InstanceHandle_t handle(::DDS::HANDLE_NIL);
01469
01470
01471
01472
01473 typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
01474
01475 if ((is_dispose_msg || is_unregister_msg) && it == instance_map_.end())
01476 {
01477 ACE_DES_FREE (instance_data,
01478 data_allocator_->free,
01479 MessageType );
01480 instance_data = 0;
01481 return;
01482 }
01483
01484
01485 if (it == instance_map_.end())
01486 {
01487 std::size_t instances_size = 0;
01488 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01489 instances_size = instances_.size();
01490 }
01491 if ((this->qos_.resource_limits.max_instances != ::DDS::LENGTH_UNLIMITED) &&
01492 ((::CORBA::Long) instances_size >= this->qos_.resource_limits.max_instances))
01493 {
01494
01495 ::DDS::DataReaderListener_var listener
01496 = listener_for (::DDS::SAMPLE_REJECTED_STATUS);
01497
01498 set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true);
01499
01500 sample_rejected_status_.last_reason =
01501 ::DDS::REJECTED_BY_INSTANCES_LIMIT;
01502 ++sample_rejected_status_.total_count;
01503 ++sample_rejected_status_.total_count_change;
01504 sample_rejected_status_.last_instance_handle = handle;
01505
01506 ::DDS::DataReader_var dr = get_dr_obj_ref();
01507 if (!CORBA::is_nil(listener.in()))
01508 {
01509 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01510
01511 listener->on_sample_rejected(dr.in (),
01512 sample_rejected_status_);
01513 }
01514 notify_status_condition_no_sample_lock();
01515
01516 ACE_DES_FREE (instance_data,
01517 data_allocator_->free,
01518 MessageType );
01519
01520 return;
01521 }
01522
01523
01524
01525
01526
01527
01528
01529 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01530 InstanceMap* inst = 0;
01531 bool new_handle = true;
01532 if (this->is_exclusive_ownership_) {
01533 if (this->owner_manager_->instance_lock_acquire () != 0) {
01534 ACE_ERROR ((LM_ERROR,
01535 ACE_TEXT("(%P|%t) ")
01536 ACE_TEXT("%CDataReaderImpl::")
01537 ACE_TEXT("store_instance_data, ")
01538 ACE_TEXT("acquire instance_lock failed. \n"), TraitsType::type_name()));
01539 return;
01540 }
01541
01542 inst = (InstanceMap*)(
01543 this->owner_manager_->get_instance_map(this->topic_servant_->type_name(), this));
01544 if (inst != 0) {
01545 typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
01546 if (iter != inst->end ()) {
01547 handle = iter->second;
01548 new_handle = false;
01549 }
01550 }
01551 }
01552 #endif
01553
01554 just_registered = true;
01555 OpenDDS::DCPS::SubscriptionInstance* instance = 0;
01556 ::DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(instance_data);
01557 handle = handle == ::DDS::HANDLE_NIL ? this->get_next_handle( key) : handle;
01558 ACE_NEW (instance,
01559 OpenDDS::DCPS::SubscriptionInstance(this,
01560 this->qos_,
01561 this->instances_lock_,
01562 handle));
01563
01564 instance->instance_handle_ = handle;
01565
01566 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01567 int ret = OpenDDS::DCPS::bind(instances_, handle, instance);
01568
01569 if (ret != 0)
01570 {
01571 ACE_ERROR ((LM_ERROR,
01572 ACE_TEXT("(%P|%t) ")
01573 ACE_TEXT("%CDataReaderImpl::")
01574 ACE_TEXT("store_instance_data, ")
01575 ACE_TEXT("insert handle failed. \n"), TraitsType::type_name()));
01576 return;
01577 }
01578 }
01579
01580 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01581 if (this->is_exclusive_ownership_) {
01582 if (inst == 0) {
01583 inst = new InstanceMap ();
01584 this->owner_manager_->set_instance_map(
01585 this->topic_servant_->type_name(), reinterpret_cast <void* > (inst), this);
01586 }
01587
01588 if (new_handle) {
01589 std::pair<typename InstanceMap::iterator, bool> bpair =
01590 inst->insert(typename InstanceMap::value_type(*instance_data,
01591 handle));
01592 if (bpair.second == false)
01593 {
01594 ACE_ERROR ((LM_ERROR,
01595 ACE_TEXT("(%P|%t) ")
01596 ACE_TEXT("%CDataReaderImpl::")
01597 ACE_TEXT("store_instance_data, ")
01598 ACE_TEXT("insert to participant scope %s failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01599 return;
01600 }
01601 }
01602
01603 if (this->owner_manager_->instance_lock_release () != 0) {
01604 ACE_ERROR ((LM_ERROR,
01605 ACE_TEXT("(%P|%t) ")
01606 ACE_TEXT("%CDataReaderImpl::")
01607 ACE_TEXT("store_instance_data, ")
01608 ACE_TEXT("release instance_lock failed. \n"), TraitsType::type_name()));
01609 return;
01610 }
01611 }
01612 #endif
01613
01614 std::pair<typename InstanceMap::iterator, bool> bpair =
01615 instance_map_.insert(typename InstanceMap::value_type(*instance_data,
01616 handle));
01617 if (bpair.second == false)
01618 {
01619 ACE_ERROR ((LM_ERROR,
01620 ACE_TEXT("(%P|%t) ")
01621 ACE_TEXT("%CDataReaderImpl::")
01622 ACE_TEXT("store_instance_data, ")
01623 ACE_TEXT("insert %s failed. \n"), TraitsType::type_name(), TraitsType::type_name()));
01624 return;
01625 }
01626 }
01627 else
01628 {
01629 just_registered = false;
01630 handle = it->second;
01631 }
01632
01633 if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION)
01634 {
01635 instance_ptr = get_handle_instance(handle);
01636
01637 if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA)
01638 {
01639
01640
01641 filtered = this->filter_instance(instance_ptr,header.publication_id_);
01642
01643 if (filtered)
01644 {
01645 ACE_DES_FREE (instance_data,
01646 data_allocator_->free,
01647 MessageType );
01648 return;
01649 }
01650 }
01651
01652 if ((this->qos_.resource_limits.max_samples_per_instance !=
01653 ::DDS::LENGTH_UNLIMITED) &&
01654 (instance_ptr->rcvd_samples_.size_ >=
01655 this->qos_.resource_limits.max_samples_per_instance))
01656 {
01657
01658
01659
01660
01661
01662
01663 if (! is_dispose_msg && ! is_unregister_msg
01664 && instance_ptr->rcvd_samples_.head_->sample_state_
01665 == ::DDS::NOT_READ_SAMPLE_STATE)
01666 {
01667
01668
01669
01670
01671 ::DDS::DataReaderListener_var listener
01672 = listener_for (::DDS::SAMPLE_REJECTED_STATUS);
01673
01674 set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true);
01675
01676 sample_rejected_status_.last_reason =
01677 ::DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT;
01678 ++sample_rejected_status_.total_count;
01679 ++sample_rejected_status_.total_count_change;
01680 sample_rejected_status_.last_instance_handle = handle;
01681
01682 ::DDS::DataReader_var dr = get_dr_obj_ref();
01683 if (!CORBA::is_nil(listener.in()))
01684 {
01685 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01686
01687 listener->on_sample_rejected(dr.in (),
01688 sample_rejected_status_);
01689 }
01690 notify_status_condition_no_sample_lock();
01691
01692 ACE_DES_FREE (instance_data,
01693 data_allocator_->free,
01694 MessageType );
01695
01696 return;
01697 }
01698 else if (! is_dispose_msg && ! is_unregister_msg)
01699 {
01700
01701 OpenDDS::DCPS::ReceivedDataElement *item =
01702 instance_ptr->rcvd_samples_.head_;
01703 instance_ptr->rcvd_samples_.remove(item);
01704 dec_ref_data_element(item);
01705 }
01706 }
01707 else if (this->qos_.resource_limits.max_samples != ::DDS::LENGTH_UNLIMITED)
01708 {
01709 CORBA::Long total_samples = 0;
01710 { ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
01711 for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
01712 iter != instances_.end();
01713 ++iter) {
01714 OpenDDS::DCPS::SubscriptionInstance *ptr = iter->second;
01715
01716 total_samples += (CORBA::Long) ptr->rcvd_samples_.size_;
01717 }
01718 }
01719
01720 if(total_samples >= this->qos_.resource_limits.max_samples)
01721 {
01722
01723
01724
01725
01726
01727 if (! is_dispose_msg && ! is_unregister_msg
01728 && instance_ptr->rcvd_samples_.head_->sample_state_
01729 == ::DDS::NOT_READ_SAMPLE_STATE)
01730 {
01731
01732
01733
01734
01735 ::DDS::DataReaderListener_var listener
01736 = listener_for (::DDS::SAMPLE_REJECTED_STATUS);
01737
01738 set_status_changed_flag (::DDS::SAMPLE_REJECTED_STATUS, true);
01739
01740 sample_rejected_status_.last_reason =
01741 ::DDS::REJECTED_BY_SAMPLES_LIMIT;
01742 ++sample_rejected_status_.total_count;
01743 ++sample_rejected_status_.total_count_change;
01744 sample_rejected_status_.last_instance_handle = handle;
01745 ::DDS::DataReader_var dr = get_dr_obj_ref();
01746 if (!CORBA::is_nil(listener.in()))
01747 {
01748 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01749
01750 listener->on_sample_rejected(dr.in (),
01751 sample_rejected_status_);
01752 }
01753 notify_status_condition_no_sample_lock();
01754
01755 ACE_DES_FREE (instance_data,
01756 data_allocator_->free,
01757 MessageType );
01758
01759 return;
01760 }
01761 else if (! is_dispose_msg && ! is_unregister_msg)
01762 {
01763
01764 OpenDDS::DCPS::ReceivedDataElement *item =
01765 instance_ptr->rcvd_samples_.head_;
01766 instance_ptr->rcvd_samples_.remove(item);
01767 dec_ref_data_element(item);
01768 }
01769 }
01770 }
01771
01772 if (is_dispose_msg || is_unregister_msg)
01773 {
01774 ACE_DES_FREE (instance_data,
01775 data_allocator_->free,
01776 MessageType );
01777 instance_data = 0;
01778 }
01779
01780 bool event_notify = false;
01781
01782 if (is_dispose_msg) {
01783 event_notify = instance_ptr->instance_state_.dispose_was_received(header.publication_id_);
01784 }
01785
01786 if (is_unregister_msg) {
01787 if (instance_ptr->instance_state_.unregister_was_received(header.publication_id_)) {
01788 event_notify = true;
01789 }
01790 }
01791
01792 if (!is_dispose_msg && !is_unregister_msg) {
01793 event_notify = true;
01794 instance_ptr->instance_state_.data_was_received(header.publication_id_);
01795 }
01796
01797 if (!event_notify) {
01798 return;
01799 }
01800
01801 OpenDDS::DCPS::ReceivedDataElement *ptr;
01802 ACE_NEW_MALLOC (ptr,
01803 static_cast<OpenDDS::DCPS::ReceivedDataElement *> (
01804 rd_allocator_->malloc (
01805 sizeof (OpenDDS::DCPS::ReceivedDataElement))),
01806 OpenDDS::DCPS::ReceivedDataElement(header,
01807 instance_data));
01808
01809 ptr->disposed_generation_count_ =
01810 instance_ptr->instance_state_.disposed_generation_count();
01811 ptr->no_writers_generation_count_ =
01812 instance_ptr->instance_state_.no_writers_generation_count();
01813
01814 instance_ptr->last_sequence_ = header.sequence_;
01815
01816 instance_ptr->rcvd_strategy_->add(ptr);
01817
01818 if (! is_dispose_msg && ! is_unregister_msg
01819 && instance_ptr->rcvd_samples_.size_ > get_depth())
01820 {
01821 OpenDDS::DCPS::ReceivedDataElement* head_ptr =
01822 instance_ptr->rcvd_samples_.head_;
01823
01824 instance_ptr->rcvd_samples_.remove(head_ptr);
01825
01826 if (head_ptr->sample_state_ == ::DDS::NOT_READ_SAMPLE_STATE)
01827 {
01828 ::DDS::DataReaderListener_var listener
01829 = listener_for (::DDS::SAMPLE_LOST_STATUS);
01830
01831 ++sample_lost_status_.total_count;
01832 ++sample_lost_status_.total_count_change;
01833
01834 set_status_changed_flag(::DDS::SAMPLE_LOST_STATUS, true);
01835
01836 ::DDS::DataReader_var dr = get_dr_obj_ref();
01837 if (!CORBA::is_nil(listener.in()))
01838 {
01839 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01840
01841 listener->on_sample_lost(dr.in (), sample_lost_status_);
01842 }
01843
01844 notify_status_condition_no_sample_lock();
01845 }
01846
01847 dec_ref_data_element(head_ptr);
01848 }
01849
01850 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01851 if (! ptr->coherent_change_) {
01852 #endif
01853 OpenDDS::DCPS::SubscriberImpl* sub = get_subscriber_servant ();
01854
01855 sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
01856 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
01857
01858 ::DDS::SubscriberListener_var sub_listener =
01859 sub->listener_for(::DDS::DATA_ON_READERS_STATUS);
01860 if (!CORBA::is_nil(sub_listener.in()) && !this->coherent_)
01861 {
01862 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01863
01864 sub_listener->on_data_on_readers(sub);
01865 sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
01866 }
01867 else
01868 {
01869 sub->notify_status_condition();
01870
01871 ::DDS::DataReaderListener_var listener =
01872 listener_for (::DDS::DATA_AVAILABLE_STATUS);
01873
01874 ::DDS::DataReader_var dr = get_dr_obj_ref();
01875 if (!CORBA::is_nil(listener.in()))
01876 {
01877 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01878
01879 listener->on_data_available(dr.in ());
01880 set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
01881 sub->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
01882 }
01883 else
01884 {
01885 notify_status_condition_no_sample_lock();
01886 }
01887 }
01888 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
01889 }
01890 #endif
01891 }
01892 else
01893 {
01894 instance_ptr = this->get_handle_instance (handle);
01895 instance_ptr->instance_state_.lively(header.publication_id_);
01896 ACE_DES_FREE (instance_data,
01897 data_allocator_->free,
01898 MessageType );
01899 }
01900 }
01901
01902
01903
01904
01905 void notify_status_condition_no_sample_lock()
01906 {
01907
01908
01909
01910
01911
01912
01913
01914
01915
01916
01917
01918
01919
01920 ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
01921 notify_status_condition();
01922 }
01923
01924
01925
01926 ::DDS::ReturnCode_t check_inputs (
01927 const char* method_name,
01928 MessageSequenceType & received_data,
01929 ::DDS::SampleInfoSeq & info_seq,
01930 ::CORBA::Long max_samples)
01931 {
01932 typename MessageSequenceType::PrivateMemberAccess received_data_p (received_data);
01933
01934
01935
01936
01937
01938
01939
01940
01941
01942
01943 if (received_data.length() != info_seq.length())
01944 {
01945 ACE_DEBUG((LM_DEBUG,
01946 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
01947 ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
01948 ACE_TEXT("sequences do not match.\n"),
01949 TraitsType::type_name(),
01950 method_name ));
01951 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01952 }
01953
01954
01955 if ((received_data.maximum() > 0) && (received_data.release() == false))
01956 {
01957 ACE_DEBUG((LM_DEBUG,
01958 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
01959 ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
01960 ACE_TEXT("maximum %d and owns %d\n"),
01961 TraitsType::type_name(),
01962 method_name,
01963 received_data.maximum(),
01964 received_data.release() ));
01965
01966 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01967 }
01968
01969 if (received_data.maximum() == 0)
01970 {
01971
01972 if (max_samples == ::DDS::LENGTH_UNLIMITED)
01973 {
01974 max_samples =
01975 static_cast< ::CORBA::Long> (received_data_p.max_slots());
01976 }
01977 }
01978 else
01979 {
01980 if (max_samples == ::DDS::LENGTH_UNLIMITED)
01981 {
01982
01983 max_samples = received_data.maximum();
01984 }
01985 else if (
01986 max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
01987 {
01988
01989 ACE_DEBUG((LM_DEBUG,
01990 ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
01991 ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
01992 TraitsType::type_name(),
01993 method_name,
01994 max_samples,
01995 received_data.maximum()));
01996 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
01997 }
01998
01999
02000 }
02001
02002
02003
02004 if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
02005 {
02006 max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
02007 }
02008
02009
02010 return ::DDS::RETCODE_OK;
02011 }
02012
02013
02014 InstanceMap instance_map_;
02015 DataAllocator* data_allocator_;
02016 };
02017
02018 }
02019 }
02020
02021 #endif