OpenDDS  Snapshot(2023/04/28-20:55)
SubscriberImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 
8 #include "debug.h"
9 #include "SubscriberImpl.h"
11 #include "DomainParticipantImpl.h"
12 #include "Qos_Helper.h"
13 #include "GuidConverter.h"
14 #include "BuiltInTopicUtils.h"
15 #include "TopicImpl.h"
16 #include "MonitorFactory.h"
17 #include "DataReaderImpl.h"
18 #include "Service_Participant.h"
19 #include "TopicDescriptionImpl.h"
20 #include "Marked_Default_Qos.h"
21 #include "Transient_Kludge.h"
23 #include "MultiTopicImpl.h"
24 #include "GroupRakeData.h"
26 #include "Util.h"
29 #include "DCPS_Utils.h"
30 #include "PoolAllocator.h"
31 
32 #include <dds/DdsDcpsTypeSupportExtC.h>
33 
34 #include <stdexcept>
35 
37 
38 namespace OpenDDS {
39 namespace DCPS {
40 
42  const DDS::SubscriberQos & qos,
43  DDS::SubscriberListener_ptr a_listener,
44  const DDS::StatusMask& mask,
45  DomainParticipantImpl* participant)
46  : handle_(handle),
47  qos_(qos),
48  default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
49  listener_mask_(mask),
50  participant_(*participant),
51  domain_id_(participant->get_domain_id()),
52  raw_latency_buffer_size_(0),
53  raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
54  access_depth_ (0)
55 {
56  //Note: OK to duplicate a nil.
57  listener_ = DDS::SubscriberListener::_duplicate(a_listener);
58 
59  monitor_.reset(TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this));
60 }
61 
63 {
64  const RcHandle<DomainParticipantImpl> participant = participant_.lock();
65  if (participant) {
66  participant->return_handle(handle_);
67  }
68 
69  // The datareaders should be deleted already before calling delete
70  // subscriber.
71  String leftover_entities;
72  if (!is_clean(&leftover_entities)) {
74  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: SubscriberImpl::~SubscriberImpl: "
75  "%C still exist\n", leftover_entities.c_str()));
76  }
77  }
78 }
79 
82 {
83  return handle_;
84 }
85 
86 bool
88 {
90  guard,
91  this->si_lock_,
92  false);
93 
94  for (DataReaderMap::iterator it(datareader_map_.begin());
95  it != datareader_map_.end(); ++it) {
96  if (a_handle == it->second->get_instance_handle()) {
97  return true;
98  }
99  }
100 
101  return false;
102 }
103 
104 DDS::DataReader_ptr
106  DDS::TopicDescription_ptr a_topic_desc,
107  const DDS::DataReaderQos & qos,
108  DDS::DataReaderListener_ptr a_listener,
109  DDS::StatusMask mask)
110 {
111  if (CORBA::is_nil(a_topic_desc)) {
112  if (DCPS_debug_level > 0) {
114  ACE_TEXT("(%P|%t) ERROR: ")
115  ACE_TEXT("SubscriberImpl::create_datareader, ")
116  ACE_TEXT("topic desc is nil.\n")));
117  }
118  return DDS::DataReader::_nil();
119  }
120 
121  DDS::DataReaderQos dr_qos;
122  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
123  if (!participant)
124  return DDS::DataReader::_nil();
125 
126  TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
127 
128 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
129  ContentFilteredTopicImpl* cft = 0;
130 #endif
131 #ifndef OPENDDS_NO_MULTI_TOPIC
132  MultiTopicImpl* mt = 0;
133 #else
134  bool mt = false;
135 #endif
136 
137  if (!topic_servant) {
138 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
139  cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
140  if (cft) {
141  DDS::Topic_var related;
142  related = cft->get_related_topic();
143  topic_servant = dynamic_cast<TopicImpl*>(related.in());
144  }
145  else
146 #endif
147  {
148 #ifndef OPENDDS_NO_MULTI_TOPIC
149  mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
150 #endif
151  }
152  }
153 
154  if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
155  return DDS::DataReader::_nil();
156 
157 #ifndef OPENDDS_NO_MULTI_TOPIC
158  if (mt) {
159  try {
160  DDS::DataReader_var dr =
161  mt->get_type_support()->create_multitopic_datareader();
163  dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
164  mtdr->init(dr_qos, a_listener, mask, this, mt);
166  if (dr->enable() != DDS::RETCODE_OK) {
167  if (DCPS_debug_level > 0) {
169  ACE_TEXT("(%P|%t) ERROR: ")
170  ACE_TEXT("SubscriberImpl::create_datareader, ")
171  ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
172  }
173  return DDS::DataReader::_nil();
174  }
176  }
177  return dr._retn();
178  } catch (const std::exception& e) {
179  if (DCPS_debug_level > 0) {
181  ACE_TEXT("(%P|%t) ERROR: ")
182  ACE_TEXT("SubscriberImpl::create_datareader, ")
183  ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
184  e.what()));
185  }
186  }
187  return DDS::DataReader::_nil();
188  }
189 #endif
190 
191  OpenDDS::DCPS::TypeSupport_ptr typesupport =
192  topic_servant->get_type_support();
193 
194  if (0 == typesupport) {
195  CORBA::String_var name = a_topic_desc->get_name();
196  if (DCPS_debug_level > 0) {
198  ACE_TEXT("(%P|%t) ERROR: ")
199  ACE_TEXT("SubscriberImpl::create_datareader, ")
200  ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
201  name.in()));
202  }
203  return DDS::DataReader::_nil();
204  }
205 
206  DDS::DataReader_var dr_obj = typesupport->create_datareader();
207 
208  DataReaderImpl* dr_servant =
209  dynamic_cast<DataReaderImpl*>(dr_obj.in());
210 
211  if (dr_servant == 0) {
212  if (DCPS_debug_level > 0) {
214  ACE_TEXT("(%P|%t) ERROR: ")
215  ACE_TEXT("SubscriberImpl::create_datareader, ")
216  ACE_TEXT("servant is nil.\n")));
217  }
218  return DDS::DataReader::_nil();
219  }
220 
221 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
222  if (cft) {
223  dr_servant->enable_filtering(cft);
224  }
225 #endif
226 
227  // Propagate the latency buffer data collection configuration.
228  // @TODO: Determine whether we want to exclude the Builtin Topic
229  // readers from data gathering.
232 
233 
234  dr_servant->init(topic_servant,
235  dr_qos,
236  a_listener,
237  mask,
238  participant.in(),
239  this);
240 
242  const DDS::ReturnCode_t ret = dr_servant->enable();
243 
244  if (ret != DDS::RETCODE_OK) {
245  if (DCPS_debug_level > 0) {
247  ACE_TEXT("(%P|%t) WARNING: ")
248  ACE_TEXT("SubscriberImpl::create_datareader, ")
249  ACE_TEXT("enable failed.\n")));
250  }
251  return DDS::DataReader::_nil();
252  }
253  } else {
255  readers_not_enabled_.insert(rchandle_from(dr_servant));
256  }
257 
258  // add created data reader to this' data reader container -
259  // done in enable_reader
260  return DDS::DataReader::_duplicate(dr_obj.in());
261 }
262 
// Deletes a data reader that belongs to this subscriber.
//
// a_datareader: the reader to delete. It is dynamic_cast to DataReaderImpl; a
//   failed cast is handled below as the MultiTopic reader case.
// Returns DDS::RETCODE_OK on the visible success paths (a MultiTopic reader
//   found in multitopic_reader_map_, or a regular reader removed from all
//   containers). When a pre-deletion check fails, the non-OK `rc` is returned.
//
// NOTE(review): the embedded listing numbers have gaps (e.g. 263, 272, 276,
// 279, 282, 301, 304, 330, 346, 357, 361-362, 365, 384). The return type, the
// declaration and assignments of `rc`, the guard macro openers and several
// early-exit statements are therefore not visible here -- confirm them against
// the real file before relying on the notes below.
264 SubscriberImpl::delete_datareader(::DDS::DataReader_ptr a_datareader)
265 {
266  DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
267 
    // Non-nil only when a_datareader really is a DataReaderImpl.
268  DataReaderImpl_rch dr_servant = rchandle_from(dynamic_cast<DataReaderImpl*>(a_datareader));
269 
270  if (dr_servant) { // for MultiTopic this will be false
271  const char* reason = " (ERROR: unknown reason)";
    // Pre-deletion checks. Each branch records why deletion is refused: the
    // reader belongs to another subscriber, still has zero-copy loans, or
    // still has read conditions. The matching `rc` assignments are not
    // visible in this listing.
273  RcHandle<SubscriberImpl> dr_subscriber = dr_servant->get_subscriber_servant();
274  if (dr_subscriber.get() != this) {
275  reason = "doesn't belong to this subscriber.";
277  } else if (dr_servant->has_zero_copies()) {
278  reason = "has outstanding zero-copy samples loaned out.";
280  } else if (!dr_servant->read_conditions_.empty()) {
281  reason = "has read conditions attached.";
283  }
    // Refusal path: log reader GUID, topic name, return-code text and reason
    // (only when log_level >= Notice), then return rc.
284  if (rc != DDS::RETCODE_OK) {
285  if (log_level >= LogLevel::Notice) {
286  DDS::TopicDescription_var topic = a_datareader->get_topicdescription();
287  CORBA::String_var topic_name = topic->get_name();
288  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: SubscriberImpl::delete_datareader: "
289  "on reader %C (topic \"%C\") will return \"%C\" because it %C\n",
290  LogGuid(dr_servant->get_id()).c_str(), topic_name.in(),
291  retcode_to_string(rc), reason));
292  }
293  return rc;
294  }
295 
296  // marks entity as deleted and stops future associating
297  dr_servant->prepare_to_delete();
298  }
299 
    // Scoped section that uses si_guard on si_lock_ (macro opener not visible):
    // locate the reader and remove it from this subscriber's containers.
300  {
302  si_guard,
303  this->si_lock_,
305 
    // Linear search by mapped value; the servant is compared against each
    // entry's second member rather than looked up by key.
306  DataReaderMap::iterator it;
307  for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
308  if (it->second == dr_servant) {
309  break;
310  }
311  }
312 
    // Not found among the regular readers: try the MultiTopic map (keyed by
    // topic name), otherwise report the reader as unknown.
313  if (it == datareader_map_.end()) {
314  DDS::TopicDescription_var td = a_datareader->get_topicdescription();
315  CORBA::String_var topic_name = td->get_name();
316 #ifndef OPENDDS_NO_MULTI_TOPIC
317  MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name.in());
318  if (mt_iter != multitopic_reader_map_.end()) {
319  DDS::DataReader_ptr ptr = mt_iter->second;
320  MultiTopicDataReaderBase* mtdrb = dynamic_cast<MultiTopicDataReaderBase*>(ptr);
321  if (!mtdrb) {
322  if (DCPS_debug_level > 0) {
324  ACE_TEXT("(%P|%t) ERROR: ")
325  ACE_TEXT("SubscriberImpl::delete_datareader: ")
326  ACE_TEXT("datareader(topic_name=%C)")
327  ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n"),
328  topic_name.in()));
329  }
    // NOTE(review): the exit taken when mtdrb is null (listing line 330) is
    // not visible; without it the cleanup() call below would dereference a
    // null pointer -- confirm.
331  }
332  mtdrb->cleanup();
333  multitopic_reader_map_.erase(mt_iter);
334  return DDS::RETCODE_OK;
335  }
336 #endif
    // No servant and no MultiTopic entry: there is no GUID to log, only the
    // topic name. The statement that leaves this branch (listing line 346) is
    // not visible.
337  if (!dr_servant) {
338  if (DCPS_debug_level > 0) {
340  ACE_TEXT("(%P|%t) ERROR: ")
341  ACE_TEXT("SubscriberImpl::delete_datareader: ")
342  ACE_TEXT("datareader(topic_name=%C)")
343  ACE_TEXT("for unknown repo id not found.\n"),
344  topic_name.in()));
345  }
347  }
    // A servant exists but is not registered in datareader_map_: log it with
    // its GUID. The statement that follows the log (listing line 357) is not
    // visible.
348  if (DCPS_debug_level > 0) {
349  GUID_t id = dr_servant->get_guid();
351  ACE_TEXT("(%P|%t) ERROR: ")
352  ACE_TEXT("SubscriberImpl::delete_datareader: ")
353  ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
354  topic_name.in(),
355  LogGuid(id).c_str()));
356  }
358  }
359 
360  datareader_map_.erase(it);
361 
    // datareader_set_ is updated with dr_set_guard on dr_set_lock_ (macro
    // opener not visible), taken while the si_lock_ guard above is still in
    // scope.
363  dr_set_guard,
364  this->dr_set_lock_,
366  datareader_set_.erase(dr_servant);
367  }
368 
    // Report the change to the monitor, if one is installed.
369  if (this->monitor_) {
370  this->monitor_->report();
371  }
372 
    // Withdraw the subscription from discovery for this domain/participant.
    // NOTE(review): what happens after a failed remove_subscription is logged
    // (listing line 384) is not visible.
373  const GUID_t subscription_id = dr_servant->get_guid();
374  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
375  if (!disco->remove_subscription(this->domain_id_,
376  this->dp_id_,
377  subscription_id)) {
378  if (DCPS_debug_level > 0) {
380  ACE_TEXT("(%P|%t) ERROR: ")
381  ACE_TEXT("SubscriberImpl::delete_datareader: ")
382  ACE_TEXT(" could not remove subscription from discovery.\n")));
383  }
385  }
386 
387  // Call remove association before unregistering the datareader from the transport,
388  // otherwise some callbacks resulting from remove_association may be lost.
389  dr_servant->remove_all_associations();
390  dr_servant->cleanup();
391  return DDS::RETCODE_OK;
392 }
393 
396 {
397  // mark that the entity is being deleted
398  set_deleted(true);
399 
401 
402 #ifndef OPENDDS_NO_MULTI_TOPIC
403  {
405  guard,
406  this->si_lock_,
408  for (MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.begin();
409  mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
410  drs.push_back(mt_iter->second);
411  }
412  }
413 
414  for (size_t i = 0; i < drs.size(); ++i) {
415  DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
416  if (ret == DDS::RETCODE_OK) {
417  ret = delete_datareader(drs[i]);
418  }
419  if (ret != DDS::RETCODE_OK) {
420  if (DCPS_debug_level > 0) {
422  ACE_TEXT("(%P|%t) ERROR: ")
423  ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
424  ACE_TEXT("failed to delete datareader\n")));
425  }
426  return ret;
427  }
428  }
429  drs.clear();
430 #endif
431 
432  {
434  guard,
435  this->si_lock_,
437  DataReaderMap::iterator it;
438  DataReaderMap::iterator itEnd = datareader_map_.end();
439 
440  for (it = datareader_map_.begin(); it != itEnd; ++it) {
441  drs.push_back(it->second.in());
442  }
443  }
444 
445  for (size_t i = 0; i < drs.size(); ++i) {
446  DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
447  if (ret == DDS::RETCODE_OK) {
448  ret = delete_datareader(drs[i]);
449  }
450  if (ret != DDS::RETCODE_OK) {
451  if (DCPS_debug_level > 0) {
453  ACE_TEXT("(%P|%t) ERROR: ")
454  ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
455  ACE_TEXT("failed to delete datareader\n")));
456  }
457  return ret;
458  }
459  }
460 
461  // the subscriber can now start creating new data readers
462  set_deleted(false);
463 
464  return DDS::RETCODE_OK;
465 }
466 
467 DDS::DataReader_ptr
469  const char * topic_name)
470 {
472  guard,
473  this->si_lock_,
474  DDS::DataReader::_nil());
475 
476  // If multiple entries have the key "topic_name", which one is
477  // returned? The spec does not say which one should be given.
478  DataReaderMap::iterator it = datareader_map_.find(topic_name);
479 
480  if (it == datareader_map_.end()) {
481 #ifndef OPENDDS_NO_MULTI_TOPIC
482  MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name);
483  if (mt_iter != multitopic_reader_map_.end()) {
484  return DDS::DataReader::_duplicate(mt_iter->second);
485  }
486 #endif
487 
488  if (DCPS_debug_level >= 2) {
490  ACE_TEXT("(%P|%t) ")
491  ACE_TEXT("SubscriberImpl::lookup_datareader, ")
492  ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
493  topic_name));
494  }
495 
496  return DDS::DataReader::_nil();
497 
498  } else {
499  return DDS::DataReader::_duplicate(it->second.in());
500  }
501 }
502 
505  DDS::DataReaderSeq & readers,
509 {
510  DataReaderSet localreaders;
511  {
513  guard,
514  this->dr_set_lock_,
516  localreaders = datareader_set_;
517  }
518 
519 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
520  // If access_scope is GROUP and ordered_access is true then return readers as
521  // list which may contain same readers multiple times. Otherwise return readers
522  // as set.
524  if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
526  }
527  if (this->qos_.presentation.ordered_access) {
528 
529  GroupRakeData data;
530  for (DataReaderSet::const_iterator pos = localreaders.begin();
531  pos != localreaders.end(); ++pos) {
532  (*pos)->get_ordered_data(data, sample_states, view_states, instance_states);
533  }
534 
535  // Return list of readers in the order of the source timestamp of the received
536  // samples from readers.
537  data.get_datareaders(readers);
538  return DDS::RETCODE_OK;
539  }
540  }
541 #endif
542 
543  // Return set of datareaders.
544  readers.length(0);
545  for (DataReaderSet::const_iterator pos = localreaders.begin();
546  pos != localreaders.end(); ++pos) {
547  if ((*pos)->have_sample_states(sample_states) &&
548  (*pos)->have_view_states(view_states) &&
549  (*pos)->have_instance_states(instance_states)) {
550  push_back(readers, DDS::DataReader::_duplicate(pos->in()));
551  }
552  }
553 
554  return DDS::RETCODE_OK;
555 }
556 
559 {
560  DataReaderMap localreadermap;
561  {
563  guard,
564  this->si_lock_,
566  localreadermap = datareader_map_;
567  }
568  for (DataReaderMap::iterator it = localreadermap.begin(); it != localreadermap.end(); ++it) {
569  if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
570  DDS::DataReaderListener_var listener = it->second->get_listener();
571  if (!it->second->is_bit()) {
572  it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
573  if (listener) {
574  listener->on_data_available(it->second.in());
575  }
576  } else {
577  TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(listener, it->second, listener, true, false));
578  }
579  }
580  }
581 
582 #ifndef OPENDDS_NO_MULTI_TOPIC
583  MultitopicReaderMap localmtr;
584  {
586  guard,
587  this->si_lock_,
589  localmtr = multitopic_reader_map_;
590  }
591 
592  for (MultitopicReaderMap::iterator it = localmtr.begin();
593  it != localmtr.end(); ++it) {
595  dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
596 
597  if (!dri) {
598  if (DCPS_debug_level > 0) {
600  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
601  ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")));
602  }
604  }
605 
607  DDS::DataReaderListener_var listener = dri->get_listener();
609  if (!CORBA::is_nil(listener)) {
610  listener->on_data_available(dri);
611  }
612  }
613  }
614 #endif
615 
616  return DDS::RETCODE_OK;
617 }
618 
621  const DDS::SubscriberQos & qos)
622 {
623 
625 
626  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
627  if (qos_ == qos)
628  return DDS::RETCODE_OK;
629 
630  // for the not changeable qos, it can be changed before enable
631  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
633 
634  } else {
635  qos_ = qos;
636 
637  DrIdToQosMap idToQosMap;
638  {
640  guard,
641  this->si_lock_,
643  // after FaceCTS bug 619 is fixed, make endIter and iter const iterators
644  DataReaderMap::iterator endIter = datareader_map_.end();
645 
646  for (DataReaderMap::iterator iter = datareader_map_.begin();
647  iter != endIter; ++iter) {
648  DataReaderImpl_rch reader = iter->second;
649  reader->set_subscriber_qos (qos);
650  DDS::DataReaderQos qos = reader->qos_;
651  GUID_t id = reader->get_guid();
652  std::pair<DrIdToQosMap::iterator, bool> pair
653  = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
654 
655  if (!pair.second) {
656  if (DCPS_debug_level > 0) {
658  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
659  ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
660  LogGuid(id).c_str()));
661  }
663  }
664  }
665  }
666 
667  DrIdToQosMap::iterator iter = idToQosMap.begin();
668 
669  while (iter != idToQosMap.end()) {
670  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
671  const bool status
672  = disco->update_subscription_qos(this->domain_id_,
673  this->dp_id_,
674  iter->first,
675  iter->second,
676  this->qos_);
677 
678  if (!status) {
679  if (DCPS_debug_level > 0) {
681  ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
682  ACE_TEXT("failed.\n")));
683  }
684  return DDS::RETCODE_ERROR;
685  }
686 
687  ++iter;
688  }
689  }
690 
691  return DDS::RETCODE_OK;
692 
693  } else {
695  }
696 }
697 
700  DDS::SubscriberQos & qos)
701 {
702  qos = qos_;
703  return DDS::RETCODE_OK;
704 }
705 
708  DDS::SubscriberListener_ptr a_listener,
709  DDS::StatusMask mask)
710 {
712  listener_mask_ = mask;
713  //note: OK to duplicate a nil object ref
714  listener_ = DDS::SubscriberListener::_duplicate(a_listener);
715  return DDS::RETCODE_OK;
716 }
717 
718 DDS::SubscriberListener_ptr
720 {
722  return DDS::SubscriberListener::_duplicate(listener_.in());
723 }
724 
725 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
726 
729 {
730  DataReaderSet to_call;
731  {
733  si_guard,
734  si_lock_,
736  if (!enabled_) {
737  if (DCPS_debug_level > 0) {
739  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
740  ACE_TEXT(" Subscriber is not enabled!\n")));
741  }
743  }
744 
746  return DDS::RETCODE_OK;
747  }
748 
749  ++access_depth_;
750  // We should only notify subscription on the first
751  // and last change to the current change set:
752  if (access_depth_ == 1) {
754  dr_set_guard,
755  dr_set_lock_,
757  to_call = datareader_set_;
758  }
759  }
760 
761  for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
762  (*it)->begin_access();
763  }
764  return DDS::RETCODE_OK;
765 }
766 
769 {
770  DataReaderSet to_call;
771  {
773  si_guard,
774  si_lock_,
776  if (!enabled_) {
777  if (DCPS_debug_level > 0) {
779  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
780  ACE_TEXT(" Publisher is not enabled!\n")));
781  }
783  }
784 
786  return DDS::RETCODE_OK;
787  }
788 
789  if (access_depth_ == 0) {
790  if (DCPS_debug_level > 0) {
792  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
793  ACE_TEXT(" No matching call to begin_coherent_changes!\n")));
794  }
796  }
797 
798  --access_depth_;
799  // We should only notify subscription on the first
800  // and last change to the current change set:
801  if (access_depth_ == 0) {
803  dr_set_guard,
804  dr_set_lock_,
806  to_call = datareader_set_;
807  }
808  }
809 
810  for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
811  (*it)->end_access();
812  }
813  return DDS::RETCODE_OK;
814 }
815 
816 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
817 
818 DDS::DomainParticipant_ptr
820 {
821  return participant_.lock()._retn();
822 }
823 
826  const DDS::DataReaderQos & qos)
827 {
828  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
830  return DDS::RETCODE_OK;
831 
832  } else {
834  }
835 }
836 
839  DDS::DataReaderQos & qos)
840 {
842  return DDS::RETCODE_OK;
843 }
844 
847  DDS::DataReaderQos & a_datareader_qos,
848  const DDS::TopicQos & a_topic_qos)
849 {
850  if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
851  return DDS::RETCODE_OK;
852 
853  } else {
855  }
856 }
857 
860 {
861  // According to the spec:
862  // - Calling enable on an already enabled Entity returns OK and has no
863  // effect.
864  // - Calling enable on an Entity whose factory is not enabled will fail
865  // and return PRECONDITION_NOT_MET.
866 
867  if (this->is_enabled()) {
868  return DDS::RETCODE_OK;
869  }
870 
871  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
872  if (!participant || !participant->is_enabled()) {
874  }
875 
876  dp_id_ = participant->get_id();
877 
878  if (this->monitor_) {
879  this->monitor_->report();
880  }
881 
882  this->set_enabled();
883 
885  DataReaderSet readers;
886  {
888  readers_not_enabled_.swap(readers);
889  }
890  for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
891  (*it)->enable();
892  }
893  }
894 
895  return DDS::RETCODE_OK;
896 }
897 
// Reports whether this subscriber has no data readers left over.
//
// leftover_entities: optional output. If non-null it is cleared on entry and,
//   when readers remain, "<count> reader(s)" is appended to it.
// Returns true when the effective reader count is zero.
898 bool SubscriberImpl::is_clean(String* leftover_entities) const
899 {
900  if (leftover_entities) {
901  leftover_entities->clear();
902  }
903 
    // NOTE(review): datareader_map_ is read here with no guard visible in this
    // function, while other members take si_lock_ around it; whether callers
    // already hold the lock is not visible here -- confirm.
904  size_t reader_count = datareader_map_.size();
905  if (reader_count && !TheTransientKludge->is_enabled()) {
906  // BIT datareaders.
    // A map holding exactly NUMBER_OF_BUILT_IN_TOPICS readers is treated as
    // empty. NOTE(review): only the count is compared (is_bit() is not
    // consulted), so that many ordinary readers would also count as clean
    // -- confirm this is intended.
907  reader_count = reader_count == NUMBER_OF_BUILT_IN_TOPICS ? 0 : reader_count;
908  }
909  if (leftover_entities && reader_count) {
910  *leftover_entities += to_dds_string(reader_count) + " reader(s)";
911  }
912 
913  return reader_count == 0;
914 }
915 
916 void
918 {
920  guard,
921  this->dr_set_lock_);
922  datareader_set_.insert(rchandle_from(reader));
923 }
924 
// Registers a data reader with this subscriber once the reader is enabled.
//
// topic_name: key under which the reader is inserted into datareader_map_.
// reader_ptr: the enabled reader; wrapped with rchandle_from() before use.
// Returns DDS::RETCODE_OK on the path visible here.
//
// NOTE(review): listing lines 925 (return type), 930 (log macro opener) and
// 936 are absent; 936 presumably takes a lock before the containers are
// modified -- confirm against the real file.
926 SubscriberImpl::reader_enabled(const char* topic_name,
927  DataReaderImpl* reader_ptr)
928 {
    // Trace output, only at debug level 4 and above.
929  if (DCPS_debug_level >= 4) {
931  ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
932  ACE_TEXT("datareader(topic_name=%C) enabled\n"),
933  topic_name));
934  }
935 
    // The reader is no longer pending: drop it from readers_not_enabled_ and
    // add it to datareader_map_.
937  DataReaderImpl_rch reader = rchandle_from(reader_ptr);
938  readers_not_enabled_.erase(reader);
939 
940  this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
941 
    // Report the change to the monitor, if one is installed.
942  if (this->monitor_) {
943  this->monitor_->report();
944  }
945 
946  return DDS::RETCODE_OK;
947 }
948 
949 #ifndef OPENDDS_NO_MULTI_TOPIC
// Registers an enabled MultiTopic data reader with this subscriber.
//
// reader: the reader to register; a duplicated reference to it is stored in
//   multitopic_reader_map_ under the name of its topic description.
// Returns DDS::RETCODE_OK.
//
// NOTE(review): the entry is written through operator[], so an existing entry
// with the same topic name would presumably be replaced -- confirm against
// the MultitopicReaderMap type. No lock is taken in this function; whether
// the caller holds si_lock_ is not visible here.
951 SubscriberImpl::multitopic_reader_enabled(DDS::DataReader_ptr reader)
952 {
953  DDS::TopicDescription_var td = reader->get_topicdescription();
954  CORBA::String_var topic = td->get_name();
955  multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
956  return DDS::RETCODE_OK;
957 }
958 
959 void
961 {
963  datareader_set_.erase(rchandle_from(reader));
964 }
965 #endif
966 
967 DDS::SubscriberListener_ptr
969 {
970  // per 2.1.4.3.1 Listener Access to Plain Communication Status
971  // use this entity's factory if the listener is nil or its mask does not
972  // enable this kind.
973  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
974  if (! participant)
975  return 0;
976 
978  if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
979  g.release();
980  return participant->listener_for(kind);
981 
982  } else {
983  return DDS::SubscriberListener::_duplicate(listener_.in());
984  }
985 }
986 
987 unsigned int&
989 {
990  return this->raw_latency_buffer_size_;
991 }
992 
995 {
996  return this->raw_latency_buffer_type_;
997 }
998 
999 void
1001 {
1003  guard,
1004  this->si_lock_,
1005  );
1006 
1007  subs.reserve(datareader_map_.size());
1008  for (DataReaderMap::iterator iter = datareader_map_.begin();
1009  iter != datareader_map_.end();
1010  ++iter) {
1011  subs.push_back(iter->second->get_guid());
1012  }
1013 }
1014 
1015 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1016 void
1018  const CORBA::Long& ownership_strength)
1019 {
1021  guard,
1022  this->si_lock_,
1023  );
1024 
1025  for (DataReaderMap::iterator iter = datareader_map_.begin();
1026  iter != datareader_map_.end();
1027  ++iter) {
1028  if (!iter->second->is_bit()) {
1029  iter->second->update_ownership_strength(pub_id, ownership_strength);
1030  }
1031  }
1032 }
1033 #endif
1034 
1035 
1036 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1037 void
1039  DataReaderImpl* reader,
1040  Coherent_State& group_state)
1041 {
1042  DataReaderSet localdrs;
1043  {
1045  guard,
1046  this->dr_set_lock_);
1047  localdrs = datareader_set_;
1048  }
1049  // Verify if all readers complete the coherent changes. The result
1050  // is either COMPLETED or REJECTED.
1051  group_state = COMPLETED;
1052  for (DataReaderSet::const_iterator iter = localdrs.begin();
1053  iter != localdrs.end(); ++iter) {
1054 
1056  (*iter)->coherent_change_received (publisher_id, state);
1057  if (state == NOT_COMPLETED_YET) {
1058  group_state = NOT_COMPLETED_YET;
1059  return;
1060  }
1061  else if (state == REJECTED) {
1062  group_state = REJECTED;
1063  }
1064  }
1065 
1066  GUID_t writerId = GUID_UNKNOWN;
1067  for (DataReaderSet::const_iterator iter = localdrs.begin();
1068  iter != localdrs.end(); ++iter) {
1069  if (group_state == COMPLETED) {
1070  (*iter)->accept_coherent (writerId, publisher_id);
1071  }
1072  else { //REJECTED
1073  (*iter)->reject_coherent (writerId, publisher_id);
1074  }
1075  }
1076 
1077  if (group_state == COMPLETED) {
1078  for (DataReaderSet::const_iterator iter = localdrs.begin();
1079  iter != localdrs.end(); ++iter) {
1080  (*iter)->coherent_changes_completed (reader);
1081  (*iter)->reset_coherent_info (writerId, publisher_id);
1082  }
1083  }
1084 }
1085 #endif
1086 
1089 {
1090  return this->participant_.lock();
1091 }
1092 
1093 bool
1095  const DDS::DataReaderQos & default_qos,
1096  DDS::Topic_ptr a_topic,
1097  DDS::DataReaderQos & dr_qos,
1098  bool mt)
1099 {
1100 
1101 
1102  if (qos == DATAREADER_QOS_DEFAULT) {
1103  dr_qos = default_qos;
1104 
1105  } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
1106 
1107 #ifndef OPENDDS_NO_MULTI_TOPIC
1108  if (mt) {
1109  if (DCPS_debug_level > 0) {
1110  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
1111  ACE_TEXT("SubscriberImpl::create_datareader, ")
1112  ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
1113  ACE_TEXT("to create a MultiTopic DataReader.\n")));
1114  }
1115  return false;
1116  }
1117 #else
1118  ACE_UNUSED_ARG(mt);
1119 #endif
1120  DDS::TopicQos topic_qos;
1121  a_topic->get_qos(topic_qos);
1122 
1123  dr_qos = default_qos;
1124 
1126  topic_qos);
1127 
1128  } else {
1129  dr_qos = qos;
1130  }
1131 
1135 
1136  if (!Qos_Helper::valid(dr_qos)) {
1137  if (DCPS_debug_level > 0) {
1139  ACE_TEXT("(%P|%t) ERROR: ")
1140  ACE_TEXT("SubscriberImpl::create_datareader, ")
1141  ACE_TEXT("invalid qos.\n")));
1142  }
1143  return false;
1144  }
1145 
1146  if (!Qos_Helper::consistent(dr_qos)) {
1147  if (DCPS_debug_level > 0) {
1149  ACE_TEXT("(%P|%t) ERROR: ")
1150  ACE_TEXT("SubscriberImpl::create_datareader, ")
1151  ACE_TEXT("inconsistent qos.\n")));
1152  }
1153  return false;
1154  }
1155 
1156  return true;
1157 }
1158 
1159 
1160 } // namespace DCPS
1161 } // namespace OpenDDS
1162 
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_CDR::Long Long
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
void set_status_changed_flag(DDS::StatusKind status, bool flag)
DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
void get_datareaders(DDS::DataReaderSeq &readers)
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
std::string String
virtual DDS::ReturnCode_t delete_datareader(DDS::DataReader_ptr a_datareader)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
bool have_sample_states(DDS::SampleStateMask sample_states) const
virtual DDS::ReturnCode_t get_datareaders(DDS::DataReaderSeq &readers, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
SubscriberImpl(DDS::InstanceHandle_t handle, const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
virtual DDS::ReturnCode_t begin_access()
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
String to_dds_string(unsigned short to_convert)
virtual DDS::ReturnCode_t enable()
virtual DDS::InstanceHandle_t get_instance_handle()
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
PresentationQosPolicyAccessScopeKind access_scope
void set_deleted(bool state)
Definition: EntityImpl.cpp:83
virtual DDS::ReturnCode_t notify_datareaders()
const char * c_str() const
virtual DDS::ReturnCode_t set_qos(const DDS::SubscriberQos &qos)
void update_ownership_strength(const GUID_t &pub_id, const CORBA::Long &ownership_strength)
DDS::ReturnCode_t reader_enabled(const char *topic_name, DataReaderImpl *reader)
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffers.
DDS::DataReaderQos default_datareader_qos_
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
unsigned long InstanceStateMask
EntityFactoryQosPolicy entity_factory
void enable_filtering(ContentFilteredTopicImpl *cft)
DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind)
int release(void)
virtual DDS::ReturnCode_t copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
typedef OPENDDS_VECTOR(GUID_t) SubscriptionIdVec
virtual DDS::ReturnCode_t get_default_datareader_qos(DDS::DataReaderQos &qos)
#define DATAREADER_QOS_DEFAULT
virtual DDS::ReturnCode_t delete_contained_entities()
void init(const DDS::DataReaderQos &dr_qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask, SubscriberImpl *parent, MultiTopicImpl *multitopic)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
LM_DEBUG
#define DATAREADER_QOS_USE_TOPIC_QOS
virtual DDS::ReturnCode_t set_listener(DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::DataReader_ptr lookup_datareader(const char *topic_name)
void get_subscription_ids(SubscriptionIdVec &subs)
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
#define TheTransientKludge
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const StatusKind DATA_AVAILABLE_STATUS
Implements the DDS::DataReader interface.
LM_NOTICE
virtual RcHandle< EntityImpl > parent() const
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffers.
ACE_Recursive_Thread_Mutex si_lock_
DDS::SubscriberListener_var listener_
LM_WARNING
DDS::InstanceHandle_t handle_
const char *const name
Definition: debug.cpp:60
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
unsigned long SampleStateMask
virtual DDS::ReturnCode_t get_qos(DDS::SubscriberQos &qos)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
unsigned long StatusMask
const ReturnCode_t RETCODE_NOT_ENABLED
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
virtual DDS::ReturnCode_t end_access()
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual DDS::ReturnCode_t set_default_datareader_qos(const DDS::DataReaderQos &qos)
WeakRcHandle< DomainParticipantImpl > participant_
OpenDDS_Dcps_Export LogLevel log_level
virtual DDS::DomainParticipant_ptr get_participant()
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::DataReader_ptr create_datareader(DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
const ReturnCode_t RETCODE_ERROR
virtual DDS::SubscriberListener_ptr get_listener()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual DDS::ReturnCode_t enable()
const size_t NUMBER_OF_BUILT_IN_TOPICS
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
static bool validate_datareader_qos(const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt)
void data_received(DataReaderImpl *reader)
const character_type * in(void) const
unsigned long StatusKind
bool contains_reader(DDS::InstanceHandle_t a_handle)
sequence< DataReader > DataReaderSeq
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
#define TheServiceParticipant
void remove_from_datareader_set(DataReaderImpl *reader)
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
LM_ERROR
void coherent_change_received(const GUID_t &publisher_id, DataReaderImpl *reader, Coherent_State &group_state)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
unsigned long ViewStateMask
bool is_clean(String *leftover_entities=0) const
PresentationQosPolicy presentation
ACE_Recursive_Thread_Mutex dr_set_lock_
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Boolean is_nil(T x)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
ACE_Thread_Mutex listener_mutex_
MultitopicReaderMap multitopic_reader_map_