31 #ifndef DDS_HAS_MINIMUM_BIT 35 #ifndef DDS_HAS_MINIMUM_BIT 36 # include <dds/DdsDcpsCoreTypeSupportC.h> 38 #include <dds/DdsDcpsCoreC.h> 39 #include <dds/DdsDcpsGuidTypeSupportImpl.h> 47 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 51 #ifndef __ACE_INLINE__ 61 : has_subscription_id_(false)
62 , subscription_id_mutex_()
63 , subscription_id_condition_(subscription_id_mutex_)
65 , reverse_sample_lock_(sample_lock_)
69 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
70 , is_exclusive_ownership_(false)
81 , last_deadline_missed_total_count_(0)
82 , deadline_queue_enabled_(false)
85 , always_get_history_(false)
86 , statistics_enabled_(false)
87 , raw_latency_buffer_size_(0)
89 , transport_disabled_(false)
142 #ifndef OPENDDS_SAFETY_PROFILE 146 if (type_lookup_service) {
162 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 171 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 178 #ifndef OPENDDS_NO_MULTI_TOPIC 187 DDS::DataReaderListener_ptr listener,
192 topic_desc_ = DDS::TopicDescription::_duplicate(topic_desc);
193 if (
TopicImpl* topic = dynamic_cast<TopicImpl*>(topic_desc)) {
199 #ifndef DDS_HAS_MINIMUM_BIT 203 #endif // !defined (DDS_HAS_MINIMUM_BIT) 208 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 224 ACE_TEXT(
"(%P|%t) WARNING: DataReaderImpl::init() - ")
225 ACE_TEXT(
"failed to get SubscriberQos\n")));
251 ACE_TEXT(
" This is a deleted datareader, ignoring add.\n")));
277 WriterInfo_rch info = make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(
this), writer_id, writer.
writerQos);
278 std::pair<WriterMapType::iterator, bool> bpair =
writers_.insert(
280 WriterMapType::value_type(
293 StatsMapType::value_type(
305 "(%P|%t) DataReaderImpl::add_association: " 306 "inserted writer %C.return %d\n",
307 LogGuid(writer_id).c_str(), bpair.second));
309 WriterMapType::iterator iter =
writers_.find(writer_id);
315 ACE_TEXT(
"(%P|%t) DataReaderImpl::add_association: ")
316 ACE_TEXT(
"reader %C is associated with writer %C.\n"),
348 ACE_TEXT(
"(%P|%t) DataReaderImpl::add_association: ")
349 ACE_TEXT(
"ERROR: transport layer failed to associate.\n")));
360 ACE_TEXT(
"(%P|%t) DataReaderImpl::transport_assoc_done: ")
361 ACE_TEXT(
"ERROR: transport layer failed to associate %C\n"),
371 ACE_TEXT(
"(%P|%t) DataReaderImpl::transport_assoc_done: ")
372 ACE_TEXT(
"starting/resetting liveliness timer for reader %C\n"),
397 ACE_TEXT(
"(%P|%t) DataReaderImpl::transport_assoc_done: ")
398 ACE_TEXT(
"id_to_handle_map_[ %C] = 0x%x.\n"),
418 DDS::DataReaderListener_var listener =
442 writers_[remote_id]->handle(handle);
457 if (writers.length() == 0) {
470 ACE_TEXT(
"(%P|%t) DataReaderImpl::remove_associations: ")
471 ACE_TEXT(
"bit %d local %C remote %C num remotes %d\n"),
486 const GUID_t writer_id = writers[i];
504 if (writers.length() == 0) {
510 ACE_TEXT(
"(%P|%t) DataReaderImpl::remove_associations_i: ")
511 ACE_TEXT(
"bit %d local %C remote %C num remotes %d\n"),
530 WriterMapType removed_writers;
540 const GUID_t writer_id = writers[i];
542 WriterMapType::iterator it = this->
writers_.find(writer_id);
545 removed_writers.insert(*it);
549 if (this->
writers_.erase(writer_id) == 0) {
552 ACE_TEXT(
"(%P|%t) DataReaderImpl::remove_associations_i: ")
553 ACE_TEXT(
"the writer local %C was already removed.\n"),
563 for (WriterMapType::iterator it = removed_writers.begin(); it != removed_writers.end(); ++it) {
564 it->second->removed();
566 removed_writers.clear();
568 wr_len = updated_writers.length();
586 for (
CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
609 = handles[ wr_len - 1];
613 DDS::DataReaderListener_var listener
651 size =
static_cast<int>(
writers_.size());
652 writers.length(size);
654 WriterMapType::iterator curr_writer =
writers_.begin();
655 WriterMapType::iterator end_writer =
writers_.end();
659 while (curr_writer != end_writer) {
660 writers[i++] = curr_writer->first;
671 ACE_TEXT(
"(%P|%t) WARNING: DataReaderImpl::remove_all_associations() - ")
672 ACE_TEXT(
"caught exception from remove_associations.\n")));
681 DDS::DataReaderListener_var listener =
717 GUID_t prefix = remote_participant;
722 typedef std::pair<GUID_t, WriterInfo_rch> RepoWriterPair;
728 for (WriterMapType::iterator pos =
writers_.lower_bound(prefix),
732 writers.push_back(std::make_pair(pos->first, pos->second));
737 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
740 pos->second->received_activity(when);
743 if (!writers.empty()) {
745 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
748 for (SubscriptionInstanceMapType::iterator iter =
instances_.begin();
765 view_states, instance_states);
770 #ifndef OPENDDS_NO_QUERY_CONDITION 775 const char* query_expression,
781 view_states, instance_states, query_expression);
785 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
788 }
catch (
const std::exception& e) {
791 ACE_TEXT(
"DataReaderImpl::create_querycondition - %C\n"),
802 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
807 DDS::ReadCondition_ptr a_condition)
811 DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
848 subscriber->get_qos(subscriberQos);
850 disco->update_subscription_qos(
859 ACE_TEXT(
"(%P|%t) DataReaderImpl::set_qos, ")
910 DDS::DataReaderListener_ptr a_listener,
916 listener_ = DDS::DataReaderListener::_duplicate(a_listener);
923 return DDS::DataReaderListener::_duplicate(
listener_.in());
929 return DataReaderListener::_narrow(
listener_.in());
934 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 942 return DDS::TopicDescription::_duplicate(
topic_desc_.in());
1056 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
1057 ACE_TEXT(
" Entity is not enabled.\n")),
1070 for (RepoIdToHandleMap::iterator
1073 ++current, ++index) {
1074 publication_handles[index] = current->second;
1080 #if !defined (DDS_HAS_MINIMUM_BIT) 1088 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::")
1089 ACE_TEXT(
"get_matched_publication_data: ")
1090 ACE_TEXT(
"Entity is not enabled.\n")),
1099 DDS::PublicationBuiltinTopicDataSeq data;
1100 const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
1107 publication_data = data[0];
1112 #endif // !defined (DDS_HAS_MINIMUM_BIT) 1131 if (!subscriber->is_enabled()) {
1141 dp_id_ = participant->get_id();
1183 " Cached_Allocator_With_Overflow %x with %d chunks\n",
1205 disco->pre_reader(
this);
1215 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::enable, ")
1216 ACE_TEXT(
"Transport Exception.\n")));
1222 return setup_deserialization_result;
1230 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 1242 subscriber->get_qos(sub_qos);
1254 typesupport->
add_types(type_lookup_service);
1258 const GUID_t subscription_id =
1271 #if defined(OPENDDS_SECURITY) 1289 "add_subscription failed\n"));
1296 "got GUID %C, subscribed to topic name \"%C\" type \"%C\"\n",
1305 return_value = subscriber->reader_enabled(name.
in(),
this);
1319 return return_value;
1338 writer = iter->second;
1345 ACE_TEXT(
"(%P|%t) DataReaderImpl::writer_activity: ")
1346 ACE_TEXT(
"reader %C is not associated with writer %C.\n"),
1361 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1380 publication_handle = pos->second;
1392 ACE_TEXT(
"(%P|%t) DataReaderImpl::data_received: ")
1393 ACE_TEXT(
"%C received sample: %C.\n"),
1418 subscriber->data_received(
this);
1429 bool is_new_instance =
false;
1430 bool filtered =
false;
1440 ACE_TEXT(
"(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
1441 ACE_TEXT(
"instance %d is_new_instance %d filtered %d\n"),
1445 is_new_instance, filtered));
1448 if (filtered)
break;
1454 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1464 if (!(serializer >> control)) {
1466 ACE_TEXT(
"deserialization coherent change control failed.\n")));
1471 std::stringstream buffer;
1472 buffer << control << std::endl;
1475 ACE_TEXT(
"(%P|%t) DataReaderImpl::data_received: ")
1476 ACE_TEXT(
"END_COHERENT_CHANGES %C\n"),
1477 buffer.str().c_str()));
1484 WriterMapType::iterator it =
1489 ACE_TEXT(
"(%P|%t) WARNING: DataReaderImpl::data_received() - ")
1490 ACE_TEXT(
" subscription %C failed to find ")
1491 ACE_TEXT(
" publication data for %C!\n"),
1497 writer = it->second;
1507 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 1512 ACE_TEXT(
"(%P|%t) DataReaderImpl::data_received: ")
1513 ACE_TEXT(
"reader %C got datawriter liveliness from writer %C\n"),
1522 for (SubscriptionInstanceMapType::iterator iter =
instances_.begin();
1545 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1555 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1577 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1583 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1605 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1619 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1634 if (!(ser >> readerId)) {
1636 ACE_TEXT(
"deserialization reader failed.\n")));
1645 ACE_DEBUG((
LM_INFO,
"(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
1653 "(%P|%t) Resumed sample processing for durable writer %C\n",
1661 "(%P|%t) ERROR: DataReaderImpl::data_received" 1662 "unexpected message_id = %d\n",
1667 if (observer && real_data && vd) {
1698 for (ReadConditionSet::iterator it = local_read_conditions.begin(),
1699 end = local_read_conditions.end(); it != end; ++it) {
1705 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
1706 ACE_TEXT(
"Failed to obtain ConditionImpl - can't notify.\n")));
1752 DDS::DataReaderListener_ptr
1762 return subscriber->listener_for(kind);
1765 return DDS::DataReaderListener::_duplicate(
listener_.in());
1807 for (SubscriptionInstanceMapType::iterator iter =
instances_.begin();
1821 execute_or_enqueue(make_rch<CheckLivelinessCommand>(
this));
1841 if (! data_reader) {
1842 this->reactor()->purge_pending_notifications(
this);
1846 long local_timer_id = liveliness_timer_id_;
1847 bool timer_was_reset =
false;
1849 if (local_timer_id != -1 && cancel) {
1852 ACE_TEXT(
"(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1853 ACE_TEXT(
" canceling timer for reader %C.\n"),
1854 LogGuid(data_reader->get_guid()).c_str()));
1859 if (this->reactor()->cancel_timer(local_timer_id) == -1) {
1864 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1868 timer_was_reset =
true;
1873 int alive_writers = 0;
1884 if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
1885 liveliness_timer_id_ = -1;
1892 data_reader->writers_lock_);
1893 WriterMapType writers = data_reader->writers_;
1894 read_guard.release();
1896 for (WriterMapType::iterator iter = writers.begin();
1897 iter != writers.end();
1902 if (!next_absolute.is_max()) {
1904 smallest = std::min(smallest, next_absolute);
1909 if (!alive_writers) {
1912 liveliness_timer_id_ = -1;
1917 ACE_TEXT(
"(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1918 ACE_TEXT(
"reader %C has %d live writers; from_reactor=%d\n"),
1919 LogGuid(data_reader->get_guid()).c_str(),
1925 if (alive_writers) {
1928 if (now < smallest) {
1929 relative = smallest - now;
1933 liveliness_timer_id_ = this->reactor()->schedule_timer(
this, 0, relative.
value());
1935 if (liveliness_timer_id_ == -1) {
1937 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1946 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1948 if (owner_manager) {
1958 "could not find the instance by handle 0x%x\n", handle));
1990 double datum =
static_cast<double>(delay.
value().
sec());
1991 datum += delay.
value().
usec() / 1000000.0;
2014 #ifndef OPENDDS_SAFETY_PROFILE 2018 <<
" samples out of " << this->
stats_.
n() << std::endl;
2019 return str << this->
stats_;
2021 #endif //OPENDDS_SAFETY_PROFILE 2030 ACE_TEXT(
"(%P|%t) DataReaderImpl::writer_removed: ")
2031 ACE_TEXT(
"reader %C from writer %C.\n"),
2033 LogGuid(info_writer_id).c_str()));
2036 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 2038 if (owner_manager) {
2048 RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2049 if (pos != publication_id_to_handle_map_.end()) {
2050 publication_handle = pos->second;
2054 bool liveliness_changed =
false;
2061 --liveliness_changed_status_.alive_count;
2062 --liveliness_changed_status_.alive_count_change;
2063 liveliness_changed =
true;
2067 --liveliness_changed_status_.not_alive_count;
2068 --liveliness_changed_status_.not_alive_count_change;
2069 liveliness_changed =
true;
2072 liveliness_changed_status_.last_publication_handle = info.
handle();
2073 instances_liveliness_update(info_writer_id, publication_handle);
2075 if (liveliness_changed) {
2077 this->notify_liveliness_change();
2089 ACE_TEXT(
"reader %C from writer %C previous state %C.\n"),
2091 LogGuid(info_writer_id).c_str(),
2100 bool liveliness_changed =
false;
2105 liveliness_changed_status_.alive_count++;
2106 liveliness_changed_status_.alive_count_change++;
2107 liveliness_changed =
true;
2111 liveliness_changed_status_.not_alive_count--;
2112 liveliness_changed_status_.not_alive_count_change--;
2115 if (liveliness_changed_status_.alive_count < 0) {
2117 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2118 ACE_TEXT(
"invalid liveliness_changed_status alive count - %d.\n"),
2119 liveliness_changed_status_.alive_count));
2123 if (liveliness_changed_status_.not_alive_count < 0) {
2125 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2126 ACE_TEXT(
"invalid liveliness_changed_status not alive count - %d.\n"),
2127 liveliness_changed_status_.not_alive_count));
2131 liveliness_changed_status_.last_publication_handle = info.
handle();
2137 if (this->monitor_) {
2138 this->monitor_->report();
2142 if (liveliness_changed) {
2144 this->notify_liveliness_change();
2149 liveliness_timer_->check_liveliness();
2159 ACE_TEXT(
"reader %C from writer %C previous state %C.\n"),
2161 LogGuid(info_writer_id).c_str(),
2165 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 2167 if (owner_manager) {
2173 bool liveliness_changed =
false;
2180 RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2181 if (pos != publication_id_to_handle_map_.end()) {
2182 publication_handle = pos->second;
2190 ++liveliness_changed_status_.not_alive_count;
2191 ++liveliness_changed_status_.not_alive_count_change;
2192 liveliness_changed =
true;
2196 --liveliness_changed_status_.alive_count;
2197 --liveliness_changed_status_.alive_count_change;
2200 if (liveliness_changed_status_.alive_count < 0) {
2202 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2203 ACE_TEXT(
"invalid liveliness_changed_status alive count - %d.\n"),
2204 liveliness_changed_status_.alive_count));
2208 if (liveliness_changed_status_.not_alive_count < 0) {
2210 ACE_TEXT(
"(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2211 ACE_TEXT(
"invalid liveliness_changed_status not alive count - %d.\n"),
2212 liveliness_changed_status_.not_alive_count));
2216 liveliness_changed_status_.last_publication_handle = info.
handle();
2220 if (this->monitor_) {
2221 this->monitor_->report();
2224 instances_liveliness_update(info_writer_id, publication_handle);
2227 if (liveliness_changed) {
2229 this->notify_liveliness_change();
2239 InstanceSet localinsts;
2242 if (instances_.size() == 0) {
2245 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2246 iter != instances_.end(); ++iter) {
2247 if (iter->second->instance_state_->writes_instance(writer)) {
2248 localinsts.insert(iter->first);
2253 for (InstanceSet::iterator iter = localinsts.begin(); iter != localinsts.end(); ++iter) {
2264 sample_lost_status_ = status;
2272 sample_rejected_status_ = status;
2289 if (location != this->statistics_.end()) {
2294 if ((this->statistics_enabled()) ||
2295 (this->qos_.latency_budget.duration > zero)) {
2302 if (this->statistics_enabled()) {
2303 location->second.add_stat(latency);
2308 ACE_TEXT(
"(%P|%t) DataReaderImpl::process_latency() - ")
2309 ACE_TEXT(
"measured latency of %C for current sample.\n"),
2310 latency.
str().c_str()));
2313 if (this->qos_.latency_budget.duration > zero) {
2315 if (latency >
TimeDuration(this->qos_.latency_budget.duration)) {
2330 ACE_TEXT(
"(%P|%t) DataReaderImpl::process_latency() - ")
2331 ACE_TEXT(
"reader %C is not associated with writer %C (late sample?).\n"),
2341 DataReaderListener_var listener = get_ext_listener();
2345 writerIds.length(1);
2346 writerIds[ 0] = writer;
2349 this->lookup_instance_handles(writerIds, handles);
2351 if (handles.length() >= 1) {
2352 this->budget_exceeded_status_.last_instance_handle = handles[ 0];
2355 this->budget_exceeded_status_.last_instance_handle = -1;
2358 ++this->budget_exceeded_status_.total_count;
2359 ++this->budget_exceeded_status_.total_count_change;
2361 listener->on_budget_exceeded(
this, this->budget_exceeded_status_);
2363 this->budget_exceeded_status_.total_count_change = 0;
2367 #ifndef OPENDDS_SAFETY_PROFILE 2373 stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
2376 for (StatsMapType::const_iterator current = this->statistics_.begin();
2377 current != this->statistics_.end();
2378 ++current, ++index) {
2379 stats[ index] = current->second.get_stats();
2380 stats[ index].publication = current->first;
2389 for (StatsMapType::iterator current = this->statistics_.begin();
2390 current != this->statistics_.end();
2392 current->second.reset_stats();
2399 return statistics_enabled_;
2406 statistics_enabled_ = statistics_enabled;
2417 this->set_deleted(
true);
2418 this->stop_associating();
2419 this->send_final_acks();
2420 subscription_id_condition_.notify_all();
2428 SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
2429 if (iter == instances_.end()) {
2432 ACE_TEXT(
"DataReaderImpl::get_handle_instance: ")
2433 ACE_TEXT(
"lookup for 0x%x failed\n"),
2438 return iter->second;
2450 return participant->assign_handle(
id);
2453 return participant->assign_handle();
2461 participant->return_handle(handle);
2468 DBG_ENTRY_LVL(
"DataReaderImpl",
"notify_subscription_disconnected",6);
2472 DataReaderListener_var the_listener = get_ext_listener();
2480 the_listener->on_subscription_disconnected(
this, status);
2487 DBG_ENTRY_LVL(
"DataReaderImpl",
"notify_subscription_reconnected",6);
2489 if (!this->is_bit_) {
2492 DataReaderListener_var the_listener = get_ext_listener();
2500 the_listener->on_subscription_reconnected(
this, status);
2508 DBG_ENTRY_LVL(
"DataReaderImpl",
"notify_subscription_lost",6);
2510 if (!this->is_bit_) {
2513 DataReaderListener_var the_listener = get_ext_listener();
2525 the_listener->on_subscription_lost(
this, status);
2533 DBG_ENTRY_LVL(
"DataReaderImpl",
"notify_subscription_lost",6);
2537 DataReaderListener_var the_listener = get_ext_listener();
2545 the_listener->on_subscription_lost(
this, status);
2557 const char* separator =
"";
2567 ACE_TEXT(
"(%P|%t) DataReaderImpl::lookup_instance_handles: ")
2568 ACE_TEXT(
"searching for handles for writer Ids: %C.\n"),
2572 hdls.length(num_wrts);
2577 hdls[i] = participant->lookup_handle(ids[i]);
2592 ACE_TEXT(
"(%P|%t) DataReaderImpl::filter_sample: ")
2593 ACE_TEXT(
"Discarded historic data.\n")));
2612 if (now >= expiration_time) {
2617 ACE_TEXT(
"expired by %d seconds, %d microseconds.\n"),
2633 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 2634 if (this->is_exclusive_ownership_) {
2637 WriterMapType::iterator iter = writers_.find(pubid);
2639 if (iter == writers_.end()) {
2645 ACE_TEXT(
"(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2646 ACE_TEXT(
"reader %C is not associated with writer %C.\n"),
2660 bool is_owner = owner_manager && owner_manager->
select_owner (
2662 iter->second->writer_id(),
2663 iter->second->writer_qos_ownership_strength(),
2670 ACE_TEXT(
"(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2671 ACE_TEXT(
"reader %C writer %C is not elected as owner %C\n"),
2682 ACE_TEXT(
"(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2683 ACE_TEXT(
"reader %C writer %C is not owner %C\n"),
2692 ACE_UNUSED_ARG(pubid);
2693 ACE_UNUSED_ARG(instance);
2704 const TimeDuration minimum_separation(qos_.time_based_filter.minimum_separation);
2708 if (!minimum_separation.
is_zero()) {
2710 deadline = now + minimum_separation;
2722 return this->is_bit_;
2734 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2735 iter != instances_.end();
2753 DDS::DataReaderListener_var listener
2759 liveliness_changed_status_.not_alive_count_change = 0;
2761 listener->on_liveliness_changed(
this, status);
2763 notify_status_condition();
2768 output_str +=
"subscription ";
2770 output_str +=
", listener at: 0x";
2773 for (WriterMapType::iterator current = this->writers_.begin();
2774 current != this->writers_.end();
2776 const GUID_t id = current->first;
2777 output_str +=
"\n\tNOTIFY: writer[ ";
2779 output_str +=
"] == ";
2780 output_str += current->second->get_state_str();
2784 ACE_TEXT(
"(%P|%t) DataReaderImpl::notify_liveliness_change: ")
2785 ACE_TEXT(
"listener at 0x%x, mask 0x%x.\n")
2789 output_str.c_str()));
2798 subscriber->set_status_changed_flag(
2806 return this->reactor_;
2827 for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
2828 end = instances_.end(); iter != end; ++iter) {
2829 instance_handles.push_back(iter->first);
2838 this->writers_lock_);
2839 for (WriterMapType::iterator iter = writers_.begin();
2840 iter != writers_.end();
2843 iter->second->state()));
2847 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 2854 this->writers_lock_);
2855 for (WriterMapType::iterator iter = writers_.begin();
2856 iter != writers_.end();
2858 if (iter->second->writer_id() == pub_id) {
2859 if (ownership_strength != iter->second->writer_qos_ownership_strength()) {
2862 ACE_TEXT(
"(%P|%t) DataReaderImpl::update_ownership_strength - ")
2863 ACE_TEXT(
"local %C update remote %C strength from %d to %d\n"),
2866 iter->second->writer_qos_ownership_strength(), ownership_strength));
2868 iter->second->writer_qos_ownership_strength(ownership_strength);
2869 iter->second->clear_owner_evaluated();
2877 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 2881 bool accept_here =
true;
2887 subqos_.presentation.coherent_access) {
2894 subscriber->coherent_change_received(publisher_id,
this, state);
2895 accept_here =
false;
2899 reject_coherent(writer_id, publisher_id);
2905 if (state ==
COMPLETED && accept_here) {
2906 accept_coherent(writer_id, publisher_id);
2907 coherent_changes_completed(
this);
2915 const GUID_t& publisher_id)
2919 ACE_TEXT(
"(%P|%t) DataReaderImpl::accept_coherent()")
2920 ACE_TEXT(
" reader %C writer %C publisher %C\n"),
2923 LogGuid(publisher_id).c_str()));
2925 SubscriptionInstanceSet localsubs;
2928 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2929 iter != this->instances_.end(); ++iter) {
2930 localsubs.insert(iter->second);
2934 for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2935 iter != localsubs.end(); iter++) {
2936 (*iter)->rcvd_strategy_->accept_coherent(writer_id, publisher_id);
2942 const GUID_t& publisher_id)
2946 ACE_TEXT(
"(%P|%t) DataReaderImpl::reject_coherent()")
2947 ACE_TEXT(
" reader %C writer %C publisher %C\n"),
2950 LogGuid(publisher_id).c_str()));
2953 SubscriptionInstanceSet localsubs;
2956 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2957 iter != this->instances_.end(); ++iter) {
2958 localsubs.insert(iter->second);
2962 for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2963 iter != localsubs.end(); iter++) {
2964 (*iter)->rcvd_strategy_->reject_coherent(writer_id, publisher_id);
2966 this->reset_coherent_info(writer_id, publisher_id);
2971 const GUID_t& publisher_id)
2975 WriterMapType::iterator itEnd = this->writers_.end();
2976 for (WriterMapType::iterator it = this->writers_.begin();
2977 it != itEnd; ++it) {
2978 if (it->second->writer_id() == writer_id
2979 && it->second->publisher_id() == publisher_id) {
2980 it->second->reset_coherent_info();
2992 for (WriterMapType::iterator iter = writers_.begin();
2993 iter != writers_.end();
2996 if (iter->second->publisher_id() == publisher_id) {
3021 ::DDS::SubscriberListener_var sub_listener =
3028 if (reader ==
this) {
3031 sub_listener->on_data_on_readers(subscriber.
in());
3039 subscriber->notify_status_condition();
3041 ::DDS::DataReaderListener_var listener =
3049 if (reader ==
this) {
3052 listener->on_data_available(
this);
3054 listener->on_data_available(
this);
3062 this->notify_status_condition();
3071 this->coherent_ =
true;
3078 this->coherent_ =
false;
3079 this->group_coherent_ordered_data_.reset();
3080 this->post_read_or_take();
3089 SubscriptionInstanceSet localsubs;
3092 for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
3093 iter != instances_.end(); ++iter) {
3094 localsubs.insert(iter->second);
3100 for (SubscriptionInstanceSet::iterator iter = localsubs.begin(); iter != localsubs.end(); ++iter) {
3107 group_coherent_ordered_data_.insert_sample(item, &inst->
rcvd_samples_, *iter, ++i);
3113 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 3119 this->subqos_ = qos;
3122 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 3129 content_filtered_topic_ = cft;
3133 DDS::ContentFilteredTopic_ptr
3137 return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get());
3141 #ifndef OPENDDS_NO_MULTI_TOPIC 3149 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 3155 disco->update_subscription_params(domain_id_,
3166 for (WriterMapType::iterator iter = writers_.begin();
3167 iter != writers_.end();
3169 iter->second->set_owner_evaluated(instance,
false);
3179 WriterMapType::iterator where = writers_.find(pub_id);
3180 if (writers_.end() != where) {
3181 info = where->second;
3189 deliver_historic(to_deliver);
3199 if (iter != writers_.end()) {
3202 if (iter->second->check_historic(seq, sample, last_historic_seq)) {
3207 && seq <= last_historic_seq) {
3218 const iter_t end = samples.end();
3219 for (iter_t iter = samples.begin(); iter != end; ++iter) {
3221 data_received(iter->second);
3232 WriterMapType::iterator it = writers_.find(peer);
3233 if (it != writers_.end()) {
3236 end_historic_sweeper_->schedule_timer(it->second);
3248 if (type ==
"rtps_udp" || type ==
"multicast") {
3249 resume_sample_processing(peer);
3277 WriterMapType::const_iterator iter = writers_.find(writerId);
3278 if (iter == writers_.end()) {
3293 bool xcdr1_mutable =
false;
3294 bool illegal_unaligned =
false;
3295 for (
CORBA::ULong i = 0; i < qos_.representation.value.length(); ++i) {
3299 xcdr1_mutable =
true;
3301 illegal_unaligned =
true;
3303 decoding_modes_.insert(encoding_kind);
3307 "DataReaderImpl::setup_deserialization: " 3308 "Encountered unsupported or unknown data representation: %C\n",
3312 if (decoding_modes_.empty()) {
3315 if (xcdr1_mutable) {
3316 error_message =
" Unsupported combination of XCDR1 and mutable";
3317 }
else if (illegal_unaligned) {
3318 error_message =
" Unaligned CDR is not allowed in rtps_udp transport";
3321 "DataReaderImpl::setup_deserialization: " 3322 "Could not find a valid data representation.%C\n",
3323 error_message.c_str()));
3329 EncodingKinds::iterator it = decoding_modes_.begin();
3330 for (; it != decoding_modes_.end(); ++it) {
3331 if (!encodings.empty()) {
3337 "Setup successfully with the following data representation%C: %C\n",
3338 encodings.size() != 1 ?
"s" :
"",
3339 encodings.c_str()));
3347 bool is_new_instance)
3349 bool accepted =
true;
3350 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 3351 bool verify_coherent =
false;
3358 WriterMapType::iterator where = writers_.find(header.
publication_id_);
3360 if (where != writers_.end()) {
3363 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 3366 verify_coherent =
true;
3368 writer = where->second;
3373 ACE_TEXT(
"(%P|%t) WARNING: DataReaderImpl::accept_sample_processing - ")
3374 ACE_TEXT(
"subscription %C failed to find ")
3375 ACE_TEXT(
"publication data for %C.\n"),
3381 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 3382 if (verify_coherent) {
3383 accepted = verify_coherent_changes_completion(writer.
in());
3387 if (instance && deadline_queue_enabled_) {
3391 if (is_new_instance) {
3392 schedule_deadline(instance,
false);
3399 notify_read_conditions();
3403 #if defined(OPENDDS_SECURITY) 3441 const GUID_t pub_id = info->writer_id();
3453 ACE_DEBUG((
LM_INFO,
"(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
3454 LogGuid(reader->get_guid()).c_str(),
3458 reader->resume_sample_processing(pub_id);
3465 info_->schedule_historic_samples_timer(sweeper_, ten_seconds);
3466 const bool insert_result = sweeper_->info_set_.insert(info_).second;
3469 ACE_DEBUG((
LM_INFO,
"(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - sweeper %@ is now scheduled\n", info_.in()));
3475 info_->cancel_historic_samples_timer(sweeper_);
3476 const bool erase_result = sweeper_->info_set_.erase(info_) > 0;
3479 ACE_DEBUG((
LM_INFO,
"(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - sweeper %@ is no longer scheduled\n", info_.in()));
3485 populate_connection_info();
3487 const GUID_t dp_id_copy = dp_id_;
3489 disco->update_subscription_locators(domain_id_,
3499 if (!subscriber || !data_reader) {
3503 if (set_reader_status_) {
3509 sub_listener_->on_data_on_readers(subscriber.
in());
3517 if (data_reader && set_reader_status_) {
3521 if (data_reader && set_subscriber_status_) {
3528 if (call_ && data_reader) {
3529 listener_->on_data_available(data_reader.
in());
3536 for (
CORBA::ULong is = 1; is <= MAX_SAMPLE_STATE_MASK; ++is) {
3537 for (
CORBA::ULong iv = 1; iv <= MAX_VIEW_STATE_MASK; ++iv) {
3538 for (
CORBA::ULong ii = 1; ii <= MAX_INSTANCE_STATE_MASK; ++ii) {
3539 combined_state_lookup_[to_combined_states(is, iv, ii)] = HandleSet();
3544 combined_state_lookup_[0] = HandleSet();
3549 for (LookupMap::iterator it = combined_state_lookup_.begin(); it != combined_state_lookup_.end(); ++it) {
3550 if (it->first == 0)
continue;
3552 split_combined_states(it->first, sample_states, view_states, instance_states);
3553 if (input->second->matches(sample_states, view_states, instance_states)) {
3554 it->second.insert(input->first);
3556 it->second.erase(input->first);
3563 for (LookupMap::iterator it = combined_state_lookup_.begin(), the_end = combined_state_lookup_.end(); it != the_end; ++it) {
3564 if (it->first == 0)
continue;
3565 it->second.erase(handle);
3571 const CORBA::ULong combined_states = to_combined_states(sample_states, view_states, instance_states);
3572 LookupMap::const_iterator ci = combined_state_lookup_.find(combined_states);
3583 const bool schedule = deadline_queue_.empty();
3584 deadline_queue_.insert(std::make_pair(instance->
deadline_, instance));
3585 if (!timer_called) {
3587 deadline_task_->schedule(deadline_period_);
3588 }
else if (deadline_queue_.begin()->second == instance) {
3590 deadline_task_->cancel();
3591 deadline_task_->schedule(deadline_period_);
3601 for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->
deadline_), limit = deadline_queue_.upper_bound(instance->
deadline_); pos != limit; ++pos) {
3602 if (pos->second == instance) {
3603 deadline_queue_.erase(pos);
3618 bool missed =
false;
3623 }
else if (timer_called) {
3636 ++requested_deadline_missed_status_.total_count;
3637 requested_deadline_missed_status_.total_count_change =
3638 requested_deadline_missed_status_.total_count - last_deadline_missed_total_count_;
3639 requested_deadline_missed_status_.last_instance_handle = instance->
instance_handle_;
3645 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 3661 listener->on_requested_deadline_missed(
this, status);
3665 last_deadline_missed_total_count_ = requested_deadline_missed_status_.total_count;
3668 notify_status_condition();
3675 schedule_deadline(instance, timer_called);
3677 cancel_deadline(instance);
3678 schedule_deadline(instance, timer_called);
3686 deadline_queue_.clear();
3687 deadline_task_->cancel();
3692 if (deadline_period_ != deadline_period) {
3693 deadline_period_ = deadline_period;
3695 if (deadline_queue_enabled_) {
3698 for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
3699 iter != this->instances_.end();
3702 reschedule_deadline(iter->second, now);
3718 for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->
deadline_), limit = deadline_queue_.upper_bound(instance->
deadline_); pos != limit; ++pos) {
3719 if (pos->second == instance) {
3720 deadline_queue_.erase(pos);
3727 const bool schedule = deadline_queue_.empty();
3728 deadline_queue_.insert(std::make_pair(instance->
deadline_, instance));
3730 deadline_task_->schedule(deadline_period_);
3731 }
else if (deadline_queue_.begin()->second == instance) {
3733 deadline_task_->cancel();
3734 deadline_task_->schedule(deadline_period_);
3744 for (DeadlineQueue::iterator pos = deadline_queue_.begin(), limit = deadline_queue_.end(); pos != limit && pos->first <= now;) {
3746 deadline_queue_.erase(pos++);
3748 process_deadline(instance, now,
true);
3751 if (!deadline_queue_.empty()) {
3752 deadline_task_->schedule(deadline_queue_.begin()->first - now);
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
DataSampleHeader header_
The demarshalled sample header.
bool is_last(const GUID_t &pub)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
void resume_sample_processing(const GUID_t &pub_id)
when done handling historic samples, resume
virtual void transport_assoc_done(int flags, const GUID_t &remote_id)
void return_handle(DDS::InstanceHandle_t handle)
DDS::DataReaderListener_var listener_
RcHandle< T > rchandle_from(T *pointer)
bool have_sample_states(DDS::SampleStateMask sample_states) const
sequence< InstanceHandle_t > InstanceHandleSeq
bool has_subscription_id_
bool verify_coherent_changes_completion(WriterInfo *writer)
virtual void state_updated_i(DDS::InstanceHandle_t handle)=0
unsigned int size() const
Amount of data actually stored.
virtual DDS::ReturnCode_t get_matched_publications(DDS::InstanceHandleSeq &publication_handles)
const DDS::StatusMask NO_STATUS_MASK
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(DDS::RequestedIncompatibleQosStatus &status)
TransportImpl_rch impl() const
static String kind_to_string(Kind value)
const StatusKind SAMPLE_LOST_STATUS
const ValueDispatcher * get_value_dispatcher() const
void accept_sample_processing(const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
const LogLevel::Value value
const StatusKind LIVELINESS_CHANGED_STATUS
DDS::ReturnCode_t set_enabled()
virtual void reset_latency_stats()
Clear any intermediate statistical values.
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
void cancel_all_deadlines()
const InstanceHandle_t HANDLE_NIL
void remove_from_lookup_maps(DDS::InstanceHandle_t handle)
char message_id_
The enum MessageId.
virtual void install_type_support(TypeSupportImpl *)
virtual DDS::ReturnCode_t get_subscription_matched_status(DDS::SubscriptionMatchedStatus &status)
void check_liveliness_i(bool cancel, const MonotonicTimePoint &now)
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
Observer_rch get_observer(Observer::Event e)
BudgetExceededStatus budget_exceeded_status_
Base class to hold configuration settings for TransportImpls.
virtual DDS::DataReaderListener_ptr get_listener()
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
void coherent_changes_completed(DataReaderImpl *reader)
ReliabilityQosPolicy reliability
CORBA::Long total_samples() const
long max_samples_per_instance
virtual void data_received(const ReceivedDataSample &sample)
process a message that has been received - could be control or a data sample.
void notify_subscription_disconnected(const WriterIdSeq &pubids)
WeakRcHandle< DataReaderImpl > reader_
virtual void on_deleted(DDS::DataWriter_ptr)
DDS::QosPolicyId_t last_policy_id
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
DDS::TopicDescription_var topic_desc_
DDS::InstanceHandle_t handle() const
DataType minimum() const
Access the minimum value.
void state_updated(DDS::InstanceHandle_t handle)
::DDS::InstanceHandleSeq publication_handles
const TransportLocatorSeq & connection_info() const
const long DURATION_INFINITE_SEC
void lively(const GUID_t &writer_id)
LIVELINESS message received for this DataWriter.
DurabilityQosPolicy durability
TransportLocator writerDiscInfo
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
virtual DDS::Subscriber_ptr get_subscriber()
TransportLocatorSeq remote_data_
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
DDS::SubscriptionMatchedStatus subscription_match_status_
virtual DDS::TopicDescription_ptr get_topicdescription()
void deadline_task(const MonotonicTimePoint &now)
virtual void get_latency_stats(LatencyStatisticsSeq &stats)
unsigned long n() const
Access the number of values accumulated.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
void reject_coherent(const GUID_t &writer_id, const GUID_t &publisher_id)
HistoryQosPolicyKind kind
bool is_owner(const DDS::InstanceHandle_t &instance_handle, const GUID_t &pub_id)
virtual void signal_liveliness(const GUID_t &remote_participant)
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffer.
#define ACE_WRITE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void schedule_timer(WriterInfo_rch &info)
TransportMessageBlockAllocator mb_alloc_
void notify_subscription_lost(const WriterIdSeq &pubids)
void set_group_info(const CoherentChangeControl &info)
void remove_all_associations()
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
std::ostream & raw_data(std::ostream &str) const
Dump any raw data.
void writer_became_alive(WriterInfo &info, const MonotonicTimePoint &when)
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
String to_dds_string(unsigned short to_convert)
virtual void on_disassociated(DDS::DataWriter_ptr, const GUID_t &)
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
DCPS::String repr_to_string(const DDS::DataRepresentationId_t &repr)
bool time_based_filter_instance(const SubscriptionInstance_rch &instance, MonotonicTimePoint &now, MonotonicTimePoint &deadline)
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
virtual void on_sample_received(DDS::DataReader_ptr, const Sample &)
void unregister_reader(const char *type_name, DataReaderImpl *reader)
ACE_UINT32 source_timestamp_nanosec_
MonotonicTime_t participantDiscoveredAt
const octet ENTITYKIND_OPENDDS_NIL_WRITER
MonotonicTime_t participant_discovered_at_
MonotonicTimePoint deadline_
MonotonicTimePoint last_accepted_
bool have_view_states(DDS::ViewStateMask view_states) const
OwnershipQosPolicy ownership
unique_ptr< Monitor > monitor_
Monitor object for this entity.
const char * c_str() const
bool has_zero_copies() const
virtual char * get_type_name()
void notify_liveliness_change()
void get_writer_states(WriterStatePairVec &writer_states)
CommandPtr execute_or_enqueue(CommandPtr command)
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
void schedule_deadline(SubscriptionInstance_rch instance, bool timer_called)
TransportLocator discovery_locator_
InstanceHandle_t last_instance_handle
ACE_CDR::ULong remote_transport_context_
LivelinessQosPolicy liveliness
const SampleStateMask ANY_SAMPLE_STATE
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
long absolute_generation_rank
Reverse_Lock_t reverse_sample_lock_
RcHandle< LivelinessTimer > liveliness_timer_
TimePoint_T< SystemClock > SystemTimePoint
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
const ACE_Time_Value & value() const
OwnershipQosPolicyKind kind
Cached_Allocator_With_Overflow< ReceivedDataElementMemoryBlock, ACE_Thread_Mutex > ReceivedDataAllocator
reference_wrapper< T > ref(T &r)
unsigned long InstanceStateMask
sequence< TransportLocator > TransportLocatorSeq
std::pair< GUID_t, WriterInfo::WriterState > WriterStatePair
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
void enable_filtering(ContentFilteredTopicImpl *cft)
bool topicIsBIT(const char *name, const char *type)
void set_sample_lost_status(const DDS::SampleLostStatus &status)
void writer_activity(const DataSampleHeader &header)
update liveliness info for this writer.
unique_ptr< ReceivedDataAllocator > rd_allocator_
void disassociate(const GUID_t &peerId)
virtual CORBA::Boolean statistics_enabled()
void initialize_lookup_maps()
Collection of latency statistics for a single association.
bool filter_sample(const DataSampleHeader &header)
MonotonicTimePoint cur_sample_tv_
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
ACE_Reactor_Timer_Interface * get_reactor()
bool waiting_for_end_historic_samples() const
Coherent_State coherent_change_received()
DDS::DomainId_t domain_id_
DDS::InstanceStateKind instance_state() const
void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&samples)
deliver samples that were held by check_historic()
void writer_became_dead(WriterInfo &info)
virtual void release_instance_i(DDS::InstanceHandle_t handle)=0
bool have_instance_states(DDS::InstanceStateMask instance_states) const
const DDS::StatusMask DEFAULT_STATUS_MASK
unsigned long transportContext
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
const StatusKind REQUESTED_INCOMPATIBLE_QOS_STATUS
::DDS::InstanceHandle_t last_instance_handle
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
void release_instance(DDS::InstanceHandle_t handle)
Release the instance with the handle.
size_t disposed_generation_count_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
long long opendds_reserved_publication_seq
static TimePoint_T< MonotonicClock > now()
bool check_historic(const ReceivedDataSample &sample)
const StatusKind DATA_ON_READERS_STATUS
friend class QueryConditionImpl
SequenceNumber sequence_
The data sample's sequence number.
const char *const BUILT_IN_PUBLICATION_TOPIC
Implements the DDS::Topic interface.
DDS::QosPolicyCountSeq policies
virtual void dispose_unregister(const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance)
void instances_liveliness_update(const GUID_t &writer, DDS::InstanceHandle_t publication_handle)
void update_subscription_params(const DDS::StringSeq &params) const
bool contains_sample(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
InstanceHandle_t last_publication_handle
DDS::ContentFilteredTopic_ptr get_cf_topic() const
static const TimePoint_T< MonotonicClock > zero_value
int handle_timeout(const ACE_Time_Value &current_time, const void *arg)
void reset_deadline_period(const TimeDuration &deadline_period)
virtual DDS::ReturnCode_t get_requested_deadline_missed_status(DDS::RequestedDeadlineMissedStatus &status)
void reset_ownership(DDS::InstanceHandle_t instance)
GUID_t publisher_id() const
void cancel_deadline(SubscriptionInstance_rch instance)
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
RcHandle< SubscriberImpl > get_subscriber_servant()
SampleRejectedStatusKind last_reason
InstanceHandle_t last_publication_handle
Duration_t lease_duration
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
DDS::DynamicType_var dynamic_type_
ACE_Thread_Mutex subscription_id_mutex_
virtual RcHandle< MessageHolder > dds_demarshal(const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance, bool &is_new_instance, bool &filtered, MarshalingType marshaling_type, bool full_copy)=0
bool insert_sample(ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch i, size_t index_in_instance)
bool group_coherent() const
ReliabilityQosPolicyKind kind
ACE_Thread_Mutex listener_mutex_
DurabilityQosPolicyKind kind
bool deadline_queue_enabled_
Class to serialize and deserialize data for DDS.
DurabilityQosPolicy durability
const char * get_state_str() const
DataReaderListener_ptr get_ext_listener()
sequence< LatencyStatistics > LatencyStatisticsSeq
virtual void add_association(const GUID_t &yourId, const WriterAssociation &writer, bool active)
void add_types(const XTypes::TypeLookupService_rch &tls) const
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
bool select_owner(const DDS::InstanceHandle_t &instance_handle, const GUID_t &pub_id, const CORBA::Long &ownership_strength, InstanceState_rch instance_state)
Holds a data sample received by the transport.
virtual DDS::DynamicType_ptr get_type() const
DataRepresentationQosPolicy representation
long ParticipantCryptoHandle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
QosPolicyId_t last_policy_id
long current_count_change
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
long no_writers_generation_count
const StatusKind DATA_AVAILABLE_STATUS
DDS::ReturnCode_t setup_deserialization()
Setup deserialization options.
void reschedule_deadline(SubscriptionInstance_rch instance, const MonotonicTimePoint &now)
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
const InstanceStateMask ANY_INSTANCE_STATE
WeakRcHandle< SubscriberImpl > subscriber_servant_
~EndHistoricSamplesMissedSweeper()
virtual DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition)
Implements the DDS::DataReader interface.
long double mean() const
Calculate the average value.
LatencyStatistics get_stats() const
Extract the current latency statistics for this writer.
const unsigned long DURATION_ZERO_NSEC
bool ownership_filter_instance(const SubscriptionInstance_rch &instance, const GUID_t &pubid)
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
virtual OPENDDS_STRING transport_type() const =0
const ViewStateMask ANY_VIEW_STATE
void reset()
Reset statistics to nil.
virtual void lookup_instance(const ReceivedDataSample &sample, SubscriptionInstance_rch &instance)=0
void get_instance_handles(InstanceHandleVec &instance_handles)
bool is_exclusive() const
TimePoint_T< MonotonicClock > MonotonicTimePoint
static const TimePoint_T< MonotonicClock > max_value
int handle_timeout(const ACE_Time_Value &current_time, const void *arg)
RcHandle< DRISporadicTask > deadline_task_
sequence< GUID_t > WriterIdSeq
TimeDuration deadline_period_
DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t &key)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
void enable_multi_topic(MultiTopicImpl *mt)
ACE_UINT32 message_length_
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
long not_alive_count_change
void coherent_change_received(const GUID_t &publisher_id, Coherent_State &result)
End Coherent Change message.
virtual DDS::DomainId_t get_domain_id()
RepoIdToHandleMap publication_id_to_handle_map_
MonotonicTimePoint last_sample_tv_
virtual void remove_associations_i(const WriterIdSeq &writers, bool callback)
WriterStats(int amount=0, DataCollector< double >::OnFull type=DataCollector< double >::KeepOldest)
Default constructor.
void accept_coherent(const GUID_t &writer_id, const GUID_t &publisher_id)
long double var() const
Calculate the variance value.
virtual DDS::InstanceHandle_t get_instance_handle()
void get_ordered_data(GroupRakeData &data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool notify_all()
Unblock all of the threads waiting on this condition.
unsigned long SampleStateMask
DDS::SampleLostStatus sample_lost_status_
TimeDuration liveliness_lease_duration_
virtual DDS::ReturnCode_t get_qos(DDS::SubscriberQos &qos)
void process_deadline(SubscriptionInstance_rch instance, const MonotonicTimePoint &now, bool timer_called)
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
HANDLE_TYPE_NATIVE InstanceHandle_t
void add_stat(const TimeDuration &delay)
Add a datum to the latency statistics.
ACE_INT32 source_timestamp_sec_
long count_since_last_send
virtual DDS::ReadCondition_ptr create_readcondition(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
long disposed_generation_count
const unsigned long DURATION_INFINITE_NSEC
virtual void on_enabled(DDS::DataWriter_ptr)
void finished_delivering_historic()
void remove_writer(const GUID_t &pub_id)
TransportLocatorSeq writerTransInfo
bool associate(const AssociationData &peer, bool active)
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
WeakRcHandle< DomainParticipantImpl > participant_servant_
TransportPriorityQosPolicy transport_priority
const ReturnCode_t RETCODE_NOT_ENABLED
DDS::StatusMask listener_mask_
ACE_Recursive_Thread_Mutex statistics_lock_
void add_coherent_samples(const SequenceNumber &seq)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
suseconds_t usec(void) const
AtomicBool enabled_
The flag indicates the entity is enabled.
void process_latency(const ReceivedDataSample &sample)
virtual DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t &max_wait)
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
Priority publication_transport_priority_
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
void reset_stats()
Reset the latency statistics for this writer.
ReliabilityQosPolicy reliability
virtual RcHandle< EntityImpl > parent() const
virtual bool check_transport_qos(const TransportInst &inst)
TypeSupportImpl * type_support_
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffer.
InstanceHandle_t last_instance_handle
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
virtual DDS::ReturnCode_t get_liveliness_changed_status(DDS::LivelinessChangedStatus &status)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Implements the DDS::TopicDescription interface.
void set_subscriber_qos(const DDS::SubscriberQos &qos)
const long DURATION_ZERO_SEC
String str(unsigned decimal_places=3, bool just_sec=false) const
EndHistoricSamplesMissedSweeper(ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *reader)
Sequence number abstraction. Only allows positive 64 bit values.
void to_type_info(XTypes::TypeInformation &type_info) const
virtual DDS::QueryCondition_ptr create_querycondition(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
virtual void remove_associations(const WriterIdSeq &writers, bool callback)
virtual DDS::ReturnCode_t get_sample_rejected_status(DDS::SampleRejectedStatus &status)
ACE_Reactor_Timer_Interface * reactor_
DDS::SampleRejectedStatus sample_rejected_status_
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
void clear_owner_evaluated()
const ReturnCode_t RETCODE_ERROR
void writer_removed(WriterInfo &info)
void notify_latency(GUID_t writer)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_INT32 lifespan_duration_sec_
void add_reader(DataReaderImpl &reader)
virtual DDS::ReturnCode_t enable()
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
virtual DDS::ReturnCode_t get_matched_publication_data(DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
void update_ownership_strength(const GUID_t &pub_id, const CORBA::Long &ownership_strength)
const ReturnCode_t RETCODE_OK
virtual DDS::ReturnCode_t enable_specific()=0
void reset_coherent_info()
void notify_status_condition()
Stats< double > stats_
Latency statistics for the DataWriter to this DataReader.
const ReturnCode_t RETCODE_UNSUPPORTED
virtual void on_associated(DDS::DataWriter_ptr, const GUID_t &)
virtual void on_qos_changed(DDS::DataWriter_ptr)
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const char * to_string(MessageId value)
void received_activity(const MonotonicTimePoint &when)
called when a sample or other activity is received from this writer.
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_UINT32 lifespan_duration_nanosec_
DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
void update_lookup_maps(const SubscriptionInstanceMapType::iterator &input)
bool has_readcondition(DDS::ReadCondition_ptr a_condition)
ACE_Recursive_Thread_Mutex publication_handle_lock_
virtual DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus &status)
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
QosPolicyCountSeq policies
bool check_end_historic_samples(EndHistoricSamplesMissedSweeper *sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&to_deliver)
const long LENGTH_UNLIMITED
#define ACE_ERROR_RETURN(X, Y)
Security::SecurityConfig_rch security_config_
RcHandle< T > lock() const
ACE_HANDLE dup(ACE_HANDLE handle)
virtual DDS::ReturnCode_t get_qos(DDS::DataReaderQos &qos)
const StatusKind SUBSCRIPTION_MATCHED_STATUS
const character_type * in(void) const
DataRepresentationIdSeq value
const StatusKind REQUESTED_DEADLINE_MISSED_STATUS
OwnershipManagerPtr ownership_manager()
DDS::DataReaderQos passed_qos_
void remove_writers(const DDS::InstanceHandle_t &instance_handle)
DeadlineQosPolicy deadline
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export GUID_t bit_key_to_guid(const DDS::BuiltinTopicKey_t &key)
virtual void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
DDS::LivelinessChangedStatus liveliness_changed_status_
static bool valid(const DDS::UserDataQosPolicy &qos)
#define TheServiceParticipant
ACE_Recursive_Thread_Mutex instances_lock_
Keeps track of a DataWriter's liveliness for a DataReader.
Elements stored for managing statistical data.
TopicDescriptionPtr< MultiTopicImpl > multi_topic_
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
const InstanceState_rch instance_state_
Instance state for this instance.
::DDS::DataWriterQos writerQos
The Internal API and Implementation of OpenDDS.
void cancel_timer(WriterInfo_rch &info)
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual void purge_data(SubscriptionInstance_rch instance)=0
unsigned long ViewStateMask
void set_sample_rejected_status(const DDS::SampleRejectedStatus &status)
ACE_Thread_Mutex content_filtered_topic_mutex_
virtual char * get_name()
ReadConditionSet read_conditions_
void notify_subscription_reconnected(const WriterIdSeq &pubids)
void reset_coherent_info(const GUID_t &writer_id, const GUID_t &publisher_id)
TopicDescriptionPtr< TopicImpl > topic_servant_
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
size_t no_writers_generation_count_
virtual DDS::ReturnCode_t set_qos(const DDS::DataReaderQos &qos)
virtual bool is_reliable() const =0
Does the transport as configured support RELIABLE_RELIABILITY_QOS?
virtual DDS::ReturnCode_t delete_contained_entities()
ResourceLimitsQosPolicy resource_limits
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
CORBA::Long last_deadline_missed_total_count_
virtual ~DataReaderImpl()
DataType maximum() const
Access the maximum value.
const StatusKind SAMPLE_REJECTED_STATUS
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
WriterState state() const
returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
virtual void qos_change(const DDS::DataReaderQos &qos)
sequence< string > StringSeq
bool is_exclusive_ownership_
StatsMapType statistics_
Statistics for this reader, collected for each writer.
void transport_discovery_change()
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)