1 #ifndef OPENDDS_DCPS_DATAREADERIMPL_T_H 2 #define OPENDDS_DCPS_DATAREADERIMPL_T_H 7 # define OPENDDS_HAS_STD_SHARED_PTR 20 #ifndef OPENDDS_HAS_STD_SHARED_PTR 25 #ifndef OPENDDS_HAS_STD_SHARED_PTR 40 template <
typename MessageType>
42 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1) 55 typename TraitsType::LessThanType) InstanceMap;
66 typedef typename TraitsType::DataReaderType
Interface;
70 return Interface::_is_a(type_id);
75 return Interface::_interface_repository_id();
96 void operator delete(
void* memory);
104 const MessageType*
message()
const {
return this; }
106 #ifndef OPENDDS_HAS_STD_UNIQUE_PTR 122 , marshal_skip_serialize_(false)
124 initialize_lookup_maps();
129 filter_delayed_sample_task_->cancel();
131 for (
typename InstanceMap::iterator it = instance_map_.begin();
132 it != instance_map_.end(); ++it)
147 data_allocator().reset(
new DataAllocator(get_n_chunks ()));
150 ACE_TEXT(
"(%P|%t) %CDataReaderImpl::")
152 ACE_TEXT(
" Cached_Allocator_With_Overflow ")
154 TraitsType::type_name(),
155 data_allocator().
get(),
162 MessageSequenceType & received_data,
170 check_inputs(
"read", received_data, info_seq, max_samples);
181 return read_i(received_data, info_seq, max_samples, sample_states,
182 view_states, instance_states, 0);
186 MessageSequenceType & received_data,
194 check_inputs(
"take", received_data, info_seq, max_samples);
205 return take_i(received_data, info_seq, max_samples, sample_states,
206 view_states, instance_states, 0);
210 MessageSequenceType & received_data,
213 DDS::ReadCondition_ptr a_condition)
216 check_inputs(
"read_w_condition", received_data, sample_info, max_samples);
225 if (!has_readcondition(a_condition))
230 return read_i(received_data, sample_info, max_samples,
231 a_condition->get_sample_state_mask(),
232 a_condition->get_view_state_mask(),
233 a_condition->get_instance_state_mask(),
234 #ifndef OPENDDS_NO_QUERY_CONDITION 235 dynamic_cast< DDS::QueryCondition_ptr
>(a_condition));
242 MessageSequenceType & received_data,
245 DDS::ReadCondition_ptr a_condition)
248 check_inputs(
"take_w_condition", received_data, sample_info, max_samples);
257 if (!has_readcondition(a_condition))
262 return take_i(received_data, sample_info, max_samples,
263 a_condition->get_sample_state_mask(),
264 a_condition->get_view_state_mask(),
265 a_condition->get_instance_state_mask(),
266 #ifndef OPENDDS_NO_QUERY_CONDITION 267 dynamic_cast< DDS::QueryCondition_ptr
>(a_condition)
277 bool found_data =
false;
284 for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
290 bool most_recent_generation =
false;
293 if (item->registered_data_) {
294 received_data = *
static_cast<MessageType*
>(item->registered_data_);
300 if (observer && item->registered_data_ && vd) {
305 if (!most_recent_generation) {
313 if (most_recent_generation) {
331 bool found_data =
false;
338 for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
344 bool most_recent_generation =
false;
359 if (!most_recent_generation) {
363 if (most_recent_generation) {
386 MessageSequenceType & received_data,
395 check_inputs(
"read_instance", received_data, info_seq, max_samples);
405 return read_instance_i(received_data, info_seq, max_samples, a_handle,
406 sample_states, view_states, instance_states, 0);
410 MessageSequenceType & received_data,
419 check_inputs(
"take_instance", received_data, info_seq, max_samples);
429 return take_instance_i(received_data, info_seq, max_samples, a_handle,
430 sample_states, view_states, instance_states, 0);
434 MessageSequenceType & received_data,
438 DDS::ReadCondition_ptr a_condition)
441 check_inputs(
"read_instance_w_condition", received_data, info_seq,
451 if (!has_readcondition(a_condition))
456 #ifndef OPENDDS_NO_QUERY_CONDITION 457 DDS::QueryCondition_ptr query_condition =
458 dynamic_cast< DDS::QueryCondition_ptr
>(a_condition);
461 return read_instance_i(received_data, info_seq, max_samples, a_handle,
462 a_condition->get_sample_state_mask(),
463 a_condition->get_view_state_mask(),
464 a_condition->get_instance_state_mask(),
465 #ifndef OPENDDS_NO_QUERY_CONDITION 474 MessageSequenceType & received_data,
478 DDS::ReadCondition_ptr a_condition)
481 check_inputs(
"take_instance_w_condition", received_data, info_seq,
491 if (!has_readcondition(a_condition))
496 #ifndef OPENDDS_NO_QUERY_CONDITION 497 DDS::QueryCondition_ptr query_condition =
498 dynamic_cast< DDS::QueryCondition_ptr
>(a_condition);
501 return take_instance_i(received_data, info_seq, max_samples, a_handle,
502 a_condition->get_sample_state_mask(),
503 a_condition->get_view_state_mask(),
504 a_condition->get_instance_state_mask(),
505 #ifndef OPENDDS_NO_QUERY_CONDITION 514 MessageSequenceType & received_data,
523 check_inputs(
"read_next_instance", received_data, info_seq, max_samples);
529 return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
530 sample_states, view_states, instance_states, 0);
534 MessageSequenceType & received_data,
543 check_inputs(
"take_next_instance", received_data, info_seq, max_samples);
549 return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
550 sample_states, view_states, instance_states, 0);
554 MessageSequenceType & received_data,
558 DDS::ReadCondition_ptr a_condition)
561 check_inputs(
"read_next_instance_w_condition", received_data, info_seq,
571 if (!has_readcondition(a_condition))
576 #ifndef OPENDDS_NO_QUERY_CONDITION 577 DDS::QueryCondition_ptr query_condition =
578 dynamic_cast< DDS::QueryCondition_ptr
>(a_condition);
581 return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
582 a_condition->get_sample_state_mask(),
583 a_condition->get_view_state_mask(),
584 a_condition->get_instance_state_mask(),
585 #ifndef OPENDDS_NO_QUERY_CONDITION 594 MessageSequenceType & received_data,
598 DDS::ReadCondition_ptr a_condition)
601 check_inputs(
"take_next_instance_w_condition", received_data, info_seq,
611 if (!has_readcondition(a_condition))
616 #ifndef OPENDDS_NO_QUERY_CONDITION 617 DDS::QueryCondition_ptr query_condition =
618 dynamic_cast< DDS::QueryCondition_ptr
>(a_condition);
621 return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
622 a_condition->get_sample_state_mask(),
623 a_condition->get_view_state_mask(),
624 a_condition->get_instance_state_mask(),
625 #ifndef OPENDDS_NO_QUERY_CONDITION 634 MessageSequenceType & received_data,
639 if (received_data.length() != info_seq.length())
644 if (received_data.release())
652 received_data.length(0);
662 const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(handle);
663 if (pos != reverse_instance_map_.end()) {
664 key_holder = pos->second->first;
675 const typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
676 if (it != instance_map_.end()) {
684 MessageSequenceType& received_data =
685 *
static_cast< MessageSequenceType*
> (seq);
687 if (!received_data.release())
690 received_data.length(0);
697 received_data.length(0);
700 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 713 const bool filter_has_non_key_fields = type_support ? evaluator.
has_non_key_fields(*type_support) :
true;
715 const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
716 for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
724 if (!item->registered_data_ || (!item->valid_data_ && filter_has_non_key_fields)) {
727 if (evaluator.
eval(*static_cast<MessageType*>(item->registered_data_), params)) {
740 bool adjust_ref_count =
false)
742 MessageSequenceType data;
747 sample_states, view_states, instance_states, 0);
748 if (adjust_ref_count) {
750 received_data_p.increment_references();
753 gen.
samples_.reserve(data.length());
774 MessageSequenceType data;
777 sample_states, view_states, instance_states, 0);
779 samples.
reserve(data.length());
793 MessageSequenceType dataseq;
801 data =
new MessageType(dataseq[last]);
802 info = infoseq[last];
812 MessageSequenceType dataseq;
820 data =
new MessageType(dataseq[last]);
821 info = infoseq[last];
835 #ifndef OPENDDS_NO_MULTI_TOPIC 836 DDS::TopicDescription_var descr = get_topicdescription();
837 if (
MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
838 if (!mt->filter(sample)) {
844 get_subscriber_servant()->data_received(
this);
847 bool filtered =
false;
858 for (
int i = 0; i < 2; ++i) {
864 bool just_registered;
866 store_instance_data(
move(data),
DDS::HANDLE_NIL, header, instance, just_registered, filtered);
874 notify_read_conditions();
879 if (observer && vd) {
891 const GUID_t& publication_id)
906 bool just_registered, filtered;
909 store_instance_data(
move(data), publication_handle, header, si, just_registered, filtered);
911 notify_read_conditions();
929 if (!(ser >> encap)) {
932 ACE_TEXT(
"%CDataReaderImpl::lookup_instance: ")
933 ACE_TEXT(
"deserialization of encapsulation header failed.\n"),
934 TraitsType::type_name()));
939 if (!encap.
to_encoding(encoding, type_support_->base_extensibility())) {
943 if (decoding_modes_.find(encoding.
kind()) == decoding_modes_.end()) {
946 ACE_TEXT(
"%CDataReaderImpl::lookup_instance: ")
947 ACE_TEXT(
"Encoding kind of the received sample (%C) does not ")
948 ACE_TEXT(
"match the ones specified by DataReader.\n"),
949 TraitsType::type_name(),
956 ACE_TEXT(
"%CDataReaderImpl::lookup_instance: ")
957 ACE_TEXT(
"Deserializing with encoding kind %C.\n"),
958 TraitsType::type_name(),
962 ser.encoding(encoding);
970 ser_ret = ser >> data;
976 ACE_TEXT(
"object construction failure, dropping sample.\n"),
977 TraitsType::type_name()));
982 ACE_TEXT(
"deserialization failed.\n"),
983 TraitsType::type_name()));
990 typename InstanceMap::const_iterator
const it = instance_map_.find(data);
991 if (it != instance_map_.end()) {
998 instance = get_handle_instance(handle);
1008 if (qos_.time_based_filter.minimum_separation != zero) {
1011 const TimeDuration interval(qos_.time_based_filter.minimum_separation);
1012 FilterDelayedSampleQueue queue;
1015 for (
typename FilterDelayedSampleMap::iterator pos = filter_delayed_sample_map_.begin(), limit = filter_delayed_sample_map_.end(); pos != limit; ++pos) {
1016 FilterDelayedSample& sample = pos->second;
1017 sample.expiration_time = now + (interval - (sample.expiration_time - now));
1018 queue.insert(std::make_pair(sample.expiration_time, pos->first));
1020 std::swap(queue, filter_delayed_sample_queue_);
1022 if (!filter_delayed_sample_queue_.empty()) {
1023 filter_delayed_sample_task_->cancel();
1024 filter_delayed_sample_task_->schedule(interval);
1028 filter_delayed_sample_task_->cancel();
1030 filter_delayed_sample_map_.clear();
1031 filter_delayed_sample_queue_.clear();
1044 marshal_skip_serialize_ =
value;
1049 return marshal_skip_serialize_;
1056 const typename InstanceMap::iterator end = instance_map_.end();
1057 typename InstanceMap::iterator it = instance_map_.begin();
1061 release_instance(handle);
1070 bool& just_registered,
1076 dynamic_hook(*data);
1080 if (marshal_skip_serialize_) {
1081 if (!MarshalTraitsType::from_message_block(*data, *payload)) {
1084 ACE_TEXT(
"attempting to skip serialize but bad from_message_block. Returning from demarshal.\n")));
1086 return message_holder;
1088 store_instance_data(
move(data), publication_handle, sample.
header_, instance, just_registered, filtered);
1089 return message_holder;
1100 if (!(ser >> encap)) {
1103 ACE_TEXT(
"%CDataReaderImpl::dds_demarshal: ")
1104 ACE_TEXT(
"deserialization of encapsulation header failed.\n"),
1105 TraitsType::type_name()));
1107 return message_holder;
1110 if (!encap.
to_encoding(encoding, type_support_->base_extensibility())) {
1111 return message_holder;
1114 if (decoding_modes_.find(encoding.
kind()) == decoding_modes_.end()) {
1117 ACE_TEXT(
"%CDataReaderImpl::dds_demarshal: ")
1118 ACE_TEXT(
"Encoding kind %C of the received sample does not ")
1119 ACE_TEXT(
"match the ones specified by DataReader.\n"),
1120 TraitsType::type_name(),
1123 return message_holder;
1127 ACE_TEXT(
"%CDataReaderImpl::dds_demarshal: ")
1128 ACE_TEXT(
"Deserializing with encoding kind %C.\n"),
1129 TraitsType::type_name(),
1136 const bool key_only_marshaling =
1139 bool ser_ret =
true;
1140 if (key_only_marshaling) {
1143 ser_ret = ser >> *data;
1145 message_holder = make_rch<MessageHolder_T<MessageType> >(*data);
1152 ACE_TEXT(
"object construction failure, dropping sample.\n"),
1153 TraitsType::type_name()));
1158 ACE_TEXT(
"deserialization failed, dropping sample.\n"),
1159 TraitsType::type_name()));
1162 return message_holder;
1165 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 1172 if (content_filtered_topic_) {
1174 if (key_only_marshaling != sample_only_has_key_fields) {
1177 ACE_TEXT(
"%CDataReaderImpl::dds_demarshal: ")
1178 ACE_TEXT(
"Mismatch between the key only and valid data properties ")
1179 ACE_TEXT(
"of a %C message of a content filtered topic!\n"),
1180 TraitsType::type_name(),
1184 message_holder.
reset();
1185 return message_holder;
1187 const MessageType& type =
static_cast<MessageType&
>(*data);
1188 if (!content_filtered_topic_->filter(type, sample_only_has_key_fields)) {
1190 message_holder.
reset();
1191 return message_holder;
1197 store_instance_data(
move(data), publication_handle, sample.
header_, instance, just_registered, filtered);
1198 return message_holder;
1211 bool just_registered =
false;
1212 bool filtered =
false;
1217 dds_demarshal(sample, publication_handle, instance, just_registered, filtered, marshaling,
false);
1237 const typename ReverseInstanceMap::iterator pos = reverse_instance_map_.find(handle);
1238 if (pos != reverse_instance_map_.end()) {
1239 remove_from_lookup_maps(handle);
1240 instance_map_.erase(pos->second);
1241 reverse_instance_map_.erase(pos);
1247 const typename SubscriptionInstanceMapType::iterator pos = instances_.find(handle);
1248 if (pos != instances_.end()) {
1249 update_lookup_maps(pos);
1264 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER 1265 const bool is_dispose_msg =
1269 if (!is_bit() && security_config_) {
1283 const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1285 DDS::DynamicData_var dda =
1289 !security_config_->get_access_control()->check_remote_datawriter_register_instance(remote_participant_permissions_handle,
this, publication_handle, dda, ex)) {
1292 "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to register instance SecurityException[%d.%d]: %C\n",
1297 }
else if (is_dispose_msg) {
1302 const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1304 DDS::DynamicData_var dda =
1308 !security_config_->get_access_control()->check_remote_datawriter_dispose_instance(remote_participant_permissions_handle,
this, publication_handle, dda, ex)) {
1311 "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to dispose instance SecurityException[%d.%d]: %C\n",
1319 ACE_UNUSED_ARG(instance_data);
1320 ACE_UNUSED_ARG(publication_handle);
1321 ACE_UNUSED_ARG(header);
1322 ACE_UNUSED_ARG(instance_ptr);
1334 #ifndef OPENDDS_NO_QUERY_CONDITION
1335 DDS::QueryCondition_ptr a_condition)
1343 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1348 const bool group_coherent_ordered =
1350 && subqos_.presentation.coherent_access
1351 && subqos_.presentation.ordered_access;
1353 if (group_coherent_ordered && coherent_) {
1359 #ifndef OPENDDS_NO_QUERY_CONDITION
1366 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1367 if (!group_coherent_ordered) {
1369 const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
1370 for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1374 if (!inst)
continue;
1382 if (observer && item->registered_data_ && vd) {
1388 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1390 const RakeData item = group_coherent_ordered_data_.get_data();
1394 typename InstanceMap::iterator i = instance_map_.begin();
1405 if (received_data.length()) {
1407 if (received_data.maximum() == 0) {
1408 received_data_p.set_loaner(
this);
1412 post_read_or_take();
1422 #ifndef OPENDDS_NO_QUERY_CONDITION
1423 DDS::QueryCondition_ptr a_condition)
1430 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1435 const bool group_coherent_ordered =
1437 && subqos_.presentation.coherent_access
1438 && subqos_.presentation.ordered_access;
1440 if (group_coherent_ordered && coherent_) {
1446 #ifndef OPENDDS_NO_QUERY_CONDITION
1453 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1454 if (!group_coherent_ordered) {
1456 const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
1457 for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1461 if (!inst)
continue;
1469 if (observer && item->registered_data_ && vd) {
1475 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1477 const RakeData item = group_coherent_ordered_data_.get_data();
1485 if (received_data.length()) {
1487 if (received_data.maximum() == 0) {
1488 received_data_p.set_loaner(
this);
1492 post_read_or_take();
1503 #ifndef OPENDDS_NO_QUERY_CONDITION
1504 DDS::QueryCondition_ptr a_condition)
1515 #ifndef OPENDDS_NO_QUERY_CONDITION
1521 if (state_obj->
match(view_states, instance_states)) {
1528 if (observer && item->registered_data_ && vd) {
1536 msg =
"view state is not valid";
1539 if (!msg.empty()) msg +=
" and ";
1540 msg +=
"instance state is ";
1545 ACE_TEXT(
"will return no data reading sub %C because:\n %C\n"),
1552 if (received_data.length()) {
1554 if (received_data.maximum() == 0) {
1555 received_data_p.set_loaner(
this);
1559 post_read_or_take();
1570 #ifndef OPENDDS_NO_QUERY_CONDITION
1571 DDS::QueryCondition_ptr a_condition)
1582 #ifndef OPENDDS_NO_QUERY_CONDITION
1588 if (state_obj->
match(view_states, instance_states)) {
1595 if (observer && item->registered_data_ && vd) {
1605 if (received_data.length()) {
1607 if (received_data.maximum() == 0) {
1608 received_data_p.set_loaner(
this);
1612 post_read_or_take();
1623 #ifndef OPENDDS_NO_QUERY_CONDITION
1624 DDS::QueryCondition_ptr a_condition)
1631 typename InstanceMap::iterator it = instance_map_.begin();
1632 const typename InstanceMap::iterator the_end = instance_map_.end();
1634 const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1635 if (pos != reverse_instance_map_.end()) {
1644 for (; it != the_end; ++it) {
1645 handle = it->second;
1647 read_instance_i(received_data, info_seq, max_samples, handle,
1648 sample_states, view_states, instance_states,
1649 #ifndef OPENDDS_NO_QUERY_CONDITION
1655 post_read_or_take();
1660 post_read_or_take();
1671 #ifndef OPENDDS_NO_QUERY_CONDITION
1672 DDS::QueryCondition_ptr a_condition)
1679 typename InstanceMap::iterator it = instance_map_.begin();
1680 const typename InstanceMap::iterator the_end = instance_map_.end();
1682 const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1683 if (pos != reverse_instance_map_.end()) {
1692 for (; it != the_end; ++it) {
1693 handle = it->second;
1695 take_instance_i(received_data, info_seq, max_samples, handle,
1696 sample_states, view_states, instance_states,
1697 #ifndef OPENDDS_NO_QUERY_CONDITION
1704 post_read_or_take();
1709 post_read_or_take();
1717 bool& just_registered,
1720 ACE_UNUSED_ARG(publication_handle);
1722 const bool is_dispose_msg =
1725 const bool is_unregister_msg =
1729 if (!store_instance_data_check(instance_data, publication_handle, header, instance_ptr)) {
1741 typename InstanceMap::const_iterator
const it = instance_map_.find(*instance_data);
1743 if (it == instance_map_.end()) {
1744 if (is_dispose_msg || is_unregister_msg) {
1748 std::size_t instances_size = 0;
1751 instances_size = instances_.size();
1754 ((::
CORBA::Long) instances_size >= qos_.resource_limits.max_instances))
1756 DDS::DataReaderListener_var listener
1762 ++sample_rejected_status_.total_count;
1763 ++sample_rejected_status_.total_count_change;
1764 sample_rejected_status_.last_instance_handle = handle;
1770 listener->on_sample_rejected(
this, sample_rejected_status_);
1771 sample_rejected_status_.total_count_change = 0;
1773 notify_status_condition_no_sample_lock();
1781 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1782 SharedInstanceMap_rch inst;
1786 bool new_handle =
true;
1787 if (is_exclusive_ownership_) {
1789 temp.
swap(ownership_scoped_access);
1790 if (!owner_manager || ownership_scoped_access.
lock_result_ != 0) {
1796 ACE_TEXT(
"acquire instance_lock failed.\n"), TraitsType::type_name()));
1804 typename InstanceMap::const_iterator
const iter = inst->find(*instance_data);
1805 if (iter != inst->end ()) {
1806 handle = iter->second;
1813 just_registered =
true;
1815 bool owns_handle =
false;
1817 handle = get_next_handle(key);
1821 OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
1824 ref(instances_lock_),
1825 handle, owns_handle);
1827 const std::pair<typename SubscriptionInstanceMapType::iterator, bool> bpair =
1828 instances_.insert(
typename SubscriptionInstanceMapType::value_type(handle, instance));
1830 if (bpair.second ==
false) {
1836 ACE_TEXT(
"insert handle failed.\n"), TraitsType::type_name()));
1840 update_lookup_maps(bpair.first);
1842 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1843 if (owner_manager) {
1845 inst = make_rch<SharedInstanceMap>();
1847 topic_servant_->type_name(),
1853 const std::pair<typename InstanceMap::iterator, bool> bpair =
1854 inst->insert(
typename InstanceMap::value_type(*instance_data, handle));
1855 if (!bpair.second) {
1861 ACE_TEXT(
"insert to participant scope %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1868 temp.
swap(ownership_scoped_access);
1875 ACE_TEXT(
"release instance_lock failed.\n"), TraitsType::type_name()));
1883 std::pair<typename InstanceMap::iterator, bool> bpair =
1884 instance_map_.insert(
typename InstanceMap::value_type(*instance_data,
1886 if (bpair.second ==
false)
1893 ACE_TEXT(
"insert %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1897 reverse_instance_map_[handle] = bpair.first;
1901 just_registered =
false;
1902 handle = it->second;
1907 instance_ptr = get_handle_instance(handle);
1914 filtered = ownership_filter_instance(instance_ptr, header.
publication_id_);
1919 if (!filtered && time_based_filter_instance(instance_ptr, now, deadline)) {
1922 delay_sample(handle,
move(instance_data), header, just_registered, now, deadline);
1926 clear_sample(handle);
1935 finish_store_instance_data(
move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
1939 instance_ptr = get_handle_instance(handle);
1948 if ((qos_.resource_limits.max_samples_per_instance !=
1951 static_cast<size_t>(qos_.resource_limits.max_samples_per_instance))) {
1958 if (!is_dispose_msg && !is_unregister_msg
1961 DDS::DataReaderListener_var listener
1966 sample_rejected_status_.last_reason =
1968 ++sample_rejected_status_.total_count;
1969 ++sample_rejected_status_.total_count_change;
1970 sample_rejected_status_.last_instance_handle = instance_ptr->
instance_handle_;
1976 listener->on_sample_rejected(
this, sample_rejected_status_);
1977 sample_rejected_status_.total_count_change = 0;
1979 notify_status_condition_no_sample_lock();
1982 else if (!is_dispose_msg && !is_unregister_msg)
1995 for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
1996 iter != instances_.end();
2004 if (total_samples >= qos_.resource_limits.max_samples)
2011 if (!is_dispose_msg && !is_unregister_msg
2014 DDS::DataReaderListener_var listener
2019 sample_rejected_status_.last_reason =
2021 ++sample_rejected_status_.total_count;
2022 ++sample_rejected_status_.total_count_change;
2023 sample_rejected_status_.last_instance_handle = instance_ptr->
instance_handle_;
2028 listener->on_sample_rejected(
this, sample_rejected_status_);
2029 sample_rejected_status_.total_count_change = 0;
2031 notify_status_condition_no_sample_lock();
2035 else if (!is_dispose_msg && !is_unregister_msg)
2045 bool event_notify =
false;
2047 if (is_dispose_msg) {
2051 if (is_unregister_msg) {
2053 event_notify =
true;
2057 if (!is_dispose_msg && !is_unregister_msg) {
2058 event_notify =
true;
2062 if (!event_notify) {
2068 header, instance_data.
release(), &sample_lock_);
2079 if (! is_dispose_msg && ! is_unregister_msg
2087 DDS::DataReaderListener_var listener
2090 ++sample_lost_status_.total_count;
2091 ++sample_lost_status_.total_count_change;
2099 listener->on_sample_lost(
this, sample_lost_status_);
2101 sample_lost_status_.total_count_change = 0;
2104 notify_status_condition_no_sample_lock();
2110 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 2114 if (!sub || get_deleted())
2121 DDS::SubscriberListener_var sub_listener =
2127 sub_listener->on_data_on_readers(sub.
in());
2134 DDS::DataReaderListener_var listener =
2143 listener->on_data_available(
this);
2148 notify_status_condition_no_sample_lock();
2151 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 2175 notify_status_condition();
2181 MessageSequenceType& received_data,
2196 if (received_data.length() != info_seq.length())
2199 ACE_TEXT(
"(%P|%t) %CDataReaderImpl::%C ")
2200 ACE_TEXT(
"PRECONDITION_NOT_MET sample and info input ")
2201 ACE_TEXT(
"sequences do not match.\n"),
2202 TraitsType::type_name(),
2208 if ((received_data.maximum() > 0) && (received_data.release() ==
false))
2211 ACE_TEXT(
"(%P|%t) %CDataReaderImpl::%C ")
2212 ACE_TEXT(
"PRECONDITION_NOT_MET mismatch of ")
2213 ACE_TEXT(
"maximum %d and owns %d\n"),
2214 TraitsType::type_name(),
2216 received_data.maximum(),
2217 received_data.release() ));
2222 if (received_data.maximum() == 0)
2228 static_cast< ::
CORBA::Long> (received_data_p.max_slots());
2236 max_samples = received_data.maximum();
2239 max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
2243 ACE_TEXT(
"(%P|%t) %CDataReaderImpl::%C ")
2244 ACE_TEXT(
"PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
2245 TraitsType::type_name(),
2248 received_data.maximum()));
2257 if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
2259 max_samples =
static_cast< ::
CORBA::Long> (received_data_p.max_slots());
2269 const bool just_registered,
2276 typename FilterDelayedSampleMap::iterator i = filter_delayed_sample_map_.find(handle);
2277 if (i == filter_delayed_sample_map_.end()) {
2281 std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
2282 #ifdef ACE_HAS_CPP11 2283 filter_delayed_sample_map_.emplace(std::piecewise_construct,
2284 std::forward_as_tuple(handle),
2285 std::forward_as_tuple(
move(data), hdr, just_registered));
2287 filter_delayed_sample_map_.insert(std::make_pair(handle, FilterDelayedSample(
move(data), hdr, just_registered)));
2289 FilterDelayedSample& sample = result.first->second;
2290 sample.expiration_time = deadline;
2291 const bool schedule = filter_delayed_sample_queue_.empty();
2292 filter_delayed_sample_queue_.insert(std::make_pair(deadline, handle));
2294 filter_delayed_sample_task_->schedule(now - deadline);
2295 }
else if (filter_delayed_sample_queue_.begin()->second == handle) {
2296 filter_delayed_sample_task_->cancel();
2297 filter_delayed_sample_task_->schedule(now - deadline);
2300 FilterDelayedSample& sample = i->second;
2303 sample.message =
move(data);
2304 sample.header = hdr;
2305 sample.new_instance = just_registered;
2314 typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2315 if (sample != filter_delayed_sample_map_.end()) {
2317 sample->second.message.reset();
2325 typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2326 if (sample != filter_delayed_sample_map_.end()) {
2327 for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.lower_bound(sample->second.expiration_time), limit = filter_delayed_sample_queue_.upper_bound(sample->second.expiration_time); pos != limit; ++pos) {
2328 if (pos->second == handle) {
2329 filter_delayed_sample_queue_.erase(pos);
2335 filter_delayed_sample_map_.erase(handle);
2349 for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.begin(), limit = filter_delayed_sample_queue_.end(); pos != limit && pos->first <= now;) {
2350 handles.push_back(pos->second);
2351 filter_delayed_sample_queue_.erase(pos++);
2354 const TimeDuration interval(qos_.time_based_filter.minimum_separation);
2356 for (Handles::const_iterator pos = handles.begin(), limit = handles.end(); pos != limit; ++pos) {
2364 typename FilterDelayedSampleMap::iterator data = filter_delayed_sample_map_.find(handle);
2365 if (data == filter_delayed_sample_map_.end()) {
2369 if (data->second.message) {
2370 const bool NOT_DISPOSE_MSG =
false;
2371 const bool NOT_UNREGISTER_MSG =
false;
2376 const bool new_instance = data->second.new_instance;
2379 finish_store_instance_data(
move(data->second.message),
2383 NOT_UNREGISTER_MSG);
2385 accept_sample_processing(instance, *header, new_instance);
2388 data = filter_delayed_sample_map_.find(handle);
2389 if (data == filter_delayed_sample_map_.end()) {
2394 data->second.expiration_time = now + interval;
2395 filter_delayed_sample_queue_.insert(std::make_pair(data->second.expiration_time, handle));
2403 filter_delayed_sample_map_.erase(data);
2408 if (!filter_delayed_sample_queue_.empty()) {
2409 filter_delayed_sample_task_->schedule(filter_delayed_sample_queue_.begin()->first - now);
2423 #ifdef OPENDDS_HAS_STD_SHARED_PTR 2430 : message(
move(msg))
2432 , new_instance(new_inst)
2448 template <
typename MessageType>
2452 MessageTypeMemoryBlock* block =
2453 static_cast<MessageTypeMemoryBlock*
>(pool.malloc(
sizeof(MessageTypeMemoryBlock)));
2458 template <
typename MessageType>
2467 template <
typename MessageType>
2470 operator delete(memory);
virtual DDS::InstanceHandle_t lookup_instance(const MessageType &instance_data)
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
DataSampleHeader header_
The demarshalled sample header.
virtual DDS::ReturnCode_t take_next_sample(MessageType &received_data, DDS::SampleInfo &sample_info_ref)
void swap(MessageBlock &lhs, MessageBlock &rhs)
RcHandle< T > rchandle_from(T *pointer)
virtual DDS::ReturnCode_t read_next_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
InstanceMap instance_map_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void release_loan(MessageSequenceType &received_data)
void set_instance_map(const char *type_name, const RcHandle< RcObject > &instance_map, DataReaderImpl *reader)
virtual void free(void *ptr)
static String kind_to_string(Kind value)
DDS::Time_t to_dds_time() const
const StatusKind SAMPLE_LOST_STATUS
const LogLevel::Value value
DDS::InstanceHandle_t lookup_instance_generic(const void *data)
InstanceStateKind instance_state
virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const InstanceHandle_t HANDLE_NIL
virtual DDS::ReturnCode_t enable_specific()
char message_id_
The enum MessageId.
FilterDelayedSampleMap filter_delayed_sample_map_
void notify_status_condition_no_sample_lock()
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
size_t index_in_instance_
::DDS::ReturnCode_t take_next_sample(inout<%SCOPED%> received_data, inout ::DDS::SampleInfo sample_info)
void lively(const GUID_t &writer_id)
LIVELINESS message received for this DataWriter.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
RcHandle< SharedInstanceMap > SharedInstanceMap_rch
bool dispose_was_received(const GUID_t &writer_id)
const ReceivedDataElement * peek_tail()
TimeBasedFilterQosPolicy time_based_filter
#define OpenDDS_Dcps_Export
sequence< SampleInfo > SampleInfoSeq
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
virtual DDS::ReturnCode_t read_next_sample(MessageType &received_data, DDS::SampleInfo &sample_info_ref)
ReverseInstanceMap reverse_instance_map_
virtual DDS::ReturnCode_t read_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual void on_sample_received(DDS::DataReader_ptr, const Sample &)
ACE_UINT32 source_timestamp_nanosec_
::DDS::InstanceHandle_t lookup_instance(in<%SCOPED%> instance_data)
MonotonicTimePoint last_accepted_
const char * c_str() const
virtual void push_back(const DDS::SampleInfo &info, const void *sample)=0
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
#define OPENDDS_ASSERT(C)
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
#define OPENDDS_MULTIMAP(K, T)
reference_wrapper< T > ref(T &r)
const char * _interface_repository_id() const
unsigned long InstanceStateMask
MarshalTraits< MessageType > MarshalTraitsType
T::rv_reference move(T &p)
void finish_store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
void *const registered_data_
DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind)
bool unregister_was_received(const GUID_t &writer_id)
virtual DDS::ReturnCode_t take_next_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
SubscriptionInstance_rch si_
static OPENDDS_STRING instance_state_mask_string(DDS::InstanceStateMask mask)
Return string representation of the instance state mask passed.
unique_ptr< ReceivedDataStrategy > rcvd_strategy_
ReceivedDataElementList strategy.
void accessed()
A read or take operation has been performed on this instance.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
DDS::InstanceStateKind instance_state() const
virtual void release_instance_i(DDS::InstanceHandle_t handle)
bool valid_data() const
Returns true if the sample has a complete serialized payload.
void sample_info(DDS::SampleInfo &si, const ReceivedDataElement *de)
Populate the SampleInfo structure.
virtual DDS::ReturnCode_t return_loan(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq)
TraitsType::DataReaderType Interface
virtual DDS::ReturnCode_t take_next_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t get_key_value(MessageType &key_holder, DDS::InstanceHandle_t handle)
size_t disposed_generation_count_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
static TimePoint_T< SystemClock > now()
ReceivedDataElement * remove_head()
const StatusKind DATA_ON_READERS_STATUS
SequenceNumber last_sequence_
Sequence number of the move recent data sample received.
void release_all_instances()
Release all instances held by the reader.
container_supported_unique_ptr< MessageTypeWithAllocator > message
virtual void reserve(CORBA::ULong size)=0
DDS::SampleStateKind sample_state_
MessageTypeWithAllocator()
virtual DDS::ReturnCode_t take_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
const ViewStateKind NOT_NEW_VIEW_STATE
RcHandle< RcObject > get_instance_map(const char *type_name, DataReaderImpl *reader)
const char * instance_state_string() const
Return string of the name of the current instance state.
ReliabilityQosPolicyKind kind
Class to serialize and deserialize data for DDS.
void swap(OwnershipManagerScopedAccess &rhs)
void data_was_received(const GUID_t &writer_id)
Data sample received for this instance.
virtual void state_updated_i(DDS::InstanceHandle_t handle)
InstanceHandle_t instance_handle
virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Holds a data sample received by the transport.
long ParticipantCryptoHandle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void dynamic_hook(MessageType &)
unsigned long InstanceStateKind
A fixed-size allocator that caches items for quicker access but if the pool is exhausted it will use ...
const StatusKind DATA_AVAILABLE_STATUS
void set_marshal_skip_serialize(bool value)
const InstanceStateMask ANY_INSTANCE_STATE
Implements the DDS::DataReader interface.
bool writes_instance(const GUID_t &writer_id) const
Returns true if the writer is a writer of this instance.
unique_ptr< DataAllocator > data_allocator_
const unsigned long DURATION_ZERO_NSEC
bool get_marshal_skip_serialize() const
CORBA::Boolean marshal(TAO_OutputCDR &)
const ViewStateMask ANY_VIEW_STATE
CORBA::Boolean _is_a(const char *type_id)
virtual DDS::ReturnCode_t read_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
MonotonicTimePoint expiration_time
ReceivedDataElementList * rdel_
bool store_instance_data_check(unique_ptr< MessageTypeWithAllocator > &instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr)
::DDS::ReturnCode_t read_next_sample(inout<%SCOPED%> received_data, inout ::DDS::SampleInfo sample_info)
FilterDelayedSample(unique_ptr< MessageTypeWithAllocator > msg, DataSampleHeader_ptr hdr, bool new_inst)
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
bool most_recent_generation(ReceivedDataElement *item) const
OpenDDS::DCPS::Cached_Allocator_With_Overflow< MessageTypeMemoryBlock, ACE_Thread_Mutex > DataAllocator
virtual DDS::ReturnCode_t take(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
MonotonicTimePoint last_sample_tv_
DataSampleHeader_ptr header
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
unsigned long SampleStateMask
HANDLE_TYPE_NATIVE InstanceHandle_t
const SampleStateKind READ_SAMPLE_STATE
ACE_INT32 source_timestamp_sec_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
DDS::InstanceStateKind instance_state() const
Access instance state.
OpenDDS_Dcps_Export GUID_t make_part_guid(const GuidPrefix_t &prefix)
void delay_sample(DDS::InstanceHandle_t handle, unique_ptr< MessageTypeWithAllocator > data, const OpenDDS::DCPS::DataSampleHeader &header, const bool just_registered, const MonotonicTimePoint &now, const MonotonicTimePoint &deadline)
DDS::BuiltinTopicKey_t keyFromSample(TopicType *sample)
const ReturnCode_t RETCODE_NO_DATA
ConstructionStatus get_construction_status() const
virtual DDS::ReturnCode_t auto_return_loan(void *seq)
void filter_delayed(const MonotonicTimePoint &now)
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
void mark_read(ReceivedDataElement *item)
DDS::ReturnCode_t take_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDSTraits< MessageType > TraitsType
ReliabilityQosPolicy reliability
virtual ~DataReaderImpl_T()
OpenDDS_Dcps_Export LogLevel log_level
bool insert_sample(ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch instance, size_t index_in_instance)
bool remove(ReceivedDataElement *data_sample)
bool eval(const T &sample, const DDS::StringSeq ¶ms) const
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
MessageTypeWithAllocator element_
const long DURATION_ZERO_SEC
#define OPENDDS_MAP_CMP_T
ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex > DataSampleHeader_ptr
virtual RcHandle< MessageHolder > dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type, bool full_copy)
const Encoding & encoding() const
const ReturnCode_t RETCODE_ERROR
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool marshal_skip_serialize_
Duration_t minimum_separation
const MessageType * message() const
TraitsType::MessageSequenceType MessageSequenceType
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
virtual DDS::ReturnCode_t read(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
::DDS::ReturnCode_t get_key_value(inout<%SCOPED%> key_holder, in ::DDS::InstanceHandle_t handle)
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
void notify_status_condition()
virtual DDS::ReturnCode_t read_next_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
const char * to_string(MessageId value)
DDS::InstanceHandle_t store_synthetic_data(const MessageType &sample, DDS::ViewStateKind view, const SystemTimePoint ×tamp=SystemTimePoint::now())
void clear_sample(DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t take_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
DDS::ViewStateKind view_state() const
Access view state.
size_t disposed_generation_count() const
Access disposed generation count.
const long LENGTH_UNLIMITED
unsigned long ViewStateKind
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
ReceivedDataElement * rde_
void cancel_release()
Cancel a scheduled or pending release of resources.
ACE_New_Allocator * allocator_
RcHandle< DRISporadicTask > filter_delayed_sample_task_
FilterDelayedSampleQueue filter_delayed_sample_queue_
virtual void qos_change(const DDS::DataReaderQos &qos)
DDS::DynamicData_ptr get_dynamic_data_adapter(DDS::DynamicType_ptr type, const T &value)
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
bool matches(CORBA::ULong sample_states) const
unique_ptr< DataAllocator > & data_allocator()
const InstanceState_rch instance_state_
Instance state for this instance.
virtual DDS::ReturnCode_t read_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
The Internal API and Implementation of OpenDDS.
virtual void dispose_unregister(const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
unsigned long ViewStateMask
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
size_t no_writers_generation_count_
const InstanceStateKind ALIVE_INSTANCE_STATE
virtual DDS::ReturnCode_t take_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
bool coherent_change_
Sample belongs to an active coherent change set.
const StatusKind SAMPLE_REJECTED_STATUS
virtual DDS::ReturnCode_t take(AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_next_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
virtual void qos_change(const DDS::DataReaderQos &qos)
sequence< string > StringSeq
bool has_non_key_fields(const TypeSupportImpl &ts) const
DDS::ReturnCode_t read_generic(GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count=false)
size_t no_writers_generation_count() const
Access no writers generation count.
DCPS::PmfSporadicTask< DataReaderImpl_T > DRISporadicTask
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
void drop_sample(DDS::InstanceHandle_t handle)
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)
void set_instance_state_i(DDS::InstanceHandle_t instance, DDS::InstanceHandle_t publication_handle, DDS::InstanceStateKind state, const SystemTimePoint ×tamp, const GUID_t &publication_id)
const ReturnCode_t RETCODE_BAD_PARAMETER
MessageTypeWithAllocator(const MessageType &other)
bool to_encoding(Encoding &encoding, Extensibility expected_extensibility)
bool contains_sample_filtered(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator &evaluator, const DDS::StringSeq ¶ms)