OpenDDS  Snapshot(2023/04/28-20:55)
DataReaderImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 
8 #include "DataReaderImpl.h"
9 
10 #include "SubscriptionInstance.h"
12 #include "DomainParticipantImpl.h"
13 #include "Service_Participant.h"
14 #include "Qos_Helper.h"
16 #include "GuidConverter.h"
17 #include "TopicImpl.h"
18 #include "Serializer.h"
19 #include "SubscriberImpl.h"
20 #include "Transient_Kludge.h"
21 #include "Util.h"
22 #include "DCPS_Utils.h"
23 #include "QueryConditionImpl.h"
24 #include "ReadConditionImpl.h"
25 #include "MonitorFactory.h"
28 #include "SafetyProfileStreams.h"
29 #include "TypeSupportImpl.h"
30 #include "XTypes/TypeObject.h"
31 #ifndef DDS_HAS_MINIMUM_BIT
32 # include "BuiltInTopicUtils.h"
33 #endif
34 
35 #ifndef DDS_HAS_MINIMUM_BIT
36 # include <dds/DdsDcpsCoreTypeSupportC.h>
37 #endif
38 #include <dds/DdsDcpsCoreC.h>
39 #include <dds/DdsDcpsGuidTypeSupportImpl.h>
40 
41 #include <ace/Reactor.h>
42 #include <ace/Auto_Ptr.h>
43 #include <ace/OS_NS_sys_time.h>
44 
45 #include <cstdio>
46 #include <stdexcept>
47 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
48 # include <sstream>
49 #endif
50 
51 #ifndef __ACE_INLINE__
52 # include "DataReaderImpl.inl"
53 #endif
54 
56 
57 namespace OpenDDS {
58 namespace DCPS {
59 
61  : has_subscription_id_(false)
62  , subscription_id_mutex_()
63  , subscription_id_condition_(subscription_id_mutex_)
64  , qos_(TheServiceParticipant->initial_DataReaderQos())
65  , reverse_sample_lock_(sample_lock_)
66  , topic_servant_(0)
67  , type_support_(0)
68  , topic_id_(GUID_UNKNOWN)
69 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
70  , is_exclusive_ownership_(false)
71 #endif
72  , coherent_(false)
73  , subqos_(TheServiceParticipant->initial_SubscriberQos())
74  , topic_desc_(0)
75  , listener_mask_(DEFAULT_STATUS_MASK)
76  , domain_id_(0)
77  , end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
78  , n_chunks_(TheServiceParticipant->n_chunks())
79  , reactor_(0)
80  , liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
81  , last_deadline_missed_total_count_(0)
82  , deadline_queue_enabled_(false)
83  , deadline_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl::deadline_task))
84  , is_bit_(false)
85  , always_get_history_(false)
86  , statistics_enabled_(false)
87  , raw_latency_buffer_size_(0)
88  , raw_latency_buffer_type_(DataCollector<double>::KeepOldest)
89  , transport_disabled_(false)
91 {
93 
100 
105 
110 
117 
120 
125 
129 
130  monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this));
131  periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this));
132 }
133 
134 // This method is called when there are no longer any reference to the
135 // the servant.
137 {
138  DBG_ENTRY_LVL("DataReaderImpl", "~DataReaderImpl", 6);
139 
140  deadline_task_->cancel();
141 
142 #ifndef OPENDDS_SAFETY_PROFILE
144  if (participant) {
145  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
146  if (type_lookup_service) {
147  type_lookup_service->remove_guid_from_dynamic_map(subscription_id_);
148  }
149  }
150 #endif
151 }
152 
153 // this method is called when delete_datareader is called.
154 void
156 {
157  // As first step set our listener to nill which will prevent us from calling
158  // back onto the listener at the moment the related DDS entity has been
159  // deleted
161 
162 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
163  OwnershipManagerPtr owner_manager = this->ownership_manager();
164  if (owner_manager) {
165  owner_manager->unregister_reader(topic_servant_->type_name(), this);
166  }
167 #endif
168 
169  topic_servant_ = 0;
170 
171 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
172  {
175  }
176 #endif
177 
178 #ifndef OPENDDS_NO_MULTI_TOPIC
179  multi_topic_ = 0;
180 #endif
181 
182 }
183 
185  TopicDescriptionImpl* topic_desc,
186  const DDS::DataReaderQos &qos,
187  DDS::DataReaderListener_ptr listener,
188  const DDS::StatusMask & mask,
189  DomainParticipantImpl* participant,
190  SubscriberImpl* subscriber)
191 {
192  topic_desc_ = DDS::TopicDescription::_duplicate(topic_desc);
193  if (TopicImpl* topic = dynamic_cast<TopicImpl*>(topic_desc)) {
194  topic_servant_ = topic;
195  type_support_ = dynamic_cast<TypeSupportImpl*>(topic->get_type_support());
196  topic_id_ = topic->get_id();
197  }
198 
199 #ifndef DDS_HAS_MINIMUM_BIT
200  CORBA::String_var topic_name = topic_desc->get_name();
201  CORBA::String_var topic_type_name = topic_desc->get_type_name();
202  is_bit_ = topicIsBIT(topic_name, topic_type_name);
203 #endif // !defined (DDS_HAS_MINIMUM_BIT)
204 
205  qos_ = qos;
206  passed_qos_ = qos;
207 
208 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
210 #endif
211 
212  set_listener(listener, mask);
213 
214  // Only store the participant pointer, since it is our "grand"
215  // parent, we will exist as long as it does
216  participant_servant_ = *participant;
217 
218  domain_id_ = participant->get_domain_id();
219 
220  subscriber_servant_ = rchandle_from(subscriber);
221 
222  if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
224  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
225  ACE_TEXT("failed to get SubscriberQos\n")));
226  }
227 }
228 
231 {
232  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
233  return get_entity_instance_handle(subscription_id_, participant);
234 }
235 
236 void
238  const WriterAssociation& writer,
239  bool active)
240 {
241  if (DCPS_debug_level) {
242  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
243  ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
244  LogGuid(yourId).c_str(),
245  LogGuid(writer.writerId).c_str()));
246  }
247 
248  if (get_deleted()) {
249  if (DCPS_debug_level) {
250  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
251  ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
252  }
253  return;
254  }
255 
256  // We are being called back from the repository before we are done
257  // processing after our call to the repository that caused this call
258  // (from the repository) to be made.
259  {
262  subscription_id_ = yourId;
263  has_subscription_id_ = true;
265  }
266  }
267 
268  // For each writer in the list of writers to associate with, we
269  // create a WriterInfo and a WriterStats object and store them in
270  // our internal maps.
271  //
272  {
273 
275 
276  const GUID_t& writer_id = writer.writerId;
277  WriterInfo_rch info = make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos);
278  std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
279  // This insertion is idempotent.
280  WriterMapType::value_type(
281  writer_id,
282  info));
283 
284  // Schedule timer if necessary
285  // - only need to check reader qos - we know the writer must be >= reader
288  }
289 
290  {
292  statistics_.insert(
293  StatsMapType::value_type(
294  writer_id,
296  }
297 
298  // If this is a durable reader
300  // TODO schedule timer for removing flag from writers
301  }
302 
303  if (DCPS_debug_level > 4) {
305  "(%P|%t) DataReaderImpl::add_association: "
306  "inserted writer %C.return %d\n",
307  LogGuid(writer_id).c_str(), bpair.second));
308 
309  WriterMapType::iterator iter = writers_.find(writer_id);
310  if (iter != writers_.end()) {
311  // This may not be an error since it could happen that the sample
312  // is delivered to the datareader after the write is dis-associated
313  // with this datareader.
315  ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
316  ACE_TEXT("reader %C is associated with writer %C.\n"),
317  LogGuid(get_guid()).c_str(),
318  LogGuid(writer_id).c_str()));
319  }
320  }
321  }
322 
323  // Propagate the add_associations processing down into the Transport
324  // layer here. This will establish the transport support and reserve
325  // usage of an existing connection or initiate creation of a new
326  // connection if no suitable connection is available.
327  AssociationData data;
328  data.remote_id_ = writer.writerId;
329  data.remote_data_ = writer.writerTransInfo;
330  data.discovery_locator_ = writer.writerDiscInfo;
335  data.remote_reliable_ =
337  data.remote_durable_ =
339 
340  if (associate(data, active)) {
342  if (observer) {
343  observer->on_associated(this, data.remote_id_);
344  }
345  } else {
346  if (DCPS_debug_level) {
348  ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
349  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
350  }
351  }
352 }
353 
354 void
355 DataReaderImpl::transport_assoc_done(int flags, const GUID_t& remote_id)
356 {
357  if (!(flags & ASSOC_OK)) {
358  if (DCPS_debug_level) {
360  ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
361  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
362  LogGuid(remote_id).c_str()));
363  }
364  return;
365  }
366 
367  // LIVELINESS policy timers are managed here.
369  if (DCPS_debug_level >= 5) {
371  ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
372  ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
373  LogGuid(get_guid()).c_str()));
374  }
375  // this call will start the timer if it is not already set
376  liveliness_timer_->check_liveliness();
377  }
378 
379  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
380 
381  if (!participant)
382  return;
383 
384  const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
385 
386  if (!is_bit_) {
387  // We acquire the publication_handle_lock_ for the remainder of our
388  // processing.
389  {
391 
392  // This insertion is idempotent.
393  publication_id_to_handle_map_.insert(RepoIdToHandleMap::value_type(remote_id, handle));
394 
395  if (DCPS_debug_level > 4) {
397  ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
398  ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
399  LogGuid(remote_id).c_str(),
400  handle));
401  }
402 
403  // We need to adjust these after the insertions have all completed
404  // since insertions are not guaranteed to increase the number of
405  // currently matched publications.
406  const int matchedPublications = static_cast<int>(publication_id_to_handle_map_.size());
408  matchedPublications - subscription_match_status_.current_count;
409  subscription_match_status_.current_count = matchedPublications;
410 
413 
415 
417 
418  DDS::DataReaderListener_var listener =
420 
421  if (!CORBA::is_nil(listener)) {
422  listener->on_subscription_matched(this, subscription_match_status_);
423 
424  // TBD - why does the spec say to change this but not change
425  // the ChangeFlagStatus after a listener call?
426 
427  // Client will look at it so next time it looks the change should be 0
430  }
431 
433  }
434 
435  {
438 
439  if (!writers_.count(remote_id)) {
440  return;
441  }
442  writers_[remote_id]->handle(handle);
443  }
444  }
445 
446  if (monitor_) {
447  monitor_->report();
448  }
449 }
450 
451 void
453  bool notify_lost)
454 {
455  DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
456 
457  if (writers.length() == 0) {
458  return;
459  }
460 
462  if (observer) {
463  for (CORBA::ULong i = 0; i < writers.length(); ++i) {
464  observer->on_disassociated(this, writers[i]);
465  }
466  }
467 
468  if (DCPS_debug_level >= 1) {
470  ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
471  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
472  is_bit_,
473  LogGuid(get_guid()).c_str(),
474  LogGuid(writers[0]).c_str(),
475  writers.length()));
476  }
477  if (!get_deleted()) {
478  // stop pending associations for these writer ids
479  this->stop_associating(writers.get_buffer(), writers.length());
480 
481  {
482  CORBA::ULong wr_len = writers.length();
484 
485  for (CORBA::ULong i = 0; i < wr_len; i++) {
486  const GUID_t writer_id = writers[i];
487  {
489  statistics_.erase(writer_id);
490  }
491  }
492  }
493  }
494 
495  remove_associations_i(writers, notify_lost);
496 }
497 
498 void
500  bool notify_lost)
501 {
502  DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
503 
504  if (writers.length() == 0) {
505  return;
506  }
507 
508  if (DCPS_debug_level >= 1) {
510  ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
511  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
512  is_bit_,
513  LogGuid(get_guid()).c_str(),
514  LogGuid(writers[0]).c_str(),
515  writers.length()));
516  }
517  DDS::InstanceHandleSeq handles;
518 
519  CORBA::ULong wr_len = writers.length();
520 
521  // Flush historic samples and/or allow in-progress delivery of historic samples to complete
522  for (CORBA::ULong i = 0; i < wr_len; i++) {
523  resume_sample_processing(writers[i]);
524  }
525 
526  // This is used to hold the list of writers which were actually
527  // removed, which is a proper subset of the writers which were
528  // requested to be removed.
529  WriterIdSeq updated_writers;
530  WriterMapType removed_writers;
531 
532  //Remove the writers from writer list. If the supplied writer
533  //is not in the cached writers list then it is already removed.
534  //We just need remove the writers in the list that have not been
535  //removed.
536  {
538 
539  for (CORBA::ULong i = 0; i < wr_len; i++) {
540  const GUID_t writer_id = writers[i];
541 
542  WriterMapType::iterator it = this->writers_.find(writer_id);
543 
544  if (it != this->writers_.end()) {
545  removed_writers.insert(*it);
546  end_historic_sweeper_->cancel_timer(it->second);
547  }
548 
549  if (this->writers_.erase(writer_id) == 0) {
550  if (DCPS_debug_level >= 1) {
552  ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
553  ACE_TEXT("the writer local %C was already removed.\n"),
554  LogGuid(writer_id).c_str()));
555  }
556 
557  } else {
558  push_back(updated_writers, writer_id);
559  }
560  }
561  }
562 
563  for (WriterMapType::iterator it = removed_writers.begin(); it != removed_writers.end(); ++it) {
564  it->second->removed();
565  }
566  removed_writers.clear();
567 
568  wr_len = updated_writers.length();
569 
570  // Return now if the supplied writers have been removed already.
571  if (wr_len == 0) {
572  return;
573  }
574 
575  if (!is_bit_) {
576  // The writer should be in the id_to_handle map at this time.
577  this->lookup_instance_handles(updated_writers, handles);
578 
580 
581  for (CORBA::ULong i = 0; i < wr_len; ++i) {
582  publication_id_to_handle_map_.erase(updated_writers[i]);
583  }
584  }
585 
586  for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
587  {
588  this->disassociate(updated_writers[i]);
589  }
590  }
591 
592  // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
593  if (!this->is_bit_) {
595 
596  // Derive the change in the number of publications writing to this reader.
597  int matchedPublications = static_cast<int>(this->publication_id_to_handle_map_.size());
599  = matchedPublications - this->subscription_match_status_.current_count;
600 
601  // Only process status if the number of publications has changed.
603  this->subscription_match_status_.current_count = matchedPublications;
604 
605  /// Section 7.1.4.1: total_count will not decrement.
606 
607  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
609  = handles[ wr_len - 1];
610 
612 
613  DDS::DataReaderListener_var listener
615 
616  if (!CORBA::is_nil(listener.in())) {
617  listener->on_subscription_matched(this, this->subscription_match_status_);
618 
619  // Client will look at it so next time it looks the change should be 0
622  }
624  }
625  }
626 
627  // If this remove_association is invoked when the InfoRepo
628  // detects a lost writer then make a callback to notify
629  // subscription lost.
630  if (notify_lost) {
631  this->notify_subscription_lost(handles);
632  }
633 
634  if (this->monitor_) {
635  this->monitor_->report();
636  }
637 }
638 
639 void
641 {
642  DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
644 
646  int size;
647 
648  {
650 
651  size = static_cast<int>(writers_.size());
652  writers.length(size);
653 
654  WriterMapType::iterator curr_writer = writers_.begin();
655  WriterMapType::iterator end_writer = writers_.end();
656 
657  int i = 0;
658 
659  while (curr_writer != end_writer) {
660  writers[i++] = curr_writer->first;
661  ++curr_writer;
662  }
663  }
664 
665  try {
666  if (0 < size) {
667  remove_associations(writers, false);
668  }
669  } catch (const CORBA::Exception&) {
671  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::remove_all_associations() - ")
672  ACE_TEXT("caught exception from remove_associations.\n")));
673  }
674 
675  transport_stop();
676 }
677 
678 void
680 {
681  DDS::DataReaderListener_var listener =
683 
685  // This test should make the method idempotent.
686  return;
687  }
688 
690  true);
691 
692  // copy status and increment change
695  status.count_since_last_send;
697  status.last_policy_id;
699 
700  if (!CORBA::is_nil(listener.in())) {
701  listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_);
702 
703  // TBD - why does the spec say to change total_count_change but not
704  // change the ChangeFlagStatus after a listener call?
705 
706  // client just looked at it so next time it looks the
707  // change should be 0
709  }
710 
712 }
713 
714 void
715 DataReaderImpl::signal_liveliness(const GUID_t& remote_participant)
716 {
717  GUID_t prefix = remote_participant;
718  prefix.entityId = EntityId_t();
719 
721 
722  typedef std::pair<GUID_t, WriterInfo_rch> RepoWriterPair;
723  typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
724  WriterSet writers;
725 
726  {
728  for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
729  limit = writers_.end();
730  pos != limit && equal_guid_prefixes(pos->first, prefix);
731  ++pos) {
732  writers.push_back(std::make_pair(pos->first, pos->second));
733  }
734  }
735 
737  for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
738  pos != limit;
739  ++pos) {
740  pos->second->received_activity(when);
741  }
742 
743  if (!writers.empty()) {
744  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
745  for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
746  pos != limit;
747  ++pos) {
748  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
749  iter != instances_.end();
750  ++iter) {
751  SubscriptionInstance_rch ptr = iter->second;
752  ptr->instance_state_->lively(pos->first);
753  }
754  }
755  }
756 }
757 
762 {
764  DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
765  view_states, instance_states);
766  read_conditions_.insert(rc);
767  return rc._retn();
768 }
769 
770 #ifndef OPENDDS_NO_QUERY_CONDITION
775  const char* query_expression,
776  const DDS::StringSeq& query_parameters)
777 {
779  try {
780  DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
781  view_states, instance_states, query_expression);
782  if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) {
783  return 0;
784  }
785  DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
786  read_conditions_.insert(rc);
787  return qc._retn();
788  } catch (const std::exception& e) {
789  if (DCPS_debug_level) {
790  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ")
791  ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
792  e.what()));
793  }
794  }
795  return 0;
796 }
797 #endif
798 
799 bool DataReaderImpl::has_readcondition(DDS::ReadCondition_ptr a_condition)
800 {
801  //sample lock already held
802  DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
803  return read_conditions_.find(rc) != read_conditions_.end();
804 }
805 
807  DDS::ReadCondition_ptr a_condition)
808 {
811  DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
812  return read_conditions_.erase(rc)
814 }
815 
817 {
820  read_conditions_.clear();
821  return DDS::RETCODE_OK;
822 }
823 
825 {
829 
830  DDS::DataReaderQos new_qos = qos;
832  if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
833 
834  if (qos_ == new_qos)
835  return DDS::RETCODE_OK;
836 
837  if (enabled_) {
838  if (!Qos_Helper::changeable(qos_, new_qos)) {
840 
841  } else {
842  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
843  DDS::SubscriberQos subscriberQos;
844 
846  bool status = false;
847  if (subscriber) {
848  subscriber->get_qos(subscriberQos);
849  status =
850  disco->update_subscription_qos(
851  domain_id_,
852  dp_id_,
854  new_qos,
855  subscriberQos);
856  }
857  if (!status) {
859  ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
860  ACE_TEXT("qos not updated.\n")),
862  }
863  }
864  }
865 
866  qos_change(new_qos);
867  qos_ = new_qos;
868  passed_qos_ = qos;
869 
871  if (observer) {
872  observer->on_qos_changed(this);
873  }
874 
875  return DDS::RETCODE_OK;
876 
877  } else {
879  }
880 }
881 
883 {
884  // Reset the deadline timer if the period has changed.
885  if (qos_.deadline.period.sec != qos.deadline.period.sec ||
891  } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
894  deadline_queue_enabled_ = false;
895  } else {
897  }
898  }
899 }
900 
903  DDS::DataReaderQos & qos)
904 {
905  qos = passed_qos_;
906  return DDS::RETCODE_OK;
907 }
908 
910  DDS::DataReaderListener_ptr a_listener,
911  DDS::StatusMask mask)
912 {
914  listener_mask_ = mask;
915  //note: OK to duplicate a nil object ref
916  listener_ = DDS::DataReaderListener::_duplicate(a_listener);
917  return DDS::RETCODE_OK;
918 }
919 
920 DDS::DataReaderListener_ptr DataReaderImpl::get_listener()
921 {
923  return DDS::DataReaderListener::_duplicate(listener_.in());
924 }
925 
926 DataReaderListener_ptr DataReaderImpl::get_ext_listener()
927 {
929  return DataReaderListener::_narrow(listener_.in());
930 }
931 
932 DDS::TopicDescription_ptr DataReaderImpl::get_topicdescription()
933 {
934 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
935  {
938  return DDS::TopicDescription::_duplicate(content_filtered_topic_.get());
939  }
940  }
941 #endif
942  return DDS::TopicDescription::_duplicate(topic_desc_.in());
943 }
944 
945 DDS::Subscriber_ptr DataReaderImpl::get_subscriber()
946 {
947  return get_subscriber_servant()._retn();
948 }
949 
952  DDS::SampleRejectedStatus & status)
953 {
955 
957  status = sample_rejected_status_;
959  return DDS::RETCODE_OK;
960 }
961 
965 {
967 
969  false);
971 
974 
975  return DDS::RETCODE_OK;
976 }
977 
981 {
983 
985  false);
986 
990 
991  // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
992  // is updated by the RequestedDeadlineWatchdog.
993 
994  // Update for next status check.
997 
999 
1000  return DDS::RETCODE_OK;
1001 }
1002 
1006 {
1008 
1012 
1013  return DDS::RETCODE_OK;
1014 }
1015 
1019 {
1021 
1023  status = subscription_match_status_;
1026 
1027  return DDS::RETCODE_OK;
1028 }
1029 
1032  DDS::SampleLostStatus & status)
1033 {
1035 
1037  status = sample_lost_status_;
1039  return DDS::RETCODE_OK;
1040 }
1041 
1044  const DDS::Duration_t & /* max_wait */)
1045 {
1046  // Add your implementation here
1047  return DDS::RETCODE_OK;
1048 }
1049 
1052  DDS::InstanceHandleSeq & publication_handles)
1053 {
1054  if (!enabled_) {
1056  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
1057  ACE_TEXT(" Entity is not enabled.\n")),
1059  }
1060 
1062  guard,
1065 
1066  // Copy out the handles for the current set of publications.
1067  int index = 0;
1068  publication_handles.length(static_cast<CORBA::ULong>(this->publication_id_to_handle_map_.size()));
1069 
1070  for (RepoIdToHandleMap::iterator
1071  current = this->publication_id_to_handle_map_.begin();
1072  current != this->publication_id_to_handle_map_.end();
1073  ++current, ++index) {
1074  publication_handles[index] = current->second;
1075  }
1076 
1077  return DDS::RETCODE_OK;
1078 }
1079 
1080 #if !defined (DDS_HAS_MINIMUM_BIT)
1083  DDS::PublicationBuiltinTopicData & publication_data,
1084  DDS::InstanceHandle_t publication_handle)
1085 {
1086  if (!enabled_) {
1088  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
1089  ACE_TEXT("get_matched_publication_data: ")
1090  ACE_TEXT("Entity is not enabled.\n")),
1092  }
1093 
1094  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1095 
1096  if (!participant)
1097  return DDS::RETCODE_ERROR;
1098 
1099  DDS::PublicationBuiltinTopicDataSeq data;
1100  const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
1101  participant.in(),
1103  publication_handle,
1104  data);
1105 
1106  if (ret == DDS::RETCODE_OK) {
1107  publication_data = data[0];
1108  }
1109 
1110  return ret;
1111 }
1112 #endif // !defined (DDS_HAS_MINIMUM_BIT)
1113 
1116 {
1117  // According to spec:
1118  // - Calling enable on an already enabled Entity has no effect and returns OK.
1119  // - Calling enable on an Entity whose factory is not enabled will fail
1120  // and return PRECONDITION_NOT_MET.
1121 
1122  if (this->is_enabled()) {
1123  return DDS::RETCODE_OK;
1124  }
1125 
1127  if (!subscriber) {
1128  return DDS::RETCODE_ERROR;
1129  }
1130 
1131  if (!subscriber->is_enabled()) {
1133  }
1134 
1135  if (topic_servant_ && !topic_servant_->is_enabled()) {
1137  }
1138 
1140  if (participant) {
1141  dp_id_ = participant->get_id();
1142  }
1143 
1144  if (topic_servant_) {
1146  if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
1147  return DDS::RETCODE_ERROR;
1148  }
1149  }
1150 
1152  // The spec says qos_.history.depth is "has no effect"
1153  // when history.kind = KEEP_ALL so use max_samples_per_instance
1155 
1156  } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
1158  }
1159 
1160  if (depth_ == DDS::LENGTH_UNLIMITED) {
1161  // DDS::LENGTH_UNLIMITED is negative so make it a positive
1162  // value that is, for all intents and purposes, unlimited
1163  // and we can use it for comparisons.
1164  // WARNING: The client risks running out of memory in this case.
1166  }
1167 
1170  }
1171 
1172  //else using value from Service_Participant
1173 
1174  // enable the type specific part of this DataReader
1175  this->enable_specific();
1176 
1177  //Note: the QoS used to set n_chunks_ is Changeable=No so
1178  // it is OK that we cannot change the size of our allocators.
1180 
1181  if (DCPS_debug_level >= 2)
1182  ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
1183  " Cached_Allocator_With_Overflow %x with %d chunks\n",
1184  rd_allocator_.get(), n_chunks_));
1185 
1191  }
1192 
1193  // Setup the requested deadline watchdog if the configured deadline
1194  // period is not the default (infinite).
1195  DDS::Duration_t const deadline_period = this->qos_.deadline.period;
1196 
1198  && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
1199  || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
1201  deadline_queue_enabled_ = true;
1202  }
1203 
1204  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1205  disco->pre_reader(this);
1206 
1207  this->set_enabled();
1208 
1210  try {
1212  this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
1213  } catch (const Transport::Exception&) {
1215  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
1216  ACE_TEXT("Transport Exception.\n")));
1217  return DDS::RETCODE_ERROR;
1218  }
1219 
1220  const DDS::ReturnCode_t setup_deserialization_result = setup_deserialization();
1221  if (setup_deserialization_result != DDS::RETCODE_OK) {
1222  return setup_deserialization_result;
1223  }
1224 
1225  const TransportLocatorSeq& trans_conf_info = connection_info();
1226 
1227  CORBA::String_var filterClassName = "";
1228  CORBA::String_var filterExpression = "";
1229  DDS::StringSeq exprParams;
1230 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1231  {
1234  filterClassName = content_filtered_topic_->get_filter_class_name();
1235  filterExpression = content_filtered_topic_->get_filter_expression();
1236  content_filtered_topic_->get_expression_parameters(exprParams);
1237  }
1238  }
1239 #endif
1240 
1241  DDS::SubscriberQos sub_qos;
1242  subscriber->get_qos(sub_qos);
1243 
1244  TypeSupportImpl* const typesupport =
1245  dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
1246  if (!typesupport) {
1247  return DDS::RETCODE_ERROR;
1248  }
1249 
1250  XTypes::TypeInformation type_info;
1251  typesupport->to_type_info(type_info);
1252 
1253  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
1254  typesupport->add_types(type_lookup_service);
1255 
1256  install_type_support(typesupport);
1257 
1258  const GUID_t subscription_id =
1259  disco->add_subscription(domain_id_,
1260  dp_id_,
1261  topic_servant_->get_id(),
1262  rchandle_from(this),
1263  qos_,
1264  trans_conf_info,
1265  sub_qos,
1266  filterClassName,
1267  filterExpression,
1268  exprParams,
1269  type_info);
1270 
1271 #if defined(OPENDDS_SECURITY)
1272  {
1274  security_config_ = participant->get_security_config();
1275  dynamic_type_ = typesupport->get_type();
1276  }
1277 #endif
1278 
1279  {
1281  subscription_id_ = subscription_id;
1282  has_subscription_id_ = true;
1284  }
1285 
1286  if (subscription_id == GUID_UNKNOWN) {
1287  if (DCPS_debug_level >= 1) {
1288  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataReaderImpl::enable: "
1289  "add_subscription failed\n"));
1290  }
1291  return DDS::RETCODE_ERROR;
1292  }
1293 
1294  if (DCPS_debug_level >= 2) {
1295  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::enable: "
1296  "got GUID %C, subscribed to topic name \"%C\" type \"%C\"\n",
1297  LogGuid(get_guid()).c_str(),
1298  topic_servant_->topic_name(), topic_servant_->type_name()));
1299  }
1300  }
1301 
1302  DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
1303  if (topic_servant_) {
1304  const CORBA::String_var name = topic_servant_->get_name();
1305  return_value = subscriber->reader_enabled(name.in(), this);
1306 
1307  if (this->monitor_) {
1308  this->monitor_->report();
1309  }
1310  }
1311 
1312  if (return_value == DDS::RETCODE_OK) {
1313  const Observer_rch observer = get_observer(Observer::e_ENABLED);
1314  if (observer) {
1315  observer->on_enabled(this);
1316  }
1317  }
1318 
1319  return return_value;
1320 }
1321 
1322 void
1324 {
1325  // caller should have the sample_lock_ !!!
1326 
1327  WriterInfo_rch writer;
1328 
1329  // The received_activity() has to be called outside the writers_lock_
1330  // because it probably acquire writers_lock_ read lock recursively
1331  // (in handle_timeout). This could cause deadlock when there are writers
1332  // waiting.
1333  {
1334  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
1335  WriterMapType::iterator iter = writers_.find(header.publication_id_);
1336 
1337  if (iter != writers_.end()) {
1338  writer = iter->second;
1339 
1340  } else if (DCPS_debug_level > 4) {
1341  // This may not be an error since it could happen that the sample
1342  // is delivered to the datareader after the write is dis-associated
1343  // with this datareader.
1345  ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
1346  ACE_TEXT("reader %C is not associated with writer %C.\n"),
1347  LogGuid(get_guid()).c_str(),
1348  LogGuid(header.publication_id_).c_str()));
1349  }
1350  }
1351 
1352  if (!writer.is_nil()) {
1354 
1355  if ((header.message_id_ == SAMPLE_DATA) ||
1356  (header.message_id_ == INSTANCE_REGISTRATION) ||
1357  (header.message_id_ == UNREGISTER_INSTANCE) ||
1358  (header.message_id_ == DISPOSE_INSTANCE) ||
1360 
1361 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1362  if (header.coherent_change_) {
1363  writer->add_coherent_samples(header.sequence_);
1364  }
1365 #endif
1366  }
1367  }
1368 }
1369 
1370 void
1372 {
1373  DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
1374 
1375  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
1376  {
1378  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(sample.header_.publication_id_);
1379  if (pos != publication_id_to_handle_map_.end()) {
1380  publication_handle = pos->second;
1381  }
1382  }
1383 
1384  // ensure some other thread is not changing the sample container
1385  // or statuses related to samples.
1387 
1388  if (get_deleted()) return;
1389 
1390  if (DCPS_debug_level > 9) {
1392  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1393  ACE_TEXT("%C received sample: %C.\n"),
1394  LogGuid(get_guid()).c_str(),
1395  to_string(sample.header_).c_str()));
1396  }
1397 
1398  const ValueDispatcher* vd = get_value_dispatcher();
1400 
1401  RcHandle<MessageHolder> real_data;
1402  SubscriptionInstance_rch instance;
1403  switch (sample.header_.message_id_) {
1404  case SAMPLE_DATA:
1405  case INSTANCE_REGISTRATION: {
1406  if (!check_historic(sample)) break;
1407 
1408  DataSampleHeader const & header = sample.header_;
1409 
1410  this->writer_activity(header);
1411 
1412  // Verify data has not exceeded its lifespan.
1413  if (this->filter_sample(header)) break;
1414 
1415  // This adds the reader to the set/list of readers with data.
1417  if (subscriber) {
1418  subscriber->data_received(this);
1419  }
1420 
1421  // Only gather statistics about real samples, not registration data, etc.
1422  if (header.message_id_ == SAMPLE_DATA) {
1423  this->process_latency(sample);
1424  }
1425 
1426  // This also adds to the sample container and makes any callbacks
1427  // and condition modifications.
1428 
1429  bool is_new_instance = false;
1430  bool filtered = false;
1431  if (sample.header_.key_fields_only_) {
1432  dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING, false);
1433  } else {
1434  real_data = dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, FULL_MARSHALING, observer && vd);
1435  }
1436 
1437  // Per sample logging
1438  if (DCPS_debug_level >= 8) {
1440  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
1441  ACE_TEXT("instance %d is_new_instance %d filtered %d\n"),
1442  LogGuid(get_guid()).c_str(),
1443  LogGuid(header.publication_id_).c_str(),
1444  instance ? instance->instance_handle_ : 0,
1445  is_new_instance, filtered));
1446  }
1447 
1448  if (filtered) break; // sample filtered from instance
1449 
1450  if (instance) accept_sample_processing(instance, header, is_new_instance);
1451  }
1452  break;
1453 
1454 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1455  case END_COHERENT_CHANGES: {
1456  CoherentChangeControl control;
1457 
1458  this->writer_activity(sample.header_);
1459 
1460  Message_Block_Ptr payload(sample.data(&mb_alloc_));
1461  Serializer serializer(
1462  payload.get(), Encoding::KIND_UNALIGNED_CDR,
1464  if (!(serializer >> control)) {
1465  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
1466  ACE_TEXT("deserialization coherent change control failed.\n")));
1467  return;
1468  }
1469 
1470  if (DCPS_debug_level > 0) {
1471  std::stringstream buffer;
1472  buffer << control << std::endl;
1473 
1475  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1476  ACE_TEXT("END_COHERENT_CHANGES %C\n"),
1477  buffer.str().c_str()));
1478  }
1479 
1480  WriterInfo_rch writer;
1481  {
1482  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
1483 
1484  WriterMapType::iterator it =
1485  this->writers_.find(sample.header_.publication_id_);
1486 
1487  if (it == this->writers_.end()) {
1489  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
1490  ACE_TEXT(" subscription %C failed to find ")
1491  ACE_TEXT(" publication data for %C!\n"),
1492  LogGuid(get_guid()).c_str(),
1493  LogGuid(sample.header_.publication_id_).c_str()));
1494  return;
1495  }
1496  else {
1497  writer = it->second;
1498  }
1499  it->second->set_group_info(control);
1500  }
1501 
1502  if (this->verify_coherent_changes_completion(writer.in())) {
1503  this->notify_read_conditions();
1504  }
1505  }
1506  break;
1507 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
1508 
1509  case DATAWRITER_LIVELINESS: {
1510  if (DCPS_debug_level >= 4) {
1512  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1513  ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
1514  LogGuid(get_guid()).c_str(),
1515  LogGuid(sample.header_.publication_id_).c_str()));
1516  }
1517  this->writer_activity(sample.header_);
1518 
1519  // tell all instances they got a liveliness message
1520  {
1522  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
1523  iter != instances_.end();
1524  ++iter) {
1525  if (iter->second->instance_state_->writes_instance(sample.header_.publication_id_)) {
1526  iter->second->instance_state_->lively(sample.header_.publication_id_);
1527  }
1528  }
1529  }
1530 
1531  }
1532  break;
1533 
1534  case DISPOSE_INSTANCE: {
1535  if (!check_historic(sample)) break;
1536  this->writer_activity(sample.header_);
1537  SubscriptionInstance_rch instance;
1538 
1540  // Find the instance first for timer cancellation since
1541  // the instance may be deleted during dispose and can
1542  // not be accessed.
1543  ReceivedDataSample dup(sample);
1544  this->lookup_instance(dup, instance);
1545 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1546  OwnershipManagerPtr owner_manager = this->ownership_manager();
1547 
1548  if (! this->is_exclusive_ownership_
1549  || (owner_manager
1550  && (instance)
1551  && (owner_manager->is_owner(instance->instance_handle_,
1552  sample.header_.publication_id_)))) {
1553 #endif
1554  cancel_deadline(instance);
1555 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1556  }
1557 #endif
1558  }
1559  instance.reset();
1560  this->dispose_unregister(sample, publication_handle, instance);
1561  }
1562  this->notify_read_conditions();
1563  break;
1564 
1565  case UNREGISTER_INSTANCE: {
1566  if (!check_historic(sample)) break;
1567  this->writer_activity(sample.header_);
1568  SubscriptionInstance_rch instance;
1569 
1571  // Find the instance first for timer cancellation since
1572  // the instance may be deleted during dispose and can
1573  // not be accessed.
1574  ReceivedDataSample dup(sample);
1575  this->lookup_instance(dup, instance);
1576  if (instance) {
1577 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1578  if (! this->is_exclusive_ownership_
1579  || (this->is_exclusive_ownership_
1580  && instance->instance_state_->is_last(sample.header_.publication_id_))) {
1581 #endif
1582  cancel_deadline(instance);
1583 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1584  }
1585 #endif
1586  }
1587  }
1588  instance.reset();
1589  this->dispose_unregister(sample, publication_handle, instance);
1590  }
1591  this->notify_read_conditions();
1592  break;
1593 
1595  if (!check_historic(sample)) break;
1596  this->writer_activity(sample.header_);
1597  SubscriptionInstance_rch instance;
1598 
1600  // Find the instance first for timer cancellation since
1601  // the instance may be deleted during dispose and can
1602  // not be accessed.
1603  ReceivedDataSample dup(sample);
1604  this->lookup_instance(dup, instance);
1605 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1606  OwnershipManagerPtr owner_manager = this->ownership_manager();
1607  if (! this->is_exclusive_ownership_
1608  || (owner_manager
1609  && (instance)
1610  && (owner_manager->is_owner (instance->instance_handle_,
1611  sample.header_.publication_id_)))
1613  && (instance)
1614  && instance->instance_state_->is_last(sample.header_.publication_id_))) {
1615 #endif
1616  if (instance) {
1617  cancel_deadline(instance);
1618  }
1619 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1620  }
1621 #endif
1622  }
1623  instance.reset();
1624  this->dispose_unregister(sample, publication_handle, instance);
1625  }
1626  this->notify_read_conditions();
1627  break;
1628 
1629  case END_HISTORIC_SAMPLES: {
1630  if (sample.header_.message_length_ >= sizeof(GUID_t)) {
1631  Message_Block_Ptr payload(sample.data(&mb_alloc_));
1632  Serializer ser(payload.get(), Encoding::KIND_UNALIGNED_CDR);
1633  GUID_t readerId = GUID_UNKNOWN;
1634  if (!(ser >> readerId)) {
1635  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
1636  ACE_TEXT("deserialization reader failed.\n")));
1637  return;
1638  }
1639  const GUID_t repo_id(get_guid());
1640  if (readerId != GUID_UNKNOWN && readerId != repo_id) {
1641  break; // not our message
1642  }
1643  }
1644  if (DCPS_debug_level > 4) {
1645  ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
1646  }
1647  // Going to acquire writers lock, release samples lock
1648  guard.release();
1650  if (DCPS_debug_level > 4) {
1651  ACE_DEBUG((
1652  LM_INFO,
1653  "(%P|%t) Resumed sample processing for durable writer %C\n",
1654  LogGuid(sample.header_.publication_id_).c_str()));
1655  }
1656  break;
1657  }
1658 
1659  default:
1661  "(%P|%t) ERROR: DataReaderImpl::data_received"
1662  "unexpected message_id = %d\n",
1663  sample.header_.message_id_));
1664  break;
1665  }
1666 
1667  if (observer && real_data && vd) {
1668  const DDS::Time_t timestamp = {
1671  };
1672  Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, sample.header_.instance_state(), timestamp, sample.header_.sequence_, real_data->get(), *vd);
1673  observer->on_sample_received(this, s);
1674  }
1675 }
1676 
1679 {
1680  return subscriber_servant_.lock();
1681 }
1682 
1683 bool
1685 {
1687  return ti.is_reliable();
1688  }
1689  return true;
1690 }
1691 
1693 {
1694  //sample lock is already held
1695  ReadConditionSet local_read_conditions = read_conditions_;
1697 
1698  for (ReadConditionSet::iterator it = local_read_conditions.begin(),
1699  end = local_read_conditions.end(); it != end; ++it) {
1700  ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in());
1701  if (ci) {
1702  ci->signal_all();
1703  } else {
1705  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
1706  ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n")));
1707  }
1708  }
1709 }
1710 
1713 {
1714  return subscriber_servant_.lock();
1715 }
1716 
1719 {
1720  //!!!caller should have acquired sample_lock_
1723 }
1724 
1725 bool
1727 {
1728  //!!!caller should have acquired sample_lock_
1731 }
1732 
1735 {
1736  //!!!caller should have acquired sample_lock_
1738  return lookup_matching_instances(DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, instance_states).size();
1739 }
1740 
1741 /// Fold-in the three separate loops of have_sample_states(),
1742 /// have_view_states(), and have_instance_states(). Takes the sample_lock_.
1745 {
1748 
1749  return lookup_matching_instances(sample_states, view_states, instance_states).size();
1750 }
1751 
1752 DDS::DataReaderListener_ptr
1754 {
1755  // per 2.1.4.3.1 Listener Access to Plain Communication Status
1756  // use this entities factory if listener is mask not enabled
1757  // for this kind.
1760  if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) {
1761  g.release();
1762  return subscriber->listener_for(kind);
1763 
1764  } else {
1765  return DDS::DataReaderListener::_duplicate(listener_.in());
1766  }
1767 }
1768 
1770  const ReceivedDataElement *ptr)
1771 {
1772 
1773  sample_info.sample_rank = 0;
1774 
1775  // generation_rank =
1776  // (MRSIC.disposed_generation_count +
1777  // MRSIC.no_writers_generation_count)
1778  // - (S.disposed_generation_count +
1779  // S.no_writers_generation_count)
1780  //
1781  sample_info.generation_rank =
1782  (sample_info.disposed_generation_count +
1783  sample_info.no_writers_generation_count) -
1784  sample_info.generation_rank;
1785 
1786  // absolute_generation_rank =
1787  // (MRS.disposed_generation_count +
1788  // MRS.no_writers_generation_count)
1789  // - (S.disposed_generation_count +
1790  // S.no_writers_generation_count)
1791  //
1792  sample_info.absolute_generation_rank =
1793  (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
1794  static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
1795  sample_info.absolute_generation_rank;
1796 
1798 }
1799 
1801 {
1802  //!!!caller should have acquired sample_lock_
1804 
1805  CORBA::Long count(0);
1806 
1807  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
1808  iter != instances_.end();
1809  ++iter) {
1810  SubscriptionInstance_rch ptr = iter->second;
1811 
1812  count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size());
1813  }
1814 
1815  return count;
1816 }
1817 
1818 void
1820 {
1821  execute_or_enqueue(make_rch<CheckLivelinessCommand>(this));
1822 }
1823 
1824 int
1826  const void * /*arg*/)
1827 {
1828  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1829 
1830  check_liveliness_i(false, MonotonicTimePoint(tv));
1831  return 0;
1832 }
1833 
1834 void
1836  const MonotonicTimePoint& now)
1837 {
1838  // Working copy of the active timer Id.
1839 
1840  RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
1841  if (! data_reader) {
1842  this->reactor()->purge_pending_notifications(this);
1843  return;
1844  }
1845 
1846  long local_timer_id = liveliness_timer_id_;
1847  bool timer_was_reset = false;
1848 
1849  if (local_timer_id != -1 && cancel) {
1850  if (DCPS_debug_level >= 5) {
1852  ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1853  ACE_TEXT(" canceling timer for reader %C.\n"),
1854  LogGuid(data_reader->get_guid()).c_str()));
1855  }
1856 
1857  // called from add_associations and there is already a timer
1858  // so cancel the existing timer.
1859  if (this->reactor()->cancel_timer(local_timer_id) == -1) {
1860  // this could fail because the reactor's call and
1861  // the add_associations' call to this could overlap
1862  // so it is not a failure.
1864  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1865  ACE_TEXT(" %p.\n"), ACE_TEXT("cancel_timer")));
1866  }
1867 
1868  timer_was_reset = true;
1869  }
1870 
1871  // Used after the lock scope ends.
1873  int alive_writers = 0;
1874 
1875  // This is a bit convoluted. The reasoning goes as follows:
1876  // 1) We grab the current timer Id value when we enter the method.
1877  // 2) We *might* cancel the timer if it is active.
1878  // 3) The timer *might* be rescheduled while we do not hold the sample lock.
1879  // 4) If we (or another thread) canceled the timer that we can tell, then
1880  // 5) we should clear the Id value,
1881  // 6) unless it has been rescheduled.
1882  // We are using a changed timer Id value as a proxy for having been
1883  // rescheduled.
1884  if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
1885  liveliness_timer_id_ = -1;
1886  }
1887 
1888  // Iterate over each writer to this reader
1889  {
1891  read_guard,
1892  data_reader->writers_lock_);
1893  WriterMapType writers = data_reader->writers_;
1894  read_guard.release();
1895 
1896  for (WriterMapType::iterator iter = writers.begin();
1897  iter != writers.end();
1898  ++iter) {
1899  // deal with possibly not being alive or
1900  // tell when it will not be alive next (if no activity)
1901  const MonotonicTimePoint next_absolute(iter->second->check_activity(now));
1902  if (!next_absolute.is_max()) {
1903  alive_writers++;
1904  smallest = std::min(smallest, next_absolute);
1905  }
1906  }
1907  }
1908 
1909  if (!alive_writers) {
1910  // no live writers so no need to schedule a timer
1911  // but be sure we don't try to cancel the timer later.
1912  liveliness_timer_id_ = -1;
1913  }
1914 
1915  if (DCPS_debug_level >= 5) {
1917  ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1918  ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
1919  LogGuid(data_reader->get_guid()).c_str(),
1920  alive_writers,
1921  !cancel));
1922  }
1923 
1924  // Call into the reactor after releasing the sample lock.
1925  if (alive_writers) {
1926  // compare the time now with the earliest(smallest) deadline we found
1927  TimeDuration relative;
1928  if (now < smallest) {
1929  relative = smallest - now;
1930  } else {
1931  relative = TimeDuration(0, 1); // ASAP
1932  }
1933  liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative.value());
1934 
1935  if (liveliness_timer_id_ == -1) {
1937  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1938  ACE_TEXT(" %p.\n"), ACE_TEXT("schedule_timer")));
1939  }
1940  }
1941 }
1942 
1943 void
1945 {
1946 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1947  OwnershipManagerPtr owner_manager = this->ownership_manager();
1948  if (owner_manager) {
1949  owner_manager->remove_writers(handle);
1950  }
1951 #endif
1952 
1954  SubscriptionInstance_rch instance = this->get_handle_instance(handle);
1955 
1956  if (!instance) {
1957  ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
1958  "could not find the instance by handle 0x%x\n", handle));
1959  return;
1960  }
1961 
1962  this->purge_data(instance);
1963 
1964  {
1966  instances_.erase(handle);
1967  }
1968 
1969  this->release_instance_i(handle);
1970  if (this->monitor_) {
1971  this->monitor_->report();
1972  }
1973 }
1974 
1975 void
1977 {
1979  state_updated_i(handle);
1980 }
1981 
1983  int amount,
1984  DataCollector<double>::OnFull type) : stats_(amount, type)
1985 {
1986 }
1987 
1989 {
1990  double datum = static_cast<double>(delay.value().sec());
1991  datum += delay.value().usec() / 1000000.0;
1992  this->stats_.add(datum);
1993 }
1994 
1996 {
1998 
1999  value.publication = GUID_UNKNOWN;
2000  value.n = this->stats_.n();
2001  value.maximum = this->stats_.maximum();
2002  value.minimum = this->stats_.minimum();
2003  value.mean = this->stats_.mean();
2004  value.variance = this->stats_.var();
2005 
2006  return value;
2007 }
2008 
2010 {
2011  this->stats_.reset();
2012 }
2013 
2014 #ifndef OPENDDS_SAFETY_PROFILE
2015 std::ostream& OpenDDS::DCPS::WriterStats::raw_data(std::ostream& str) const
2016 {
2017  str << std::dec << this->stats_.size()
2018  << " samples out of " << this->stats_.n() << std::endl;
2019  return str << this->stats_;
2020 }
2021 #endif //OPENDDS_SAFETY_PROFILE
2022 
2023 void
2025 {
2026  const GUID_t info_writer_id = info.writer_id();
2027 
2028  if (DCPS_debug_level >= 5) {
2030  ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
2031  ACE_TEXT("reader %C from writer %C.\n"),
2032  LogGuid(get_guid()).c_str(),
2033  LogGuid(info_writer_id).c_str()));
2034  }
2035 
2036 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2037  OwnershipManagerPtr owner_manager = this->ownership_manager();
2038  if (owner_manager) {
2039  owner_manager->remove_writer(info_writer_id);
2040  info.clear_owner_evaluated();
2041  }
2042 #endif
2043 
2044  {
2045  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
2046  {
2047  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
2048  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2049  if (pos != publication_id_to_handle_map_.end()) {
2050  publication_handle = pos->second;
2051  }
2052  }
2053 
2054  bool liveliness_changed = false;
2055 
2056  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2057 
2058  const WriterInfo::WriterState info_state = info.state();
2059 
2060  if (info_state == WriterInfo::ALIVE) {
2061  --liveliness_changed_status_.alive_count;
2062  --liveliness_changed_status_.alive_count_change;
2063  liveliness_changed = true;
2064  }
2065 
2066  if (info_state == WriterInfo::DEAD) {
2067  --liveliness_changed_status_.not_alive_count;
2068  --liveliness_changed_status_.not_alive_count_change;
2069  liveliness_changed = true;
2070  }
2071 
2072  liveliness_changed_status_.last_publication_handle = info.handle();
2073  instances_liveliness_update(info_writer_id, publication_handle);
2074 
2075  if (liveliness_changed) {
2076  set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
2077  this->notify_liveliness_change();
2078  }
2079  }
2080 }
2081 
2082 void
2084 {
2085  const GUID_t info_writer_id = info.writer_id();
2086 
2087  if (DCPS_debug_level >= 5) {
2088  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
2089  ACE_TEXT("reader %C from writer %C previous state %C.\n"),
2090  LogGuid(get_guid()).c_str(),
2091  LogGuid(info_writer_id).c_str(),
2092  info.get_state_str()));
2093  }
2094 
2095  // NOTE: each instance will change to ALIVE_STATE when they receive a sample
2096 
2097  const WriterInfo::WriterState info_state = info.state();
2098 
2099  {
2100  bool liveliness_changed = false;
2101 
2102  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2103 
2104  if (info_state != WriterInfo::ALIVE) {
2105  liveliness_changed_status_.alive_count++;
2106  liveliness_changed_status_.alive_count_change++;
2107  liveliness_changed = true;
2108  }
2109 
2110  if (info_state == WriterInfo::DEAD) {
2111  liveliness_changed_status_.not_alive_count--;
2112  liveliness_changed_status_.not_alive_count_change--;
2113  }
2114 
2115  if (liveliness_changed_status_.alive_count < 0) {
2117  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2118  ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
2119  liveliness_changed_status_.alive_count));
2120  return;
2121  }
2122 
2123  if (liveliness_changed_status_.not_alive_count < 0) {
2125  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2126  ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
2127  liveliness_changed_status_.not_alive_count));
2128  return;
2129  }
2130 
2131  liveliness_changed_status_.last_publication_handle = info.handle();
2132 
2133  // Change the state to ALIVE since handle_timeout may call writer_became_dead
2134  // which need the current state info.
2135  info.state(WriterInfo::ALIVE);
2136 
2137  if (this->monitor_) {
2138  this->monitor_->report();
2139  }
2140 
2141  // Call listener only when there are liveliness status changes.
2142  if (liveliness_changed) {
2143  set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
2144  this->notify_liveliness_change();
2145  }
2146  }
2147 
2148  // this call will start the liveliness timer if it is not already set
2149  liveliness_timer_->check_liveliness();
2150 }
2151 
2152 void
2154 {
2155  const GUID_t info_writer_id = info.writer_id();
2156 
2157  if (DCPS_debug_level >= 5) {
2158  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
2159  ACE_TEXT("reader %C from writer %C previous state %C.\n"),
2160  LogGuid(get_guid()).c_str(),
2161  LogGuid(info_writer_id).c_str(),
2162  info.get_state_str()));
2163  }
2164 
2165 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2166  OwnershipManagerPtr owner_manager = this->ownership_manager();
2167  if (owner_manager) {
2168  owner_manager->remove_writer(info_writer_id);
2169  info.clear_owner_evaluated();
2170  }
2171 #endif
2172 
2173  bool liveliness_changed = false;
2174 
2175  const WriterInfo::WriterState info_state = info.state();
2176 
2177  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
2178  {
2179  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
2180  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2181  if (pos != publication_id_to_handle_map_.end()) {
2182  publication_handle = pos->second;
2183  }
2184  }
2185 
2186  {
2187  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2188 
2189  if (info_state != WriterInfo::DEAD) {
2190  ++liveliness_changed_status_.not_alive_count;
2191  ++liveliness_changed_status_.not_alive_count_change;
2192  liveliness_changed = true;
2193  }
2194 
2195  if (info_state == WriterInfo::ALIVE) {
2196  --liveliness_changed_status_.alive_count;
2197  --liveliness_changed_status_.alive_count_change;
2198  }
2199 
2200  if (liveliness_changed_status_.alive_count < 0) {
2202  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2203  ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
2204  liveliness_changed_status_.alive_count));
2205  return;
2206  }
2207 
2208  if (liveliness_changed_status_.not_alive_count < 0) {
2210  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2211  ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
2212  liveliness_changed_status_.not_alive_count));
2213  return;
2214  }
2215 
2216  liveliness_changed_status_.last_publication_handle = info.handle();
2217 
2218  info.state(WriterInfo::DEAD);
2219 
2220  if (this->monitor_) {
2221  this->monitor_->report();
2222  }
2223 
2224  instances_liveliness_update(info_writer_id, publication_handle);
2225 
2226  // Call listener only when there are liveliness status changes.
2227  if (liveliness_changed) {
2228  set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
2229  this->notify_liveliness_change();
2230  }
2231  }
2232 }
2233 
2234 void
2236  DDS::InstanceHandle_t publication_handle)
2237 {
2238  // sample_lock_ must be held.
2239  InstanceSet localinsts;
2240  {
2241  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
2242  if (instances_.size() == 0) {
2243  return;
2244  }
2245  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2246  iter != instances_.end(); ++iter) {
2247  if (iter->second->instance_state_->writes_instance(writer)) {
2248  localinsts.insert(iter->first);
2249  }
2250  }
2251  }
2252 
2253  for (InstanceSet::iterator iter = localinsts.begin(); iter != localinsts.end(); ++iter) {
2254  set_instance_state_i(*iter, publication_handle, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, SystemTimePoint::now(), writer);
2255  }
2256 }
2257 
2258 
2259 void
2261  const DDS::SampleLostStatus& status)
2262 {
2263  //!!!caller should have acquired sample_lock_
2264  sample_lost_status_ = status;
2265 }
2266 
2267 void
2269  const DDS::SampleRejectedStatus& status)
2270 {
2271  //!!!caller should have acquired sample_lock_
2272  sample_rejected_status_ = status;
2273 }
2274 
2278 {
2279  if (DCPS_debug_level > 0) {
2280  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
2281  }
2282 }
2283 
2285 {
2286  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
2287  StatsMapType::iterator location = this->statistics_.find(sample.header_.publication_id_);
2288 
2289  if (location != this->statistics_.end()) {
2291 
2292  // Only when the user has specified a latency budget or statistics
2293  // are enabled we need to calculate our latency
2294  if ((this->statistics_enabled()) ||
2295  (this->qos_.latency_budget.duration > zero)) {
2296  const DDS::Time_t timestamp = {
2299  };
2300  const TimeDuration latency = SystemTimePoint::now() - SystemTimePoint(timestamp);
2301 
2302  if (this->statistics_enabled()) {
2303  location->second.add_stat(latency);
2304  }
2305 
2306  if (DCPS_debug_level > 9) {
2308  ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
2309  ACE_TEXT("measured latency of %C for current sample.\n"),
2310  latency.str().c_str()));
2311  }
2312 
2313  if (this->qos_.latency_budget.duration > zero) {
2314  // Check latency against the budget.
2315  if (latency > TimeDuration(this->qos_.latency_budget.duration)) {
2316  this->notify_latency(sample.header_.publication_id_);
2317  }
2318  }
2319  }
2320  } else if (DCPS_debug_level > 0) {
2321  /// NB: This message is generated contemporaneously with a similar
2322  /// message from writer_activity(). That message is not marked
2323  /// as an error, so we follow that lead and leave this as an
2324  /// informational message, guarded by debug level. This seems
2325  /// to be due to late samples (samples delivered after an
2326  /// association has been torn down). We may want to promote this
2327  /// to a warning if other conditions causing this symptom are
2328  /// discovered.
2330  ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
2331  ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
2332  LogGuid(get_guid()).c_str(),
2333  LogGuid(sample.header_.publication_id_).c_str()));
2334  }
2335 }
2336 
2338 {
2339  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2340  // is given to this DataReader then narrow() fails.
2341  DataReaderListener_var listener = get_ext_listener();
2342 
2343  if (!CORBA::is_nil(listener.in())) {
2344  WriterIdSeq writerIds;
2345  writerIds.length(1);
2346  writerIds[ 0] = writer;
2347 
2348  DDS::InstanceHandleSeq handles;
2349  this->lookup_instance_handles(writerIds, handles);
2350 
2351  if (handles.length() >= 1) {
2352  this->budget_exceeded_status_.last_instance_handle = handles[ 0];
2353 
2354  } else {
2355  this->budget_exceeded_status_.last_instance_handle = -1;
2356  }
2357 
2358  ++this->budget_exceeded_status_.total_count;
2359  ++this->budget_exceeded_status_.total_count_change;
2360 
2361  listener->on_budget_exceeded(this, this->budget_exceeded_status_);
2362 
2363  this->budget_exceeded_status_.total_count_change = 0;
2364  }
2365 }
2366 
2367 #ifndef OPENDDS_SAFETY_PROFILE
2368 void
2371 {
2372  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
2373  stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
2374  int index = 0;
2375 
2376  for (StatsMapType::const_iterator current = this->statistics_.begin();
2377  current != this->statistics_.end();
2378  ++current, ++index) {
2379  stats[ index] = current->second.get_stats();
2380  stats[ index].publication = current->first;
2381  }
2382 }
2383 #endif
2384 
2385 void
2387 {
2388  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
2389  for (StatsMapType::iterator current = this->statistics_.begin();
2390  current != this->statistics_.end();
2391  ++current) {
2392  current->second.reset_stats();
2393  }
2394 }
2395 
2398 {
2399  return statistics_enabled_;
2400 }
2401 
2402 void
2404  CORBA::Boolean statistics_enabled)
2405 {
2406  statistics_enabled_ = statistics_enabled;
2407 }
2408 
2409 void
2411 {
2412  const Observer_rch observer = get_observer(Observer::e_DELETED);
2413  if (observer) {
2414  observer->on_deleted(this);
2415  }
2416 
2417  this->set_deleted(true);
2418  this->stop_associating();
2419  this->send_final_acks();
2420  subscription_id_condition_.notify_all();
2421 }
2422 
2425 {
2426  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, SubscriptionInstance_rch());
2427 
2428  SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
2429  if (iter == instances_.end()) {
2431  ACE_TEXT("(%P|%t) WARNING: ")
2432  ACE_TEXT("DataReaderImpl::get_handle_instance: ")
2433  ACE_TEXT("lookup for 0x%x failed\n"),
2434  handle));
2435  return SubscriptionInstance_rch();
2436  } // if (0 != instances_.find(handle, instance))
2437 
2438  return iter->second;
2439 }
2440 
2443 {
2444  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2445  if (!participant)
2446  return DDS::HANDLE_NIL;
2447 
2448  if (is_bit()) {
2449  const GUID_t id = bit_key_to_guid(key);
2450  return participant->assign_handle(id);
2451 
2452  } else {
2453  return participant->assign_handle();
2454  }
2455 }
2456 
2458 {
2459  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
2460  if (participant) {
2461  participant->return_handle(handle);
2462  }
2463 }
2464 
2465 void
2467 {
2468  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
2469 
2470  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2471  // is given to this DataReader then narrow() fails.
2472  DataReaderListener_var the_listener = get_ext_listener();
2473 
2474  if (!CORBA::is_nil(the_listener.in())) {
2475  SubscriptionLostStatus status;
2476 
2477  // Since this callback may come after remove_association which removes
2478  // the writer from id_to_handle map, we can ignore this error.
2479  this->lookup_instance_handles(pubids, status.publication_handles);
2480  the_listener->on_subscription_disconnected(this, status);
2481  }
2482 }
2483 
2484 void
2486 {
2487  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
2488 
2489  if (!this->is_bit_) {
2490  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2491  // is given to this DataReader then narrow() fails.
2492  DataReaderListener_var the_listener = get_ext_listener();
2493 
2494  if (!CORBA::is_nil(the_listener.in())) {
2495  SubscriptionLostStatus status;
2496 
2497  // If it's reconnected then the reader should be in id_to_handle
2498  this->lookup_instance_handles(pubids, status.publication_handles);
2499 
2500  the_listener->on_subscription_reconnected(this, status);
2501  }
2502  }
2503 }
2504 
2505 void
2507 {
2508  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
2509 
2510  if (!this->is_bit_) {
2511  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2512  // is given to this DataReader then narrow() fails.
2513  DataReaderListener_var the_listener = get_ext_listener();
2514 
2515  if (!CORBA::is_nil(the_listener.in())) {
2516  SubscriptionLostStatus status;
2517 
2518  CORBA::ULong len = handles.length();
2519  status.publication_handles.length(len);
2520 
2521  for (CORBA::ULong i = 0; i < len; ++ i) {
2522  status.publication_handles[i] = handles[i];
2523  }
2524 
2525  the_listener->on_subscription_lost(this, status);
2526  }
2527  }
2528 }
2529 
2530 void
2532 {
2533  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
2534 
2535  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2536  // is given to this DataReader then narrow() fails.
2537  DataReaderListener_var the_listener = get_ext_listener();
2538 
2539  if (!CORBA::is_nil(the_listener.in())) {
2540  SubscriptionLostStatus status;
2541 
2542  // Since this callback may come after remove_association which removes
2543  // the writer from id_to_handle map, we can ignore this error.
2544  this->lookup_instance_handles(pubids, status.publication_handles);
2545  the_listener->on_subscription_lost(this, status);
2546  }
2547 }
2548 
2549 
2550 void
2552  DDS::InstanceHandleSeq & hdls)
2553 {
2554  CORBA::ULong const num_wrts = ids.length();
2555 
2556  if (DCPS_debug_level > 9) {
2557  const char* separator = "";
2558  OPENDDS_STRING guids;
2559 
2560  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
2561  guids += separator;
2562  guids += LogGuid(ids[i]).conv_;
2563  separator = ", ";
2564  }
2565 
2567  ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
2568  ACE_TEXT("searching for handles for writer Ids: %C.\n"),
2569  guids.c_str()));
2570  }
2571 
2572  hdls.length(num_wrts);
2573 
2574  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2575  if (participant) {
2576  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
2577  hdls[i] = participant->lookup_handle(ids[i]);
2578  }
2579  }
2580 }
2581 
2582 bool
2584 {
2585  const SystemTimePoint now = SystemTimePoint::now();
2586 
2587  // Expire historic data if QoS indicates VOLATILE.
2588  if (!always_get_history_ && header.historic_sample_
2589  && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
2590  if (DCPS_debug_level >= 8) {
2592  ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
2593  ACE_TEXT("Discarded historic data.\n")));
2594  }
2595 
2596  return true; // Data filtered.
2597  }
2598 
2599  // The LIFESPAN_DURATION_FLAG is set when sample data is sent
2600  // with a non-default LIFESPAN duration value.
2601  if (header.lifespan_duration_) {
2602  // Finite lifespan. Check if data has expired.
2603 
2604  const DDS::Time_t expiration_dds_time = {
2607  };
2608  const SystemTimePoint expiration_time(expiration_dds_time);
2609 
2610  // We assume that the publisher host's clock and subcriber host's
2611  // clock are synchronized (allowed by the spec).
2612  if (now >= expiration_time) {
2613  if (DCPS_debug_level >= 8) {
2614  const TimeDuration diff(now - expiration_time);
2616  ACE_TEXT("(%P|%t) Received data ")
2617  ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
2618  diff.value().sec(),
2619  diff.value().usec()));
2620  }
2621 
2622  return true; // Data filtered.
2623  }
2624  }
2625 
2626  return false;
2627 }
2628 
2629 bool
2631  const GUID_t& pubid)
2632 {
2633 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2634  if (this->is_exclusive_ownership_) {
2635 
2636  ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
2637  WriterMapType::iterator iter = writers_.find(pubid);
2638 
2639  if (iter == writers_.end()) {
2640  if (DCPS_debug_level > 4) {
2641  // This may not be an error since it could happen that the sample
2642  // is delivered to the datareader after the write is dis-associated
2643  // with this datareader.
2645  ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2646  ACE_TEXT("reader %C is not associated with writer %C.\n"),
2647  LogGuid(get_guid()).c_str(),
2648  LogGuid(pubid).c_str()));
2649  }
2650  return true;
2651  }
2652 
2653 
2654  // Evaulate the owner of the instance if not selected and filter
2655  // current message if it's not from owner writer.
2656  if ( instance->instance_state_->get_owner() == GUID_UNKNOWN
2657  || ! iter->second->is_owner_evaluated(instance->instance_handle_)) {
2658  OwnershipManagerPtr owner_manager = this->ownership_manager();
2659 
2660  bool is_owner = owner_manager && owner_manager->select_owner (
2661  instance->instance_handle_,
2662  iter->second->writer_id(),
2663  iter->second->writer_qos_ownership_strength(),
2664  instance->instance_state_);
2665  iter->second->set_owner_evaluated(instance->instance_handle_, true);
2666 
2667  if (! is_owner) {
2668  if (DCPS_debug_level >= 1) {
2670  ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2671  ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
2672  LogGuid(get_guid()).c_str(),
2673  LogGuid(pubid).c_str(),
2674  LogGuid(instance->instance_state_->get_owner()).c_str()));
2675  }
2676  return true;
2677  }
2678  }
2679  else if (! (instance->instance_state_->get_owner() == pubid)) {
2680  if (DCPS_debug_level >= 1) {
2682  ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2683  ACE_TEXT("reader %C writer %C is not owner %C\n"),
2684  LogGuid(get_guid()).c_str(),
2685  LogGuid(pubid).c_str(),
2686  LogGuid(instance->instance_state_->get_owner()).c_str()));
2687  }
2688  return true;
2689  }
2690  }
2691 #else
2692  ACE_UNUSED_ARG(pubid);
2693  ACE_UNUSED_ARG(instance);
2694 #endif
2695  return false;
2696 }
2697 
2698 bool
2700  MonotonicTimePoint& now,
2701  MonotonicTimePoint& deadline)
2702 {
2703  now = MonotonicTimePoint::now();
2704  const TimeDuration minimum_separation(qos_.time_based_filter.minimum_separation);
2705 
2706  // TIME_BASED_FILTER processing; expire data samples
2707  // if minimum separation is not met for instance.
2708  if (!minimum_separation.is_zero()) {
2709  if (now - instance->last_accepted_ < minimum_separation) {
2710  deadline = now + minimum_separation;
2711  return true; // Data filtered.
2712  }
2713  }
2714 
2715  instance->last_accepted_ = now;
2716 
2717  return false;
2718 }
2719 
2721 {
2722  return this->is_bit_;
2723 }
2724 
2725 bool
2727 {
2729  guard,
2730  this->sample_lock_,
2731  true /* assume we have loans */);
2732  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true);
2733 
2734  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2735  iter != instances_.end();
2736  ++iter) {
2737  SubscriptionInstance_rch ptr = iter->second;
2738 
2739  if (ptr->rcvd_samples_.has_zero_copies()) {
2740  return true;
2741  }
2742  }
2743 
2744  return false;
2745 }
2746 
2748 {
2749  // sample_lock_ must be held.
2750  // N.B. writers_lock_ should already be acquired when
2751  // this method is called.
2752 
2753  DDS::DataReaderListener_var listener
2754  = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
2755 
2756  if (!CORBA::is_nil(listener.in())) {
2757  const DDS::LivelinessChangedStatus status = liveliness_changed_status_;
2758  liveliness_changed_status_.alive_count_change = 0;
2759  liveliness_changed_status_.not_alive_count_change = 0;
2760  ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2761  listener->on_liveliness_changed(this, status);
2762  }
2763  notify_status_condition();
2764 
2765  if (DCPS_debug_level > 9) {
2766  ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
2767  OPENDDS_STRING output_str;
2768  output_str += "subscription ";
2769  output_str += LogGuid(get_guid()).conv_;
2770  output_str += ", listener at: 0x";
2771  output_str += to_dds_string(this->listener_.in());
2772 
2773  for (WriterMapType::iterator current = this->writers_.begin();
2774  current != this->writers_.end();
2775  ++current) {
2776  const GUID_t id = current->first;
2777  output_str += "\n\tNOTIFY: writer[ ";
2778  output_str += LogGuid(id).conv_;
2779  output_str += "] == ";
2780  output_str += current->second->get_state_str();
2781  }
2782 
2784  ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
2785  ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
2786  ACE_TEXT("\tNOTIFY: %C\n"),
2787  listener.in(),
2788  listener_mask_,
2789  output_str.c_str()));
2790  }
2791 }
2792 
2794 {
2795  set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
2796  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
2797  if (subscriber) {
2798  subscriber->set_status_changed_flag(
2800  }
2801 }
2802 
2805 {
2806  return this->reactor_;
2807 }
2808 
2811 {
2812  return topic_id_;
2813 }
2814 
2817 {
2818  return dp_id_;
2819 }
2820 
2821 void
2822 DataReaderImpl::get_instance_handles(InstanceHandleVec& instance_handles)
2823 {
2824  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2825  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2826 
2827  for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
2828  end = instances_.end(); iter != end; ++iter) {
2829  instance_handles.push_back(iter->first);
2830  }
2831 }
2832 
2833 void
2834 DataReaderImpl::get_writer_states(WriterStatePairVec& writer_states)
2835 {
2837  read_guard,
2838  this->writers_lock_);
2839  for (WriterMapType::iterator iter = writers_.begin();
2840  iter != writers_.end();
2841  ++iter) {
2842  writer_states.push_back(WriterStatePair(iter->first,
2843  iter->second->state()));
2844  }
2845 }
2846 
2847 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2848 void
2850  const CORBA::Long& ownership_strength)
2851 {
2853  read_guard,
2854  this->writers_lock_);
2855  for (WriterMapType::iterator iter = writers_.begin();
2856  iter != writers_.end();
2857  ++iter) {
2858  if (iter->second->writer_id() == pub_id) {
2859  if (ownership_strength != iter->second->writer_qos_ownership_strength()) {
2860  if (DCPS_debug_level >= 1) {
2862  ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
2863  ACE_TEXT("local %C update remote %C strength from %d to %d\n"),
2864  LogGuid(get_guid()).c_str(),
2865  LogGuid(pub_id).c_str(),
2866  iter->second->writer_qos_ownership_strength(), ownership_strength));
2867  }
2868  iter->second->writer_qos_ownership_strength(ownership_strength);
2869  iter->second->clear_owner_evaluated();
2870  }
2871  break;
2872  }
2873  }
2874 }
2875 #endif
2876 
2877 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2879 {
2881  bool accept_here = true;
2882 
2883  const GUID_t writer_id = writer->writer_id();
2884  const GUID_t publisher_id = writer->publisher_id();
2885 
2886  if (subqos_.presentation.access_scope != ::DDS::INSTANCE_PRESENTATION_QOS &&
2887  subqos_.presentation.coherent_access) {
2888  // verify current coherent changes from single writer
2889  state = writer->coherent_change_received();
2890  if (writer->group_coherent()) { // GROUP coherent any state
2891  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
2892  if (subscriber && state != NOT_COMPLETED_YET) {
2893  // verify if all readers received complete coherent changes in a group.
2894  subscriber->coherent_change_received(publisher_id, this, state);
2895  accept_here = false; // coherent_change_received does that itself
2896  }
2897  } else if (state != NOT_COMPLETED_YET) { // TOPIC coherent with final state
2898  if (state == REJECTED) {
2899  reject_coherent(writer_id, publisher_id);
2900  }
2901  writer->reset_coherent_info();
2902  }
2903  }
2904 
2905  if (state == COMPLETED && accept_here) {
2906  accept_coherent(writer_id, publisher_id);
2907  coherent_changes_completed(this);
2908  }
2909 
2910  return state == COMPLETED;
2911 }
2912 
2913 
2915  const GUID_t& publisher_id)
2916 {
2917  if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
2919  ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
2920  ACE_TEXT(" reader %C writer %C publisher %C\n"),
2921  LogGuid(get_guid()).c_str(),
2922  LogGuid(writer_id).c_str(),
2923  LogGuid(publisher_id).c_str()));
2924  }
2925  SubscriptionInstanceSet localsubs;
2926  {
2927  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2928  for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2929  iter != this->instances_.end(); ++iter) {
2930  localsubs.insert(iter->second);
2931  }
2932  }
2933  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2934  for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2935  iter != localsubs.end(); iter++) {
2936  (*iter)->rcvd_strategy_->accept_coherent(writer_id, publisher_id);
2937  }
2938 }
2939 
2940 
2942  const GUID_t& publisher_id)
2943 {
2944  if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
2946  ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
2947  ACE_TEXT(" reader %C writer %C publisher %C\n"),
2948  LogGuid(get_guid()).c_str(),
2949  LogGuid(writer_id).c_str(),
2950  LogGuid(publisher_id).c_str()));
2951  }
2952 
2953  SubscriptionInstanceSet localsubs;
2954  {
2955  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2956  for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2957  iter != this->instances_.end(); ++iter) {
2958  localsubs.insert(iter->second);
2959  }
2960  }
2961  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2962  for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2963  iter != localsubs.end(); iter++) {
2964  (*iter)->rcvd_strategy_->reject_coherent(writer_id, publisher_id);
2965  }
2966  this->reset_coherent_info(writer_id, publisher_id);
2967 }
2968 
2969 
2971  const GUID_t& publisher_id)
2972 {
2973  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
2974 
2975  WriterMapType::iterator itEnd = this->writers_.end();
2976  for (WriterMapType::iterator it = this->writers_.begin();
2977  it != itEnd; ++it) {
2978  if (it->second->writer_id() == writer_id
2979  && it->second->publisher_id() == publisher_id) {
2980  it->second->reset_coherent_info();
2981  }
2982  }
2983 }
2984 
2985 
2986 void
2988 {
2989  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
2990 
2991  result = COMPLETED;
2992  for (WriterMapType::iterator iter = writers_.begin();
2993  iter != writers_.end();
2994  ++iter) {
2995 
2996  if (iter->second->publisher_id() == publisher_id) {
2997  const Coherent_State state = iter->second->coherent_change_received();
2998  if (state == NOT_COMPLETED_YET) {
2999  result = NOT_COMPLETED_YET;
3000  break;
3001  }
3002  else if (state == REJECTED) {
3003  result = REJECTED;
3004  }
3005  }
3006  }
3007 }
3008 
3009 
3010 void
3012 {
3013  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
3014  if (!subscriber) {
3015  return;
3016  }
3017 
3018  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
3019  this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
3020 
3021  ::DDS::SubscriberListener_var sub_listener =
3022  subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS);
3023  if (!CORBA::is_nil(sub_listener.in()))
3024  {
3025  if (!is_bit()) {
3026  this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3027  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3028  if (reader == this) {
3029  // Release the sample_lock before listener callback.
3030  ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
3031  sub_listener->on_data_on_readers(subscriber.in());
3032  }
3033  } else {
3034  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(subscriber, sub_listener, rchandle_from(this), reader == this, true));
3035  }
3036  }
3037  else
3038  {
3039  subscriber->notify_status_condition();
3040 
3041  ::DDS::DataReaderListener_var listener =
3042  this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
3043 
3044  if (!CORBA::is_nil(listener.in()))
3045  {
3046  if (!is_bit()) {
3047  set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3048  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3049  if (reader == this) {
3050  // Release the sample_lock before listener callback.
3051  ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
3052  listener->on_data_available(this);
3053  } else {
3054  listener->on_data_available(this);
3055  }
3056  } else {
3057  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(this), reader == this, true, true));
3058  }
3059  }
3060  else
3061  {
3062  this->notify_status_condition();
3063  }
3064  }
3065 }
3066 
3067 
3069 {
3070  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3071  this->coherent_ = true;
3072 }
3073 
3074 
3076 {
3077  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3078  this->coherent_ = false;
3079  this->group_coherent_ordered_data_.reset();
3080  this->post_read_or_take();
3081 }
3082 
3083 
3088 {
3089  SubscriptionInstanceSet localsubs;
3090  {
3091  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
3092  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
3093  iter != instances_.end(); ++iter) {
3094  localsubs.insert(iter->second);
3095  }
3096  }
3097 
3098  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3099 
3100  for (SubscriptionInstanceSet::iterator iter = localsubs.begin(); iter != localsubs.end(); ++iter) {
3101  const SubscriptionInstance_rch inst = *iter;
3102  if (inst->instance_state_->match(view_states, instance_states)) {
3103  size_t i(0);
3104  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
3105  item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
3106  data.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
3107  group_coherent_ordered_data_.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
3108  }
3109  }
3110  }
3111 }
3112 
3113 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
3114 
3115 void
3117  const DDS::SubscriberQos & qos)
3118 {
3119  this->subqos_ = qos;
3120 }
3121 
3122 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
3123 void
3125 {
3126  cft->add_reader(*this);
3127  {
3128  ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
3129  content_filtered_topic_ = cft;
3130  }
3131 }
3132 
3133 DDS::ContentFilteredTopic_ptr
3135 {
3136  ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
3137  return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get());
3138 }
3139 #endif
3140 
3141 #ifndef OPENDDS_NO_MULTI_TOPIC
3142 void
3144 {
3145  multi_topic_ = mt;
3146 }
3147 #endif
3148 
3149 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
3150 
3151 void
3153 {
3154  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
3155  disco->update_subscription_params(domain_id_,
3156  dp_id_,
3157  subscription_id_,
3158  params);
3159 }
3160 #endif
3161 
3162 void
3164 {
3165  ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
3166  for (WriterMapType::iterator iter = writers_.begin();
3167  iter != writers_.end();
3168  ++iter) {
3169  iter->second->set_owner_evaluated(instance, false);
3170  }
3171 }
3172 
3173 void
3175 {
3176  WriterInfo_rch info;
3177  {
3178  ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
3179  WriterMapType::iterator where = writers_.find(pub_id);
3180  if (writers_.end() != where) {
3181  info = where->second;
3182  }
3183  }
3184 
3185  if (info) {
3187  // Stop filtering these
3188  if (info->check_end_historic_samples(end_historic_sweeper_.in(), to_deliver)) {
3189  deliver_historic(to_deliver);
3191  }
3192  }
3193 }
3194 
3196 {
3197  ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
3198  WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
3199  if (iter != writers_.end()) {
3200  const SequenceNumber& seq = sample.header_.sequence_;
3201  SequenceNumber last_historic_seq;
3202  if (iter->second->check_historic(seq, sample, last_historic_seq)) {
3203  return false;
3204  }
3205  if (last_historic_seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
3206  && !sample.header_.historic_sample_
3207  && seq <= last_historic_seq) {
3208  // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
3209  return false;
3210  }
3211  }
3212  return true;
3213 }
3214 
3216 {
3217  typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
3218  const iter_t end = samples.end();
3219  for (iter_t iter = samples.begin(); iter != end; ++iter) {
3220  iter->second.header_.historic_sample_ = true;
3221  data_received(iter->second);
3222  }
3223 }
3224 
3225 void
3227 {
3228  if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
3229 
3230  ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
3231 
3232  WriterMapType::iterator it = writers_.find(peer);
3233  if (it != writers_.end()) {
3234  // Schedule timer if necessary
3235  // - only need to check reader qos - we know the writer must be >= reader
3236  end_historic_sweeper_->schedule_timer(it->second);
3237  }
3238  }
3239  TransportClient::add_link(link, peer);
3240  OPENDDS_STRING type;
3241  {
3242  TransportImpl_rch impl = link->impl();
3243  if (impl) {
3244  type = impl->transport_type();
3245  }
3246  }
3247 
3248  if (type == "rtps_udp" || type == "multicast") {
3249  resume_sample_processing(peer);
3250  }
3251 }
3252 
3253 void
3255  const GUID_t& readerid,
3256  const GUID_t& writerid,
3257  const TransportLocatorSeq& locators,
3258  DiscoveryListener* listener)
3259 {
3260  TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
3261 }
3262 
3263 void
3265  const GUID_t& readerid,
3266  const GUID_t& writerid)
3267 {
3268  TransportClient::unregister_for_writer(participant, readerid, writerid);
3269 }
3270 
3271 void
3273  const TransportLocatorSeq& locators)
3274 {
3275  {
3276  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
3277  WriterMapType::const_iterator iter = writers_.find(writerId);
3278  if (iter == writers_.end()) {
3279  return;
3280  }
3281  }
3282  TransportClient::update_locators(writerId, locators);
3283 }
3284 
3287 {
3289 }
3290 
3292 {
3293  bool xcdr1_mutable = false;
3294  bool illegal_unaligned = false;
3295  for (CORBA::ULong i = 0; i < qos_.representation.value.length(); ++i) {
3296  Encoding::Kind encoding_kind;
3297  if (repr_to_encoding_kind(qos_.representation.value[i], encoding_kind)) {
3298  if (encoding_kind == Encoding::KIND_XCDR1 && type_support_->max_extensibility() == MUTABLE) {
3299  xcdr1_mutable = true;
3300  } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR && cdr_encapsulation()) {
3301  illegal_unaligned = true;
3302  } else {
3303  decoding_modes_.insert(encoding_kind);
3304  }
3305  } else if (DCPS_debug_level) {
3306  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: "
3307  "DataReaderImpl::setup_deserialization: "
3308  "Encountered unsupported or unknown data representation: %C\n",
3309  repr_to_string(qos_.representation.value[i]).c_str()));
3310  }
3311  }
3312  if (decoding_modes_.empty()) {
3313  if (DCPS_debug_level) {
3314  DCPS::String error_message;
3315  if (xcdr1_mutable) {
3316  error_message = " Unsupported combination of XCDR1 and mutable";
3317  } else if (illegal_unaligned) {
3318  error_message = " Unaligned CDR is not allowed in rtps_udp transport";
3319  }
3320  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: "
3321  "DataReaderImpl::setup_deserialization: "
3322  "Could not find a valid data representation.%C\n",
3323  error_message.c_str()));
3324  }
3325  return DDS::RETCODE_ERROR;
3326  }
3327  if (DCPS_debug_level >= 2) {
3328  OPENDDS_STRING encodings;
3329  EncodingKinds::iterator it = decoding_modes_.begin();
3330  for (; it != decoding_modes_.end(); ++it) {
3331  if (!encodings.empty()) {
3332  encodings += ", ";
3333  }
3334  encodings += Encoding::kind_to_string(*it);
3335  }
3336  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::setup_deserialization: "
3337  "Setup successfully with the following data representation%C: %C\n",
3338  encodings.size() != 1 ? "s" : "",
3339  encodings.c_str()));
3340  }
3341 
3342  return DDS::RETCODE_OK;
3343 }
3344 
3346  const DataSampleHeader& header,
3347  bool is_new_instance)
3348 {
3349  bool accepted = true;
3350 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3351  bool verify_coherent = false;
3352 #endif
3353  WriterInfo_rch writer;
3354 
3356  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
3357 
3358  WriterMapType::iterator where = writers_.find(header.publication_id_);
3359 
3360  if (where != writers_.end()) {
3361  if (header.coherent_change_) {
3362 
3363 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3364  // Received coherent change
3365  where->second->coherent_change(header.group_coherent_, header.publisher_id_);
3366  verify_coherent = true;
3367 #endif
3368  writer = where->second;
3369  }
3370  }
3371  else {
3373  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::accept_sample_processing - ")
3374  ACE_TEXT("subscription %C failed to find ")
3375  ACE_TEXT("publication data for %C.\n"),
3376  LogGuid(get_guid()).c_str(),
3377  LogGuid(header.publication_id_).c_str()));
3378  }
3379  }
3380 
3381 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3382  if (verify_coherent) {
3383  accepted = verify_coherent_changes_completion(writer.in());
3384  }
3385 #endif
3386 
3387  if (instance && deadline_queue_enabled_) {
3388  instance->last_sample_tv_ = instance->cur_sample_tv_;
3389  instance->cur_sample_tv_.set_to_now();
3390 
3391  if (is_new_instance) {
3392  schedule_deadline(instance, false);
3393  } else {
3394  process_deadline(instance, MonotonicTimePoint::now(), false);
3395  }
3396  }
3397 
3398  if (accepted) {
3399  notify_read_conditions();
3400  }
3401 }
3402 
3403 #if defined(OPENDDS_SECURITY)
3405 {
3406  RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
3407  return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
3408 }
3409 #endif
3410 
3412  ACE_thread_t owner,
3413  DataReaderImpl* reader)
3414  : ReactorInterceptor (reactor, owner)
3415  , reader_(*reader)
3416 { }
3417 
3419 { }
3420 
3422 {
3424  execute_or_enqueue(make_rch<ScheduleCommand>(this, ref(info)));
3425 }
3426 
3428 {
3429  info->waiting_for_end_historic_samples(false);
3430  execute_or_enqueue(make_rch<CancelCommand>(this, ref(info)));
3431 }
3432 
3434  const ACE_Time_Value& ,
3435  const void* arg)
3436 {
3437  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
3438 
3439  WriterInfo* const info =
3440  const_cast<WriterInfo*>(reinterpret_cast<const WriterInfo*>(arg));
3441  const GUID_t pub_id = info->writer_id();
3442 
3443  {
3444  ACE_Guard<ACE_Thread_Mutex> guard(this->mutex_);
3445  info_set_.erase(rchandle_from(info));
3446  }
3447 
3449  if (!reader)
3450  return 0;
3451 
3452  if (DCPS_debug_level >= 1) {
3453  ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
3454  LogGuid(reader->get_guid()).c_str(),
3455  LogGuid(pub_id).c_str()));
3456  }
3457 
3458  reader->resume_sample_processing(pub_id);
3459  return 0;
3460 }
3461 
3463 {
3464  static const ACE_Time_Value ten_seconds(10);
3465  info_->schedule_historic_samples_timer(sweeper_, ten_seconds);
3466  const bool insert_result = sweeper_->info_set_.insert(info_).second;
3467 
3468  if (insert_result && DCPS_debug_level) {
3469  ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - sweeper %@ is now scheduled\n", info_.in()));
3470  }
3471 }
3472 
3474 {
3475  info_->cancel_historic_samples_timer(sweeper_);
3476  const bool erase_result = sweeper_->info_set_.erase(info_) > 0;
3477 
3478  if (erase_result && DCPS_debug_level) {
3479  ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - sweeper %@ is no longer scheduled\n", info_.in()));
3480  }
3481 }
3482 
3484 {
3485  populate_connection_info();
3486  const TransportLocatorSeq& trans_conf_info = connection_info();
3487  const GUID_t dp_id_copy = dp_id_;
3488  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
3489  disco->update_subscription_locators(domain_id_,
3490  dp_id_copy,
3491  get_guid(),
3492  trans_conf_info);
3493 }
3494 
3496 {
3497  RcHandle<SubscriberImpl> subscriber = subscriber_.lock();
3498  RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
3499  if (!subscriber || !data_reader) {
3500  return;
3501  }
3502 
3503  if (set_reader_status_) {
3504  data_reader->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3505  }
3506  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3507 
3508  if (call_) {
3509  sub_listener_->on_data_on_readers(subscriber.in());
3510  }
3511 }
3512 
3514 {
3515  RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
3516 
3517  if (data_reader && set_reader_status_) {
3518  data_reader->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3519  }
3520 
3521  if (data_reader && set_subscriber_status_) {
3522  RcHandle<SubscriberImpl> subscriber = data_reader->get_subscriber_servant();
3523  if (subscriber) {
3524  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3525  }
3526  }
3527 
3528  if (call_ && data_reader) {
3529  listener_->on_data_available(data_reader.in());
3530  }
3531 }
3532 
3534 {
3535  // These all start at 1 (0 mask is bogus) and include the full mask (any)
3536  for (CORBA::ULong is = 1; is <= MAX_SAMPLE_STATE_MASK; ++is) {
3537  for (CORBA::ULong iv = 1; iv <= MAX_VIEW_STATE_MASK; ++iv) {
3538  for (CORBA::ULong ii = 1; ii <= MAX_INSTANCE_STATE_MASK; ++ii) {
3539  combined_state_lookup_[to_combined_states(is, iv, ii)] = HandleSet();
3540  }
3541  }
3542  }
3543  // catch-all for "bogus" lookups
3544  combined_state_lookup_[0] = HandleSet();
3545 }
3546 
3547 void DataReaderImpl::update_lookup_maps(const SubscriptionInstanceMapType::iterator& input)
3548 {
3549  for (LookupMap::iterator it = combined_state_lookup_.begin(); it != combined_state_lookup_.end(); ++it) {
3550  if (it->first == 0) continue;
3552  split_combined_states(it->first, sample_states, view_states, instance_states);
3553  if (input->second->matches(sample_states, view_states, instance_states)) {
3554  it->second.insert(input->first);
3555  } else {
3556  it->second.erase(input->first);
3557  }
3558  }
3559 }
3560 
3562 {
3563  for (LookupMap::iterator it = combined_state_lookup_.begin(), the_end = combined_state_lookup_.end(); it != the_end; ++it) {
3564  if (it->first == 0) continue;
3565  it->second.erase(handle);
3566  }
3567 }
3568 
3570 {
3571  const CORBA::ULong combined_states = to_combined_states(sample_states, view_states, instance_states);
3572  LookupMap::const_iterator ci = combined_state_lookup_.find(combined_states);
3573  OPENDDS_ASSERT(ci != combined_state_lookup_.end());
3574  return ci->second;
3575 }
3576 
3578  bool timer_called)
3579 {
3580  // Should be called with sample_lock_.
3581  if (instance->deadline_ == MonotonicTimePoint::zero_value) {
3582  instance->deadline_ = MonotonicTimePoint::now() + deadline_period_;
3583  const bool schedule = deadline_queue_.empty();
3584  deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
3585  if (!timer_called) {
3586  if (schedule) {
3587  deadline_task_->schedule(deadline_period_);
3588  } else if (deadline_queue_.begin()->second == instance) {
3589  // Moved to front.
3590  deadline_task_->cancel();
3591  deadline_task_->schedule(deadline_period_);
3592  }
3593  }
3594  }
3595 }
3596 
3598 {
3599  // Should be called with sample_lock_.
3600  if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3601  for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) {
3602  if (pos->second == instance) {
3603  deadline_queue_.erase(pos);
3604  break;
3605  }
3606  }
3608  }
3609 }
3610 
3612  const MonotonicTimePoint& now,
3613  bool timer_called)
3614 {
3615  // Should be called with sample_lock_.
3616 
3617  if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3618  bool missed = false;
3619 
3620  if (instance->cur_sample_tv_.is_zero()) { // not received any sample.
3621  missed = true;
3622 
3623  } else if (timer_called) { // handle_timeout is called
3624  missed = (now - instance->cur_sample_tv_) >= deadline_period_;
3625 
3626  } else { // upon receiving sample.
3627  missed = (instance->cur_sample_tv_ - instance->last_sample_tv_) > deadline_period_;
3628  }
3629 
3630  if (missed) {
3631  ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, sample_lock_);
3632  // Only update the status upon timer is called and not
3633  // when receiving a sample after the interval.
3634  // Otherwise the counter is doubled.
3635  if (timer_called) {
3636  ++requested_deadline_missed_status_.total_count;
3637  requested_deadline_missed_status_.total_count_change =
3638  requested_deadline_missed_status_.total_count - last_deadline_missed_total_count_;
3639  requested_deadline_missed_status_.last_instance_handle = instance->instance_handle_;
3640 
3641  set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS, true);
3642 
3643  DDS::DataReaderListener_var listener = listener_for(DDS::REQUESTED_DEADLINE_MISSED_STATUS);
3644 
3645 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
3646  if (instance->instance_state_->is_exclusive()) {
3647  DataReaderImpl::OwnershipManagerPtr owner_manager = ownership_manager();
3648  if (owner_manager)
3649  owner_manager->remove_writers (instance->instance_handle_);
3650  }
3651 #endif
3652 
3653  if (!CORBA::is_nil(listener.in())) {
3654  // Copy before releasing the lock.
3655  DDS::RequestedDeadlineMissedStatus const status = requested_deadline_missed_status_;
3656 
3657  // Release the lock during the upcall.
3658  ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
3659  // @todo Will this operation ever throw? If so we may want to
3660  // catch all exceptions, and act accordingly.
3661  listener->on_requested_deadline_missed(this, status);
3662 
3663  // We need to update the last total count value to our current total
3664  // so that the next time we will calculate the correct total_count_change;
3665  last_deadline_missed_total_count_ = requested_deadline_missed_status_.total_count;
3666  }
3667 
3668  notify_status_condition();
3669  }
3670  }
3671 
3672  // This next part is without status_lock_ held to avoid reactor deadlock.
3673  if (timer_called) {
3675  schedule_deadline(instance, timer_called);
3676  } else {
3677  cancel_deadline(instance);
3678  schedule_deadline(instance, timer_called);
3679  }
3680  }
3681 }
3682 
3684 {
3685  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3686  deadline_queue_.clear();
3687  deadline_task_->cancel();
3688 }
3689 
3691 {
3692  if (deadline_period_ != deadline_period) {
3693  deadline_period_ = deadline_period;
3694 
3695  if (deadline_queue_enabled_) {
3696  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
3698  for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
3699  iter != this->instances_.end();
3700  ++iter) {
3701  if (iter->second->deadline_ != MonotonicTimePoint::zero_value) {
3702  reschedule_deadline(iter->second, now);
3703  }
3704  }
3705  }
3706  }
3707 }
3708 
3710  const MonotonicTimePoint& now)
3711 {
3712  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3713 
3714  // So the datareader can call back into us.
3715  if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3716 
3717  // Remove.
3718  for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) {
3719  if (pos->second == instance) {
3720  deadline_queue_.erase(pos);
3721  break;
3722  }
3723  }
3724 
3725  instance->deadline_ = now + (deadline_period_ - (instance->deadline_ - now));
3726 
3727  const bool schedule = deadline_queue_.empty();
3728  deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
3729  if (schedule) {
3730  deadline_task_->schedule(deadline_period_);
3731  } else if (deadline_queue_.begin()->second == instance) {
3732  // Moved to front.
3733  deadline_task_->cancel();
3734  deadline_task_->schedule(deadline_period_);
3735  }
3736  }
3737 }
3738 
3740 {
3741  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
3742 
3743  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3744  for (DeadlineQueue::iterator pos = deadline_queue_.begin(), limit = deadline_queue_.end(); pos != limit && pos->first <= now;) {
3745  SubscriptionInstance_rch instance = pos->second;
3746  deadline_queue_.erase(pos++);
3747  // pos is no longer valid.
3748  process_deadline(instance, now, true);
3749  }
3750 
3751  if (!deadline_queue_.empty()) {
3752  deadline_task_->schedule(deadline_queue_.begin()->first - now);
3753  }
3754 }
3755 
3756 } // namespace DCPS
3757 } // namespace OpenDDS
3758 
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
DataSampleHeader header_
The demarshalled sample header.
bool is_last(const GUID_t &pub)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
void resume_sample_processing(const GUID_t &pub_id)
when done handling historic samples, resume
virtual void transport_assoc_done(int flags, const GUID_t &remote_id)
#define ACE_DEBUG(X)
void return_handle(DDS::InstanceHandle_t handle)
DDS::DataReaderListener_var listener_
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
bool have_sample_states(DDS::SampleStateMask sample_states) const
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
bool verify_coherent_changes_completion(WriterInfo *writer)
virtual void state_updated_i(DDS::InstanceHandle_t handle)=0
unsigned int size() const
Amount of data actually stored.
virtual DDS::ReturnCode_t get_matched_publications(DDS::InstanceHandleSeq &publication_handles)
#define ACE_ERROR(X)
const DDS::StatusMask NO_STATUS_MASK
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(DDS::RequestedIncompatibleQosStatus &status)
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
const StatusKind SAMPLE_LOST_STATUS
const ValueDispatcher * get_value_dispatcher() const
void accept_sample_processing(const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
const LogLevel::Value value
Definition: debug.cpp:61
const StatusKind LIVELINESS_CHANGED_STATUS
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
virtual void reset_latency_stats()
Clear any intermediate statistical values.
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const InstanceHandle_t HANDLE_NIL
std::string String
void remove_from_lookup_maps(DDS::InstanceHandle_t handle)
char message_id_
The enum MessageId.
virtual void install_type_support(TypeSupportImpl *)
virtual DDS::ReturnCode_t get_subscription_matched_status(DDS::SubscriptionMatchedStatus &status)
void check_liveliness_i(bool cancel, const MonotonicTimePoint &now)
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
BudgetExceededStatus budget_exceeded_status_
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
virtual DDS::DataReaderListener_ptr get_listener()
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
void coherent_changes_completed(DataReaderImpl *reader)
ReliabilityQosPolicy reliability
CORBA::Long total_samples() const
virtual void data_received(const ReceivedDataSample &sample)
process a message that has been received - could be control or a data sample.
void notify_subscription_disconnected(const WriterIdSeq &pubids)
WeakRcHandle< DataReaderImpl > reader_
virtual void on_deleted(DDS::DataWriter_ptr)
Definition: Observer.h:79
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
DDS::TopicDescription_var topic_desc_
DDS::InstanceHandle_t handle() const
Definition: WriterInfo.h:115
DataType minimum() const
Access the minimum value.
Definition: Stats_T.h:239
void state_updated(DDS::InstanceHandle_t handle)
LM_INFO
const TransportLocatorSeq & connection_info() const
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
void lively(const GUID_t &writer_id)
LIVELINESS message received for this DataWriter.
DurabilityQosPolicy durability
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
virtual DDS::Subscriber_ptr get_subscriber()
TransportLocatorSeq remote_data_
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
DDS::SubscriptionMatchedStatus subscription_match_status_
virtual DDS::TopicDescription_ptr get_topicdescription()
void deadline_task(const MonotonicTimePoint &now)
virtual void get_latency_stats(LatencyStatisticsSeq &stats)
unsigned long n() const
Access the number of values accumulated.
Definition: Stats_T.h:257
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void reject_coherent(const GUID_t &writer_id, const GUID_t &publisher_id)
HistoryQosPolicyKind kind
bool is_owner(const DDS::InstanceHandle_t &instance_handle, const GUID_t &pub_id)
virtual void signal_liveliness(const GUID_t &remote_participant)
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffer.
#define ACE_WRITE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
sequence< octet > key
TransportMessageBlockAllocator mb_alloc_
void notify_subscription_lost(const WriterIdSeq &pubids)
void set_group_info(const CoherentChangeControl &info)
Definition: WriterInfo.cpp:291
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
std::ostream & raw_data(std::ostream &str) const
Dump any raw data.
void writer_became_alive(WriterInfo &info, const MonotonicTimePoint &when)
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
String to_dds_string(unsigned short to_convert)
virtual void on_disassociated(DDS::DataWriter_ptr, const GUID_t &)
Definition: Observer.h:87
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
DCPS::String repr_to_string(const DDS::DataRepresentationId_t &repr)
Definition: DCPS_Utils.cpp:473
bool time_based_filter_instance(const SubscriptionInstance_rch &instance, MonotonicTimePoint &now, MonotonicTimePoint &deadline)
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
virtual void on_sample_received(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:92
void unregister_reader(const char *type_name, DataReaderImpl *reader)
const octet ENTITYKIND_OPENDDS_NIL_WRITER
Definition: DdsDcpsGuid.idl:53
MonotonicTime_t participant_discovered_at_
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
bool have_view_states(DDS::ViewStateMask view_states) const
OwnershipQosPolicy ownership
unique_ptr< Monitor > monitor_
Monitor object for this entity.
const char * c_str() const
void get_writer_states(WriterStatePairVec &writer_states)
CommandPtr execute_or_enqueue(CommandPtr command)
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
void schedule_deadline(SubscriptionInstance_rch instance, bool timer_called)
TransportLocator discovery_locator_
ACE_CDR::ULong remote_transport_context_
LivelinessQosPolicy liveliness
const SampleStateMask ANY_SAMPLE_STATE
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
long absolute_generation_rank
RcHandle< LivelinessTimer > liveliness_timer_
TimePoint_T< SystemClock > SystemTimePoint
Definition: TimeTypes.h:32
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
const ACE_Time_Value & value() const
OwnershipQosPolicyKind kind
Cached_Allocator_With_Overflow< ReceivedDataElementMemoryBlock, ACE_Thread_Mutex > ReceivedDataAllocator
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
unsigned long InstanceStateMask
sequence< TransportLocator > TransportLocatorSeq
std::pair< GUID_t, WriterInfo::WriterState > WriterStatePair
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
void enable_filtering(ContentFilteredTopicImpl *cft)
bool topicIsBIT(const char *name, const char *type)
void set_sample_lost_status(const DDS::SampleLostStatus &status)
int release(void)
void writer_activity(const DataSampleHeader &header)
update liveliness info for this writer.
unique_ptr< ReceivedDataAllocator > rd_allocator_
void disassociate(const GUID_t &peerId)
virtual CORBA::Boolean statistics_enabled()
Collection of latency statistics for a single association.
bool filter_sample(const DataSampleHeader &header)
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
Definition: DCPS_Utils.cpp:517
ACE_Reactor_Timer_Interface * get_reactor()
bool waiting_for_end_historic_samples() const
Definition: WriterInfo.h:145
Coherent_State coherent_change_received()
Definition: WriterInfo.cpp:256
DDS::InstanceStateKind instance_state() const
void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&samples)
deliver samples that were held by check_historic()
void writer_became_dead(WriterInfo &info)
virtual void release_instance_i(DDS::InstanceHandle_t handle)=0
bool have_instance_states(DDS::InstanceStateMask instance_states) const
const DDS::StatusMask DEFAULT_STATUS_MASK
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
const StatusKind REQUESTED_INCOMPATIBLE_QOS_STATUS
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
void release_instance(DDS::InstanceHandle_t handle)
Release the instance with the handle.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
long long opendds_reserved_publication_seq
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
bool check_historic(const ReceivedDataSample &sample)
const StatusKind DATA_ON_READERS_STATUS
SequenceNumber sequence_
The data sample&#39;s sequence number.
const char *const BUILT_IN_PUBLICATION_TOPIC
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
#define OPENDDS_STRING
virtual void dispose_unregister(const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance)
void instances_liveliness_update(const GUID_t &writer, DDS::InstanceHandle_t publication_handle)
void update_subscription_params(const DDS::StringSeq &params) const
bool contains_sample(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ContentFilteredTopic_ptr get_cf_topic() const
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40
int handle_timeout(const ACE_Time_Value &current_time, const void *arg)
void reset_deadline_period(const TimeDuration &deadline_period)
virtual DDS::ReturnCode_t get_requested_deadline_missed_status(DDS::RequestedDeadlineMissedStatus &status)
void reset_ownership(DDS::InstanceHandle_t instance)
LM_DEBUG
GUID_t publisher_id() const
Definition: WriterInfo.h:186
void cancel_deadline(SubscriptionInstance_rch instance)
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
RcHandle< SubscriberImpl > get_subscriber_servant()
SampleRejectedStatusKind last_reason
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
DDS::DynamicType_var dynamic_type_
ACE_Thread_Mutex subscription_id_mutex_
virtual RcHandle< MessageHolder > dds_demarshal(const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance, bool &is_new_instance, bool &filtered, MarshalingType marshaling_type, bool full_copy)=0
bool insert_sample(ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch i, size_t index_in_instance)
bool group_coherent() const
Definition: WriterInfo.h:181
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
ACE_Thread_Mutex listener_mutex_
DurabilityQosPolicyKind kind
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
DurabilityQosPolicy durability
const char * get_state_str() const
Definition: WriterInfo.cpp:84
DataReaderListener_ptr get_ext_listener()
sequence< LatencyStatistics > LatencyStatisticsSeq
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
virtual void add_association(const GUID_t &yourId, const WriterAssociation &writer, bool active)
void add_types(const XTypes::TypeLookupService_rch &tls) const
void add(DataType value)
Definition: Stats_T.h:133
ACE_CDR::Boolean Boolean
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
bool select_owner(const DDS::InstanceHandle_t &instance_handle, const GUID_t &pub_id, const CORBA::Long &ownership_strength, InstanceState_rch instance_state)
Holds a data sample received by the transport.
virtual DDS::DynamicType_ptr get_type() const
DataRepresentationQosPolicy representation
DWORD ACE_thread_t
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
long no_writers_generation_count
const StatusKind DATA_AVAILABLE_STATUS
DDS::ReturnCode_t setup_deserialization()
Setup deserialization options.
void reschedule_deadline(SubscriptionInstance_rch instance, const MonotonicTimePoint &now)
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
const InstanceStateMask ANY_INSTANCE_STATE
WeakRcHandle< SubscriberImpl > subscriber_servant_
virtual DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition)
Implements the DDS::DataReader interface.
long double mean() const
Calculate the average value.
Definition: Stats_T.h:207
LatencyStatistics get_stats() const
Extract the current latency statistics for this writer.
const unsigned long DURATION_ZERO_NSEC
Definition: DdsDcpsCore.idl:76
bool ownership_filter_instance(const SubscriptionInstance_rch &instance, const GUID_t &pubid)
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
virtual OPENDDS_STRING transport_type() const =0
const ViewStateMask ANY_VIEW_STATE
void reset()
Reset statistics to nil.
Definition: Stats_T.h:119
virtual void lookup_instance(const ReceivedDataSample &sample, SubscriptionInstance_rch &instance)=0
void get_instance_handles(InstanceHandleVec &instance_handles)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
static const TimePoint_T< MonotonicClock > max_value
Definition: TimePoint_T.h:41
int handle_timeout(const ACE_Time_Value &current_time, const void *arg)
RcHandle< DRISporadicTask > deadline_task_
time_t sec(void) const
sequence< GUID_t > WriterIdSeq
DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t &key)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
void enable_multi_topic(MultiTopicImpl *mt)
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
LM_WARNING
void coherent_change_received(const GUID_t &publisher_id, Coherent_State &result)
RepoIdToHandleMap publication_id_to_handle_map_
virtual void remove_associations_i(const WriterIdSeq &writers, bool callback)
WriterStats(int amount=0, DataCollector< double >::OnFull type=DataCollector< double >::KeepOldest)
Default constructor.
OPENDDS_STRING conv_
void accept_coherent(const GUID_t &writer_id, const GUID_t &publisher_id)
long double var() const
Calculate the variance value.
Definition: Stats_T.h:231
virtual DDS::InstanceHandle_t get_instance_handle()
void get_ordered_data(GroupRakeData &data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool notify_all()
Unblock all of the threads waiting on this condition.
const char *const name
Definition: debug.cpp:60
unsigned long SampleStateMask
DDS::SampleLostStatus sample_lost_status_
TimeDuration liveliness_lease_duration_
Definition: WriterInfo.h:52
virtual DDS::ReturnCode_t get_qos(DDS::SubscriberQos &qos)
void process_deadline(SubscriptionInstance_rch instance, const MonotonicTimePoint &now, bool timer_called)
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
void add_stat(const TimeDuration &delay)
Add a datum to the latency statistics.
ACE_TEXT("TCP_Factory")
HistoryQosPolicy history
virtual DDS::ReadCondition_ptr create_readcondition(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
long disposed_generation_count
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unsigned long StatusMask
virtual void on_enabled(DDS::DataWriter_ptr)
Definition: Observer.h:77
void remove_writer(const GUID_t &pub_id)
bool associate(const AssociationData &peer, bool active)
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
WeakRcHandle< DomainParticipantImpl > participant_servant_
TransportPriorityQosPolicy transport_priority
const ReturnCode_t RETCODE_NOT_ENABLED
ACE_Recursive_Thread_Mutex statistics_lock_
void add_coherent_samples(const SequenceNumber &seq)
Definition: WriterInfo.cpp:165
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
suseconds_t usec(void) const
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
void process_latency(const ReceivedDataSample &sample)
virtual DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t &max_wait)
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void reset_stats()
Reset the latency statistics for this writer.
ReliabilityQosPolicy reliability
virtual RcHandle< EntityImpl > parent() const
virtual bool check_transport_qos(const TransportInst &inst)
TypeSupportImpl * type_support_
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffer.
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
Definition: DCPS_Utils.cpp:455
virtual DDS::ReturnCode_t get_liveliness_changed_status(DDS::LivelinessChangedStatus &status)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Implements the DDS::TopicDescription interface.
void set_subscriber_qos(const DDS::SubscriberQos &qos)
const long DURATION_ZERO_SEC
Definition: DdsDcpsCore.idl:75
String str(unsigned decimal_places=3, bool just_sec=false) const
EndHistoricSamplesMissedSweeper(ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *reader)
Sequence number abstraction. Only allows positive 64 bit values.
void to_type_info(XTypes::TypeInformation &type_info) const
virtual DDS::QueryCondition_ptr create_querycondition(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
virtual void remove_associations(const WriterIdSeq &writers, bool callback)
virtual DDS::ReturnCode_t get_sample_rejected_status(DDS::SampleRejectedStatus &status)
ACE_Reactor_Timer_Interface * reactor_
DDS::SampleRejectedStatus sample_rejected_status_
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
const ReturnCode_t RETCODE_ERROR
void writer_removed(WriterInfo &info)
void notify_latency(GUID_t writer)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual DDS::ReturnCode_t enable()
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
virtual DDS::ReturnCode_t get_matched_publication_data(DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
void update_ownership_strength(const GUID_t &pub_id, const CORBA::Long &ownership_strength)
const ReturnCode_t RETCODE_OK
virtual DDS::ReturnCode_t enable_specific()=0
Stats< double > stats_
Latency statistics for the DataWriter to this DataReader.
const ReturnCode_t RETCODE_UNSUPPORTED
virtual void on_associated(DDS::DataWriter_ptr, const GUID_t &)
Definition: Observer.h:85
virtual void on_qos_changed(DDS::DataWriter_ptr)
Definition: Observer.h:81
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const char * to_string(MessageId value)
void received_activity(const MonotonicTimePoint &when)
called when a sample or other activity is received from this writer.
Definition: WriterInfo.h:260
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
void update_lookup_maps(const SubscriptionInstanceMapType::iterator &input)
bool has_readcondition(DDS::ReadCondition_ptr a_condition)
ACE_Recursive_Thread_Mutex publication_handle_lock_
virtual DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus &status)
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
bool check_end_historic_samples(EndHistoricSamplesMissedSweeper *sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&to_deliver)
Definition: WriterInfo.cpp:121
const long LENGTH_UNLIMITED
#define ACE_ERROR_RETURN(X, Y)
Security::SecurityConfig_rch security_config_
RcHandle< T > lock() const
Definition: RcObject.h:188
ACE_HANDLE dup(ACE_HANDLE handle)
virtual DDS::ReturnCode_t get_qos(DDS::DataReaderQos &qos)
const StatusKind SUBSCRIPTION_MATCHED_STATUS
GUID_t writer_id() const
Definition: WriterInfo.h:127
const character_type * in(void) const
DataRepresentationIdSeq value
unsigned long StatusKind
const StatusKind REQUESTED_DEADLINE_MISSED_STATUS
OwnershipManagerPtr ownership_manager()
DDS::DataReaderQos passed_qos_
void remove_writers(const DDS::InstanceHandle_t &instance_handle)
DeadlineQosPolicy deadline
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export GUID_t bit_key_to_guid(const DDS::BuiltinTopicKey_t &key)
Definition: GuidUtils.h:251
virtual void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
DDS::LivelinessChangedStatus liveliness_changed_status_
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
#define TheServiceParticipant
ACE_Recursive_Thread_Mutex instances_lock_
Keeps track of a DataWriter&#39;s liveliness for a DataReader.
Definition: WriterInfo.h:81
Elements stored for managing statistical data.
TopicDescriptionPtr< MultiTopicImpl > multi_topic_
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
const InstanceState_rch instance_state_
Instance state for this instance.
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual void purge_data(SubscriptionInstance_rch instance)=0
unsigned long ViewStateMask
void set_sample_rejected_status(const DDS::SampleRejectedStatus &status)
ACE_Thread_Mutex content_filtered_topic_mutex_
void notify_subscription_reconnected(const WriterIdSeq &pubids)
void reset_coherent_info(const GUID_t &writer_id, const GUID_t &publisher_id)
TopicDescriptionPtr< TopicImpl > topic_servant_
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
virtual DDS::ReturnCode_t set_qos(const DDS::DataReaderQos &qos)
virtual bool is_reliable() const =0
Does the transport as configured support RELIABLE_RELIABILITY_QOS?
virtual DDS::ReturnCode_t delete_contained_entities()
ResourceLimitsQosPolicy resource_limits
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
Boolean is_nil(T x)
CORBA::Long last_deadline_missed_total_count_
DataType maximum() const
Access the maximum value.
Definition: Stats_T.h:248
const StatusKind SAMPLE_REJECTED_STATUS
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
#define ACE_INT32_MAX
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
WriterState state() const
returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
Definition: WriterInfo.h:103
virtual void qos_change(const DDS::DataReaderQos &qos)
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
StatsMapType statistics_
Statistics for this reader, collected for each writer.
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)