OpenDDS  Snapshot(2023/04/28-20:55)
DomainParticipantImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 
9 
11 #include "Service_Participant.h"
12 #include "Qos_Helper.h"
13 #include "GuidConverter.h"
14 #include "PublisherImpl.h"
15 #include "SubscriberImpl.h"
16 #include "DataWriterImpl.h"
17 #include "Marked_Default_Qos.h"
18 #include "Registered_Data_Types.h"
19 #include "Transient_Kludge.h"
21 #include "Util.h"
22 #include "DCPS_Utils.h"
23 #include "MonitorFactory.h"
25 #include "MultiTopicImpl.h"
26 #include "Service_Participant.h"
27 #include "RecorderImpl.h"
28 #include "ReplayerImpl.h"
29 #include "BuiltInTopicUtils.h"
32 #ifdef OPENDDS_SECURITY
36 #endif
37 #include "XTypes/Utils.h"
38 
39 #include <dds/DdsDcpsGuidC.h>
40 #ifndef DDS_HAS_MINIMUM_BIT
41 # include <dds/DdsDcpsCoreTypeSupportImpl.h>
42 #endif
43 
44 #include <ace/Reactor.h>
45 #include <ace/OS_NS_unistd.h>
46 
47 namespace Util {
48 
49  template <typename Key>
50  int find(
51  OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
52  const Key& key,
53  OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*& value)
54  {
55  OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
56  c.find(key);
57 
58  if (iter == c.end()) {
59  return -1;
60  }
61 
62  value = &iter->second;
63  return 0;
64  }
65 
66  DDS::PropertySeq filter_properties(const DDS::PropertySeq& properties, const std::string& prefix)
67  {
68  DDS::PropertySeq result(properties.length());
69  result.length(properties.length());
70  unsigned int count = 0;
71  for (unsigned int i = 0; i < properties.length(); ++i) {
72  if (std::string(properties[i].name.in()).find(prefix) == 0) {
73  result[count++] = properties[i];
74  }
75  }
76  result.length(count);
77  return result;
78  }
79 
80 } // namespace Util
81 
83 
84 namespace OpenDDS {
85 namespace DCPS {
86 
87 //TBD - add check for enabled in most methods.
88 // Currently this is not needed because auto_enable_created_entities
89 // cannot be false.
90 
91 // Implementation skeleton constructor
// Constructs a participant for the given domain.
//
// handle_generator - used to initialize participant_handles_.
// domain_id        - stored in domain_id_.
// qos              - copied into qos_.
// a_listener, mask - forwarded to set_listener() in the body.
//
// NOTE(review): several initializers read other members (handle_protector_,
// shutdown_mutex_, dp_id_, the liveliness timers), so this list presumably
// relies on the member declaration order in the header - confirm before
// reordering anything.
92 DomainParticipantImpl::DomainParticipantImpl(
93  InstanceHandleGenerator& handle_generator,
94  const DDS::DomainId_t& domain_id,
95  const DDS::DomainParticipantQos& qos,
96  DDS::DomainParticipantListener_ptr a_listener,
97  const DDS::StatusMask& mask)
// Default entity QoS values are seeded from the service participant's initial values.
98  : default_topic_qos_(TheServiceParticipant->initial_TopicQos())
99  , default_publisher_qos_(TheServiceParticipant->initial_PublisherQos())
100  , default_subscriber_qos_(TheServiceParticipant->initial_SubscriberQos())
101  , qos_(qos)
// Security handles start out as HANDLE_NIL when security support is compiled in.
102 #ifdef OPENDDS_SECURITY
103  , id_handle_(DDS::HANDLE_NIL)
104  , perm_handle_(DDS::HANDLE_NIL)
105  , part_crypto_handle_(DDS::HANDLE_NIL)
106 #endif
107  , domain_id_(domain_id)
// The participant GUID starts as GUID_UNKNOWN.
108  , dp_id_(GUID_UNKNOWN)
109  , federated_(false)
110  , handle_waiters_(handle_protector_)
111  , shutdown_condition_(shutdown_mutex_)
112  , shutdown_complete_(false)
113  , participant_handles_(handle_generator)
// The publisher id generator is constructed from dp_id_ (GUID_UNKNOWN at this point).
114  , pub_id_gen_(dp_id_)
// Each liveliness task is built from the service participant's time source
// and interceptor, the matching timer created just before it, and
// LivelinessTimer::execute.
115  , automatic_liveliness_timer_(make_rch<AutomaticLivelinessTimer>(ref(*this)))
116  , automatic_liveliness_task_(make_rch<AutomaticLivelinessTask>(
117  TheServiceParticipant->time_source(),
118  TheServiceParticipant->interceptor(),
119  automatic_liveliness_timer_,
120  &LivelinessTimer::execute))
121  , participant_liveliness_timer_(make_rch<ParticipantLivelinessTimer>(ref(*this)))
122  , participant_liveliness_task_(make_rch<ParticipantLivelinessTask>(
123  TheServiceParticipant->time_source(),
124  TheServiceParticipant->interceptor(),
125  participant_liveliness_timer_,
126  &LivelinessTimer::execute))
127 {
// The return code of set_listener() is deliberately discarded.
128  (void) this->set_listener(a_listener, mask);
// Create the participant monitor through the service participant's monitor factory.
129  monitor_.reset(TheServiceParticipant->monitor_factory_->create_dp_monitor(this));
130  type_lookup_service_ = make_rch<XTypes::TypeLookupService>();
131 }
132 
134 {
135 #ifdef OPENDDS_SECURITY
137  Security::AccessControl_var access = security_config_->get_access_control();
139  if (!access->return_permissions_handle(perm_handle_, se)) {
140  if (DCPS::security_debug.auth_warn) {
142  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::~DomainParticipantImpl: ")
143  ACE_TEXT("Unable to return permissions handle. SecurityException[%d.%d]: %C\n"),
144  se.code, se.minor_code, se.message.in()));
145  }
146  }
147  }
148 #endif
149 
150 }
151 
152 DDS::Publisher_ptr
154  const DDS::PublisherQos & qos,
155  DDS::PublisherListener_ptr a_listener,
156  DDS::StatusMask mask)
157 {
158  DDS::PublisherQos pub_qos = qos;
159 
160  if (! this->validate_publisher_qos(pub_qos))
161  return DDS::Publisher::_nil();
162 
163  // Although Publisher entities have GUIDs assigned (see pub_id_gen_),
164  // these are not GUIDs from the RTPS spec and
165  // so the handle doesn't need to correlate to the GUID.
166  const DDS::InstanceHandle_t handle = assign_handle();
167 
168  PublisherImpl* pub = 0;
169  ACE_NEW_RETURN(pub,
170  PublisherImpl(handle,
171  pub_id_gen_.next(),
172  pub_qos,
173  a_listener,
174  mask,
175  this),
176  DDS::Publisher::_nil());
177 
179  pub->enable();
180  }
181 
182  DDS::Publisher_ptr pub_obj(pub);
183 
184  // this object will also act as the guard for leaking Publisher Impl
185  Publisher_Pair pair(pub, pub_obj, false);
186 
188  tao_mon,
189  this->publishers_protector_,
190  DDS::Publisher::_nil());
191 
192  if (OpenDDS::DCPS::insert(publishers_, pair) == -1) {
193  if (DCPS_debug_level > 0) {
195  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
196  ACE_TEXT("%p\n"),
197  ACE_TEXT("insert")));
198  }
199  return DDS::Publisher::_nil();
200  }
201 
202  return DDS::Publisher::_duplicate(pub_obj);
203 }
204 
207  DDS::Publisher_ptr p)
208 {
209  // The servant's ref count should be 2 at this point,
210  // one referenced by poa, one referenced by the publisher
211  // set.
212  PublisherImpl* the_servant = dynamic_cast<PublisherImpl*>(p);
213  if (!the_servant) {
214  if (log_level >= LogLevel::Notice) {
215  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
216  "Failed to obtain PublisherImpl\n"));
217  }
218  return DDS::RETCODE_ERROR;
219  }
220 
221  const Publisher_Pair pub_pair(the_servant, p, true);
222 
223  {
226  if (publishers_.count(pub_pair) == 0) {
227  if (log_level >= LogLevel::Notice) {
228  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
229  "This publisher doesn't belong to this participant\n"));
230  }
232  }
233  }
234 
235  String leftover_entities;
236  if (!the_servant->is_clean(&leftover_entities)) {
237  if (log_level >= LogLevel::Notice) {
238  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
239  "The publisher is not empty. %C leftover\n",
240  leftover_entities.c_str()));
241  }
243  }
244 
245  const DDS::ReturnCode_t ret = the_servant->delete_contained_entities();
246  if (ret != DDS::RETCODE_OK) {
247  if (log_level >= LogLevel::Notice) {
248  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
249  "Failed to delete contained entities: %C\n", retcode_to_string(ret)));
250  }
251  return ret;
252  }
253 
254  {
257  if (remove(publishers_, pub_pair) == -1) {
258  if (log_level >= LogLevel::Notice) {
259  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
260  "publisher not found\n"));
261  }
262  return DDS::RETCODE_ERROR;
263  }
264  }
265 
266  return DDS::RETCODE_OK;
267 }
268 
269 DDS::Subscriber_ptr
271  const DDS::SubscriberQos & qos,
272  DDS::SubscriberListener_ptr a_listener,
273  DDS::StatusMask mask)
274 {
275  DDS::SubscriberQos sub_qos = qos;
276 
277  if (! this->validate_subscriber_qos(sub_qos)) {
278  return DDS::Subscriber::_nil();
279  }
280 
281  const DDS::InstanceHandle_t handle = assign_handle();
282 
283  SubscriberImpl* sub = 0;
284  ACE_NEW_RETURN(sub,
285  SubscriberImpl(handle,
286  sub_qos,
287  a_listener,
288  mask,
289  this),
290  DDS::Subscriber::_nil());
291 
293  sub->enable();
294  }
295 
296  DDS::Subscriber_ptr sub_obj(sub);
297 
298  Subscriber_Pair pair(sub, sub_obj, false);
299 
301  tao_mon,
303  DDS::Subscriber::_nil());
304 
305  if (OpenDDS::DCPS::insert(subscribers_, pair) == -1) {
306  if (DCPS_debug_level > 0) {
308  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
309  ACE_TEXT("%p\n"),
310  ACE_TEXT("insert")));
311  }
312  return DDS::Subscriber::_nil();
313  }
314 
315  return DDS::Subscriber::_duplicate(sub_obj);
316 }
317 
320  DDS::Subscriber_ptr s)
321 {
322  // The servant's ref count should be 2 at this point,
323  // one referenced by poa, one referenced by the subscriber
324  // set.
325  SubscriberImpl* const the_servant = dynamic_cast<SubscriberImpl*>(s);
326  if (!the_servant) {
327  if (log_level >= LogLevel::Notice) {
328  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
329  "Failed to obtain SubscriberImpl\n"));
330  }
331  return DDS::RETCODE_ERROR;
332  }
333 
334  const Subscriber_Pair sub_pair(the_servant, s, true);
335 
336  {
339  if (subscribers_.count(sub_pair) == 0) {
340  if (log_level >= LogLevel::Notice) {
341  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
342  "This subscriber doesn't belong to this participant\n"));
343  }
345  }
346  }
347 
348  String leftover_entities;
349  if (!the_servant->is_clean(&leftover_entities)) {
350  if (log_level >= LogLevel::Notice) {
351  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
352  "The subscriber is not empty. %C leftover\n",
353  leftover_entities.c_str()));
354  }
356  }
357 
358  const DDS::ReturnCode_t ret = the_servant->delete_contained_entities();
359  if (ret != DDS::RETCODE_OK) {
360  if (log_level >= LogLevel::Notice) {
361  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
362  "Failed to delete contained entities: %C\n", retcode_to_string(ret)));
363  }
364  return ret;
365  }
366 
367  {
370  if (remove(subscribers_, sub_pair) == -1) {
371  if (log_level >= LogLevel::Notice) {
372  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
373  "subscriber not found\n"));
374  }
375  return DDS::RETCODE_ERROR;
376  }
377  }
378 
379  return DDS::RETCODE_OK;
380 }
381 
382 DDS::Subscriber_ptr
384 {
385  return bit_subscriber_->get();
386 }
387 
390 {
391  return bit_subscriber_;
392 }
393 
394 DDS::Topic_ptr
396  const char * topic_name,
397  const char * type_name,
398  const DDS::TopicQos & qos,
399  DDS::TopicListener_ptr a_listener,
400  DDS::StatusMask mask)
401 {
402  return create_topic_i(topic_name,
403  type_name,
404  qos,
405  a_listener,
406  mask,
407  0);
408 }
409 
410 DDS::Topic_ptr
412  const char * topic_name,
413  const char * type_name,
414  bool type_has_keys,
415  const DDS::TopicQos & qos,
416  DDS::TopicListener_ptr a_listener,
417  DDS::StatusMask mask)
418 {
419  int topic_mask = (type_has_keys ? TOPIC_TYPE_HAS_KEYS : 0 ) | TOPIC_TYPELESS;
420 
421  return create_topic_i(topic_name,
422  type_name,
423  qos,
424  a_listener,
425  mask,
426  topic_mask);
427 }
428 
429 
430 DDS::Topic_ptr
432  const char * topic_name,
433  const char * type_name,
434  const DDS::TopicQos & qos,
435  DDS::TopicListener_ptr a_listener,
436  DDS::StatusMask mask,
437  int topic_mask)
438 {
439  DDS::TopicQos topic_qos;
440 
441  if (qos == TOPIC_QOS_DEFAULT) {
442  this->get_default_topic_qos(topic_qos);
443 
444  } else {
445  topic_qos = qos;
446  }
447 
449  OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
450  OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
452 
453  if (!Qos_Helper::valid(topic_qos)) {
454  if (DCPS_debug_level > 0) {
456  ACE_TEXT("(%P|%t) ERROR: ")
457  ACE_TEXT("DomainParticipantImpl::create_topic, ")
458  ACE_TEXT("invalid qos.\n")));
459  }
460  return DDS::Topic::_nil();
461  }
462 
463  if (!Qos_Helper::consistent(topic_qos)) {
464  if (DCPS_debug_level > 0) {
466  ACE_TEXT("(%P|%t) ERROR: ")
467  ACE_TEXT("DomainParticipantImpl::create_topic, ")
468  ACE_TEXT("inconsistent qos.\n")));
469  }
470  return DDS::Topic::_nil();
471  }
472 
473  // See if there is a Topic with the same name.
474  TopicMap::mapped_type* entry = 0;
475  bool found = false;
476  {
478  tao_mon,
479  this->topics_protector_,
480  DDS::Topic::_nil());
481 
482 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
483  if (topic_descrs_.count(topic_name)) {
484  if (DCPS_debug_level > 3) {
485  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
486  ACE_TEXT("DomainParticipantImpl::create_topic, ")
487  ACE_TEXT("can't create a Topic due to name \"%C\" already in use ")
488  ACE_TEXT("by a TopicDescription.\n"), topic_name));
489  }
490  return 0;
491  }
492 #endif
493 
494  if (Util::find(topics_, topic_name, entry) == 0) {
495  found = true;
496  }
497  }
498 
499  /*
500  * If there is a topic with the same name, return the topic if it has the
501  * same type name and QoS, else it is an error.
502  */
503  if (found) {
504  CORBA::String_var found_type = entry->pair_.svt_->get_type_name();
505  if (ACE_OS::strcmp(type_name, found_type) == 0) {
506  DDS::TopicQos found_qos;
507  entry->pair_.svt_->get_qos(found_qos);
508 
509  if (topic_qos == found_qos) { // match type name, qos
510  {
512  tao_mon,
513  this->topics_protector_,
514  DDS::Topic::_nil());
515  ++entry->client_refs_;
516  }
517  return DDS::Topic::_duplicate(entry->pair_.obj_.in());
518 
519  } else { // Same Name and Type, Different QoS
520  if (DCPS_debug_level >= 1) {
522  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
523  ACE_TEXT("topic with name \"%C\" and type %C already exists, ")
524  ACE_TEXT("but the QoS doesn't match.\n"),
525  topic_name, type_name));
526  }
527 
528  return DDS::Topic::_nil();
529  }
530 
531  } else { // Same Name, Different Type
532  if (DCPS_debug_level >= 1) {
534  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
535  ACE_TEXT("topic with name \"%C\" already exists, but its type, %C ")
536  ACE_TEXT("is not the same as %C.\n"),
537  topic_name, found_type.in(), type_name));
538  }
539 
540  return DDS::Topic::_nil();
541  }
542 
543  } else {
544 
545  OpenDDS::DCPS::TypeSupport_var type_support;
546 
547  if (0 == topic_mask) {
548  // creating a topic with compile time type
549  type_support = Registered_Data_Types->lookup(this, type_name);
550  if (CORBA::is_nil(type_support)) {
551  if (DCPS_debug_level >= 1) {
552  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
553  ACE_TEXT("DomainParticipantImpl::create_topic, ")
554  ACE_TEXT("can't create a topic=%C type_name=%C ")
555  ACE_TEXT("is not registered.\n"),
556  topic_name, type_name));
557  }
558  return DDS::Topic::_nil();
559  }
560  }
561 
562  DDS::Topic_var new_topic = create_new_topic(topic_name,
563  type_name,
564  topic_qos,
565  a_listener,
566  mask,
567  type_support);
568 
569  if (!new_topic) {
570  if (DCPS_debug_level > 0) {
572  ACE_TEXT("(%P|%t) WARNING: ")
573  ACE_TEXT("DomainParticipantImpl::create_topic, ")
574  ACE_TEXT("create_new_topic failed.\n")));
575  }
576  return DDS::Topic::_nil();
577  }
578 
580  if (new_topic->enable() != DDS::RETCODE_OK) {
581  if (DCPS_debug_level > 0) {
583  ACE_TEXT("(%P|%t) WARNING: ")
584  ACE_TEXT("DomainParticipantImpl::create_topic, ")
585  ACE_TEXT("enable failed.\n")));
586  }
587  return DDS::Topic::_nil();
588  }
589  }
590  return new_topic._retn();
591  }
592 }
593 
596  DDS::Topic_ptr a_topic)
597 {
598  return delete_topic_i(a_topic, false);
599 }
600 
602  DDS::Topic_ptr a_topic,
603  bool remove_objref)
604 {
606 
607  try {
608  // The servant's ref count should be greater than 2 at this point,
609  // one referenced by poa, one referenced by the topic map and
610  // others referenced by the datareader/datawriter.
611  TopicImpl* the_topic_servant = dynamic_cast<TopicImpl*>(a_topic);
612 
613  if (!the_topic_servant) {
614  if (log_level >= LogLevel::Notice) {
615  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: %p\n"
616  "failed to obtain TopicImpl."));
617  }
618  return DDS::RETCODE_ERROR;
619  }
620 
621  DDS::DomainParticipant_var dp = the_topic_servant->get_participant();
622 
623  DomainParticipantImpl* the_dp_servant =
624  dynamic_cast<DomainParticipantImpl*>(dp.in());
625 
626  if (the_dp_servant != this) {
627  if (log_level >= LogLevel::Notice) {
628  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: "
629  "will return PRECONDITION_NOT_MET because this is not the "
630  "participant that owns this topic\n"));
631  }
633  }
634  if (!remove_objref && the_topic_servant->has_entity_refs()) {
635  // If entity_refs is true (nonzero), then some reader or writer is using
636  // this topic and the spec requires delete_topic() to fail with the error:
637  if (log_level >= LogLevel::Notice) {
638  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: "
639  "will return PRECONDITION_NOT_MET because there are still "
640  "outstanding references to this topic\n"));
641  }
643  }
644 
645  {
647  tao_mon,
648  this->topics_protector_,
650 
651  CORBA::String_var topic_name = the_topic_servant->get_name();
652  TopicMap::mapped_type* entry = 0;
653 
654  TopicMapIteratorPair iters = topics_.equal_range(topic_name.in());
655  TopicMapIterator iter;
656  for (iter = iters.first; iter != iters.second; ++iter) {
657  if (iter->second.pair_.svt_ == the_topic_servant) {
658  entry = &iter->second;
659  break;
660  }
661  }
662  if (entry == 0) {
663  if (log_level >= LogLevel::Notice) {
664  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: not found\n"));
665  }
666  return DDS::RETCODE_ERROR;
667  }
668 
669  const CORBA::ULong client_refs = --entry->client_refs_;
670 
671  if (remove_objref || 0 == client_refs) {
672  const GUID_t topicId = the_topic_servant->get_id();
673  topics_.erase(iter);
674 
675  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
676  TopicStatus status = disco->remove_topic(
677  the_dp_servant->get_domain_id(), the_dp_servant->get_id(), topicId);
678 
679  if (status != REMOVED) {
680  if (log_level >= LogLevel::Notice) {
681  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i, "
682  "remove_topic failed with return value <%C>\n", topicstatus_to_string(status)));
683  }
684  return DDS::RETCODE_ERROR;
685  }
686 
687  return DDS::RETCODE_OK;
688 
689  } else {
690  if (DCPS_debug_level > 4) {
691  ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::delete_topic_i: "
692  "Didn't remove topic from the map, remove_objref %d client_refs %d\n",
693  remove_objref, client_refs));
694  }
695  }
696  }
697 
698  } catch (...) {
699  if (log_level >= LogLevel::Notice) {
700  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i, "
701  " Caught Unknown Exception\n"));
702  }
703  ret = DDS::RETCODE_ERROR;
704  }
705 
706  return ret;
707 }
708 
709 DDS::Topic_ptr
711  const char* topic_name,
712  const DDS::Duration_t& timeout)
713 {
714  const MonotonicTimePoint timeout_at(MonotonicTimePoint::now() + TimeDuration(timeout));
715 
716  bool first_time = true;
717  while (first_time || MonotonicTimePoint::now() < timeout_at) {
718  if (first_time) {
719  first_time = false;
720  }
721 
722  GUID_t topic_id;
723  CORBA::String_var type_name;
724  DDS::TopicQos_var qos;
725 
726  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
727  TopicStatus status = disco->find_topic(domain_id_,
728  get_id(),
729  topic_name,
730  type_name.out(),
731  qos.out(),
732  topic_id);
733 
735  if (status == FOUND) {
736  OpenDDS::DCPS::TypeSupport_var type_support =
737  Registered_Data_Types->lookup(this, type_name.in());
738  if (CORBA::is_nil(type_support)) {
739  if (DCPS_debug_level) {
740  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
741  ACE_TEXT("DomainParticipantImpl::find_topic, ")
742  ACE_TEXT("can't create a Topic: type_name \"%C\" ")
743  ACE_TEXT("is not registered.\n"), type_name.in()));
744  }
745 
746  return DDS::Topic::_nil();
747  }
748 
749  DDS::Topic_ptr new_topic = create_new_topic(topic_name,
750  type_name,
751  qos,
752  DDS::TopicListener::_nil(),
754  type_support);
755  return new_topic;
756 
757  } else if (status == INTERNAL_ERROR) {
758  if (DCPS_debug_level > 0) {
760  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
761  ACE_TEXT("topic not found, discovery returned INTERNAL_ERROR!\n")));
762  }
763  return DDS::Topic::_nil();
764  } else if (now < timeout_at) {
765  const TimeDuration remaining = timeout_at - now;
766 
767  if (remaining.value().sec() >= 1) {
768  ACE_OS::sleep(1);
769 
770  } else {
771  ACE_OS::sleep(remaining.value());
772  }
773  }
774  }
775 
776  if (DCPS_debug_level >= 1) {
777  // timed out
779  ACE_TEXT("(%P|%t) DomainParticipantImpl::find_topic, ")
780  ACE_TEXT("timed out.\n")));
781  }
782 
783  return DDS::Topic::_nil();
784 }
785 
786 DDS::TopicDescription_ptr
788 {
790  tao_mon,
791  this->topics_protector_,
792  DDS::Topic::_nil());
793 
794  TopicMap::mapped_type* entry = 0;
795 
796  if (Util::find(topics_, name, entry) == -1) {
797 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
798  TopicDescriptionMap::iterator iter = topic_descrs_.find(name);
799  if (iter != topic_descrs_.end()) {
800  return DDS::TopicDescription::_duplicate(iter->second);
801  }
802 #endif
803  return DDS::TopicDescription::_nil();
804 
805  } else {
806  return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
807  }
808 }
809 
810 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
811 
812 DDS::ContentFilteredTopic_ptr
814  const char* name,
815  DDS::Topic_ptr related_topic,
816  const char* filter_expression,
817  const DDS::StringSeq& expression_parameters)
818 {
819  if (CORBA::is_nil(related_topic)) {
820  if (DCPS_debug_level > 3) {
821  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
822  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
823  ACE_TEXT("can't create a content-filtered topic due to null related ")
824  ACE_TEXT("topic.\n")));
825  }
826  return 0;
827  }
828 
830 
831  if (topics_.count(name)) {
832  if (DCPS_debug_level > 3) {
833  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
834  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
835  ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
836  ACE_TEXT("already in use by a Topic.\n"), name));
837  }
838  return 0;
839  }
840 
841  if (topic_descrs_.count(name)) {
842  if (DCPS_debug_level > 3) {
843  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
844  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
845  ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
846  ACE_TEXT("already in use by a TopicDescription.\n"), name));
847  }
848  return 0;
849  }
850 
851  DDS::ContentFilteredTopic_var cft;
852  try {
853  // Create the cft in two steps so that we only have one place to
854  // check the expression parameters
855  cft = new ContentFilteredTopicImpl(name, related_topic, filter_expression, this);
856  if (cft->set_expression_parameters(expression_parameters) != DDS::RETCODE_OK) {
857  return 0;
858  }
859  } catch (const std::exception& e) {
860  if (DCPS_debug_level) {
861  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
862  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
863  ACE_TEXT("can't create a content-filtered topic due to runtime error: ")
864  ACE_TEXT("%C.\n"), e.what()));
865  }
866  return 0;
867  }
868  DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
869  topic_descrs_[name] = td;
870  return cft._retn();
871 }
872 
874  DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
875 {
878  DDS::ContentFilteredTopic_var cft =
879  DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
880  CORBA::String_var name = cft->get_name();
881  TopicDescriptionMap::iterator iter = topic_descrs_.find(name.in());
882  if (iter == topic_descrs_.end()) {
883  if (DCPS_debug_level > 3) {
884  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
885  ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
886  ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
887  ACE_TEXT("because it is not in the set.\n"), name.in ()));
888  }
890  }
891 
892  TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
893 
894  if (!tdi) {
895  if (DCPS_debug_level > 3) {
896  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
897  ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
898  ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
899  ACE_TEXT("failed to obtain TopicDescriptionImpl\n"), name.in()));
900  }
901  return DDS::RETCODE_ERROR;
902  }
903 
904  if (tdi->has_entity_refs()) {
905  if (DCPS_debug_level > 3) {
906  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
907  ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
908  ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
909  ACE_TEXT("because it is used by a datareader\n"), name.in ()));
910  }
912  }
913  topic_descrs_.erase(iter);
914  return DDS::RETCODE_OK;
915 }
916 
917 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
918 
919 #ifndef OPENDDS_NO_MULTI_TOPIC
920 
922  const char* name, const char* type_name,
923  const char* subscription_expression,
924  const DDS::StringSeq& expression_parameters)
925 {
927 
928  if (topics_.count(name)) {
929  if (DCPS_debug_level > 3) {
930  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
931  ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
932  ACE_TEXT("can't create a multi topic due to name \"%C\" ")
933  ACE_TEXT("already in use by a Topic.\n"), name));
934  }
935  return 0;
936  }
937 
938  if (topic_descrs_.count(name)) {
939  if (DCPS_debug_level > 3) {
940  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
941  ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
942  ACE_TEXT("can't create a multi topic due to name \"%C\" ")
943  ACE_TEXT("already in use by a TopicDescription.\n"), name));
944  }
945  return 0;
946  }
947 
948  DDS::MultiTopic_var mt;
949  try {
950  mt = new MultiTopicImpl(name, type_name, subscription_expression,
951  expression_parameters, this);
952  } catch (const std::exception& e) {
953  if (DCPS_debug_level) {
954  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
955  ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
956  ACE_TEXT("can't create a multi topic due to runtime error: ")
957  ACE_TEXT("%C.\n"), e.what()));
958  }
959  return 0;
960  }
961  DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
962  topic_descrs_[name] = td;
963  return mt._retn();
964 }
965 
967  DDS::MultiTopic_ptr a_multitopic)
968 {
971  DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
972  CORBA::String_var mt_name = mt->get_name();
973  TopicDescriptionMap::iterator iter = topic_descrs_.find(mt_name.in());
974  if (iter == topic_descrs_.end()) {
975  if (DCPS_debug_level > 3) {
976  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
977  ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
978  ACE_TEXT("can't delete a multitopic \"%C\" ")
979  ACE_TEXT("because it is not in the set.\n"), mt_name.in ()));
980  }
982  }
983 
984  TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
985 
986  if (!tdi) {
987  if (DCPS_debug_level > 3) {
988  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
989  ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
990  ACE_TEXT("can't delete a multitopic topic \"%C\" ")
991  ACE_TEXT("failed to obtain TopicDescriptionImpl.\n"),
992  mt_name.in()));
993  }
994  return DDS::RETCODE_ERROR;
995  }
996 
997  if (tdi->has_entity_refs()) {
998  if (DCPS_debug_level > 3) {
999  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
1000  ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
1001  ACE_TEXT("can't delete a multitopic topic \"%C\" ")
1002  ACE_TEXT("because it is used by a datareader.\n"), mt_name.in ()));
1003  }
1005  }
1006  topic_descrs_.erase(iter);
1007  return DDS::RETCODE_OK;
1008 }
1009 
1010 #endif // OPENDDS_NO_MULTI_TOPIC
1011 
1012 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
1013 
1016 {
1019 
1020  RcHandle<FilterEvaluator>& result = filter_cache_[filter];
1021  if (!result) {
1022  try {
1023  result = make_rch<FilterEvaluator>(filter, false);
1024  } catch (const std::exception& e) {
1025  filter_cache_.erase(filter);
1026  if (DCPS_debug_level) {
1027  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
1028  ACE_TEXT("DomainParticipantImpl::get_filter_eval, ")
1029  ACE_TEXT("can't create a writer-side content filter due to ")
1030  ACE_TEXT("runtime error: %C.\n"), e.what()));
1031  }
1032  }
1033  }
1034  return result;
1035 }
1036 
1037 void
1039 {
1041  typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
1042  Map::iterator iter = filter_cache_.find(filter);
1043  if (iter != filter_cache_.end()) {
1044  if (iter->second->ref_count() == 1) {
1045  filter_cache_.erase(iter);
1046  }
1047  }
1048 }
1049 
1050 #endif
1051 
1054 {
1055  if (!get_deleted()) {
1056  // mark that the entity is being deleted
1057  set_deleted(true);
1058 
1060  return DDS::RETCODE_ERROR;
1061  }
1062  if (!set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline())) {
1063  return DDS::RETCODE_ERROR;
1064  }
1065  }
1066 
1067  // BIT subscriber and data readers will be deleted with the
1068  // rest of the entities, so need to report to discovery that
1069  // BIT is no longer available
1070  Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
1071  if (disc)
1072  disc->fini_bit(this);
1073 
1074  if (ACE_OS::thr_equal(TheServiceParticipant->reactor_owner(),
1075  ACE_Thread::self())) {
1076  handle_exception(0);
1077 
1078  } else {
1079  TheServiceParticipant->reactor()->notify(this);
1080 
1082  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1083  while (!shutdown_complete_) {
1084  shutdown_condition_.wait(thread_status_manager);
1085  }
1086  shutdown_complete_ = false;
1088  }
1089 
1090  bit_subscriber_.reset();
1091 
1092  Registered_Data_Types->unregister_participant(this);
1093 
1094  // the participant can now start creating new contained entities
1095  set_deleted(false);
1096  return shutdown_result_;
1097 }
1098 
1101 {
      // Reports whether a_handle belongs to an entity held by this participant:
      // first by direct instance-handle comparison against topics_, subscribers_
      // and publishers_, then by asking every subscriber/publisher whether it
      // contains a matching reader/writer. Returns false when nothing matches.
      // NOTE(review): the declaration line(s) and the first line of each guard
      // macro (listing lines 1105, 1118, 1131) are absent from this listing;
      // only the macro arguments (guard, protector, false) remain visible.
1102  /// Check top-level containers for Topic, Subscriber,
1103  /// and Publisher instances.
1104  {
1106  guard,
1107  this->topics_protector_,
1108  false);
1109 
1110  for (TopicMap::iterator it(topics_.begin());
1111  it != topics_.end(); ++it) {
1112  if (a_handle == it->second.pair_.svt_->get_instance_handle())
1113  return true;
1114  }
1115  }
1116 
1117  {
1119  guard,
1120  this->subscribers_protector_,
1121  false);
1122 
1123  for (SubscriberSet::iterator it(subscribers_.begin());
1124  it != subscribers_.end(); ++it) {
1125  if (a_handle == it->svt_->get_instance_handle())
1126  return true;
1127  }
1128  }
1129 
1130  {
1132  guard,
1133  this->publishers_protector_,
1134  false);
1135 
1136  for (PublisherSet::iterator it(publishers_.begin());
1137  it != publishers_.end(); ++it) {
1138  if (a_handle == it->svt_->get_instance_handle())
1139  return true;
1140  }
1141  }
1142 
1143  /// Recurse into SubscriberImpl and PublisherImpl for
1144  /// DataReader and DataWriter instances respectively.
      // NOTE(review): the two loops below are outside the braced scopes used
      // above for the guarded sections -- confirm whether iterating
      // subscribers_/publishers_ here is intended to happen without a guard.
1145  for (SubscriberSet::iterator it(subscribers_.begin());
1146  it != subscribers_.end(); ++it) {
1147  if (it->svt_->contains_reader(a_handle))
1148  return true;
1149  }
1150 
1151  for (PublisherSet::iterator it(publishers_.begin());
1152  it != publishers_.end(); ++it) {
1153  if (it->svt_->contains_writer(a_handle))
1154  return true;
1155  }
1156 
1157  return false;
1158 }
1159 
1162  const DDS::DomainParticipantQos & qos)
1163 {
      // Applies a new participant QoS (named set_qos in the log text below).
      // A QoS equal to the stored one returns RETCODE_OK without further work.
      // NOTE(review): listing lines 1160-1161, 1170, 1183 and 1194 are absent,
      // so the value returned when the change is rejected while enabled, the
      // logging call opener, and the value returned for an invalid/inconsistent
      // QoS are not visible in this excerpt.
1164  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1165  if (qos_ == qos)
1166  return DDS::RETCODE_OK;
1167 
1168  // for the not changeable qos, it can be changed before enable
1169  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
1171 
1172  } else {
1173  qos_ = qos;
1174 
      // Publish the stored QoS through discovery; a false result becomes
      // RETCODE_ERROR. NOTE(review): qos_ was already overwritten above and no
      // restore is visible on this failure path -- confirm that is intended.
1175  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1176  const bool status =
1177  disco->update_domain_participant_qos(domain_id_,
1178  dp_id_,
1179  qos_);
1180 
1181  if (!status) {
1182  if (DCPS_debug_level > 0) {
1184  ACE_TEXT("(%P|%t) DomainParticipantImpl::set_qos, ")
1185  ACE_TEXT("failed on compatibility check.\n")));
1186  }
1187  return DDS::RETCODE_ERROR;
1188  }
1189  }
1190 
1191  return DDS::RETCODE_OK;
1192 
1193  } else {
1195  }
1196 }
1197 
1201 {
      // Copies the stored participant QoS (qos_) into `qos` and reports success.
      // NOTE(review): the declaration lines are missing from this listing;
      // `qos` is presumably a reference out-parameter -- confirm.
1202  qos = qos_;
1203  return DDS::RETCODE_OK;
1204 }
1205 
1208  DDS::DomainParticipantListener_ptr a_listener,
1209  DDS::StatusMask mask)
1210 {
      // Stores the status mask and a duplicated reference to a_listener, then
      // returns RETCODE_OK.
      // NOTE(review): listing line 1211 is missing here (presumably a lock
      // guard) -- confirm against the full file.
1212  listener_mask_ = mask;
1213  //note: OK to duplicate a nil object ref
1214  listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
1215  return DDS::RETCODE_OK;
1216 }
1217 
1218 DDS::DomainParticipantListener_ptr
1220 {
      // Returns a duplicated reference to the stored listener_.
      // NOTE(review): listing lines 1219 (function name) and 1221 are missing.
1222  return DDS::DomainParticipantListener::_duplicate(listener_.in());
1223 }
1224 
1227  DDS::InstanceHandle_t handle)
1228 {
      // Marks the participant identified by `handle` as ignored.
      // Without DDS_HAS_MINIMUM_BIT: requires enabled_ (else NOT_ENABLED), maps
      // the handle to a GUID with get_repoid(), records it in
      // ignored_participants_ (returning OK right away if already recorded) and
      // then asks discovery to ignore it (ERROR if discovery refuses).
      // With DDS_HAS_MINIMUM_BIT: always returns RETCODE_UNSUPPORTED.
      // NOTE(review): the logging call openers (listing lines 1232, 1250, 1262,
      // 1271) and the declaration lines are missing from this listing.
1229 #ifndef DDS_HAS_MINIMUM_BIT
1230  if (!enabled_) {
1231  if (DCPS_debug_level > 0) {
1233  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
1234  ACE_TEXT("Entity is not enabled.\n")));
1235  }
1236  return DDS::RETCODE_NOT_ENABLED;
1237  }
1238 
1239  GUID_t ignoreId = get_repoid(handle);
1240  HandleMap::const_iterator location = this->ignored_participants_.find(ignoreId);
1241 
1242  if (location == this->ignored_participants_.end()) {
1243  this->ignored_participants_[ ignoreId] = handle;
1244  }
1245  else {// ignore same participant again, just return ok.
1246  return DDS::RETCODE_OK;
1247  }
1248 
1249  if (DCPS_debug_level >= 4) {
1251  ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
1252  ACE_TEXT("%C ignoring handle %x.\n"),
1253  LogGuid(dp_id_).c_str(),
1254  handle));
1255  }
1256 
      // NOTE(review): the entry added to ignored_participants_ above is not
      // removed if the discovery call below fails -- confirm that is intended.
1257  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1258  if (!disco->ignore_domain_participant(domain_id_,
1259  dp_id_,
1260  ignoreId)) {
1261  if (DCPS_debug_level > 0) {
1263  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
1264  ACE_TEXT("Could not ignore domain participant.\n")));
1265  }
1266  return DDS::RETCODE_ERROR;
1267  }
1268 
1269 
1270  if (DCPS_debug_level >= 4) {
1272  ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
1273  ACE_TEXT("%C repo call returned.\n"),
1274  LogGuid(dp_id_).c_str()));
1275  }
1276 
1277  return DDS::RETCODE_OK;
1278 #else
1279  ACE_UNUSED_ARG(handle);
1280  return DDS::RETCODE_UNSUPPORTED;
1281 #endif // !defined (DDS_HAS_MINIMUM_BIT)
1282 }
1283 
1286  DDS::InstanceHandle_t handle)
1287 {
      // Marks the topic identified by `handle` as ignored.
      // Without DDS_HAS_MINIMUM_BIT: requires enabled_ (else NOT_ENABLED), maps
      // the handle to a GUID with get_repoid(), records it in ignored_topics_
      // (returning OK right away if already recorded) and asks discovery to
      // ignore it. With DDS_HAS_MINIMUM_BIT: returns RETCODE_UNSUPPORTED.
      // NOTE(review): the logging call openers (listing lines 1291, 1309, 1321)
      // and the declaration lines are missing from this listing.
1288 #ifndef DDS_HAS_MINIMUM_BIT
1289  if (!enabled_) {
1290  if (DCPS_debug_level > 0) {
1292  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
1293  ACE_TEXT(" Entity is not enabled.\n")));
1294  }
1295  return DDS::RETCODE_NOT_ENABLED;
1296  }
1297 
1298  GUID_t ignoreId = get_repoid(handle);
1299  HandleMap::const_iterator location = this->ignored_topics_.find(ignoreId);
1300 
1301  if (location == this->ignored_topics_.end()) {
1302  this->ignored_topics_[ ignoreId] = handle;
1303  }
1304  else { // ignore same topic again, just return ok.
1305  return DDS::RETCODE_OK;
1306  }
1307 
1308  if (DCPS_debug_level >= 4) {
1310  ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_topic: ")
1311  ACE_TEXT("%C ignoring handle %x.\n"),
1312  LogGuid(dp_id_).c_str(),
1313  handle));
1314  }
1315 
      // NOTE(review): a false result from disco->ignore_topic() is only logged;
      // execution falls through to the RETCODE_OK return below. The sibling
      // ignore_participant/ignore_publication/ignore_subscription functions
      // return RETCODE_ERROR in the equivalent branch -- confirm whether the
      // difference is deliberate.
1316  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1317  if (!disco->ignore_topic(domain_id_,
1318  dp_id_,
1319  ignoreId)) {
1320  if (DCPS_debug_level > 0) {
1322  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
1323  ACE_TEXT(" Could not ignore topic.\n")));
1324  }
1325  }
1326 
1327  return DDS::RETCODE_OK;
1328 #else
1329  ACE_UNUSED_ARG(handle);
1330  return DDS::RETCODE_UNSUPPORTED;
1331 #endif // !defined (DDS_HAS_MINIMUM_BIT)
1332 }
1333 
1336  DDS::InstanceHandle_t handle)
1337 {
      // Asks discovery to ignore the publication identified by `handle`.
      // Without DDS_HAS_MINIMUM_BIT: requires enabled_ (else NOT_ENABLED), maps
      // the handle to a GUID with get_repoid() and forwards it to
      // disco->ignore_publication(); a false result yields RETCODE_ERROR.
      // With DDS_HAS_MINIMUM_BIT: returns RETCODE_UNSUPPORTED.
      // NOTE(review): the logging call openers (listing lines 1341, 1349, 1362)
      // and the declaration lines are missing from this listing.
1338 #ifndef DDS_HAS_MINIMUM_BIT
1339  if (!enabled_) {
1340  if (DCPS_debug_level > 0) {
1342  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
1343  ACE_TEXT(" Entity is not enabled.\n")));
1344  }
1345  return DDS::RETCODE_NOT_ENABLED;
1346  }
1347 
1348  if (DCPS_debug_level >= 4) {
1350  ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_publication: ")
1351  ACE_TEXT("%C ignoring handle %x.\n"),
1352  LogGuid(dp_id_).c_str(),
1353  handle));
1354  }
1355 
1356  GUID_t ignoreId = get_repoid(handle);
1357  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1358  if (!disco->ignore_publication(domain_id_,
1359  dp_id_,
1360  ignoreId)) {
1361  if (DCPS_debug_level > 0) {
1363  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
1364  ACE_TEXT(" could not ignore publication in discovery.\n")));
1365  }
1366  return DDS::RETCODE_ERROR;
1367  }
1368 
1369  return DDS::RETCODE_OK;
1370 #else
1371  ACE_UNUSED_ARG(handle);
1372  return DDS::RETCODE_UNSUPPORTED;
1373 #endif // !defined (DDS_HAS_MINIMUM_BIT)
1374 }
1375 
1378  DDS::InstanceHandle_t handle)
1379 {
      // Asks discovery to ignore the subscription identified by `handle`.
      // Without DDS_HAS_MINIMUM_BIT: requires enabled_ (else NOT_ENABLED), maps
      // the handle to a GUID with get_repoid() and forwards it to
      // disco->ignore_subscription(); a false result yields RETCODE_ERROR.
      // With DDS_HAS_MINIMUM_BIT: returns RETCODE_UNSUPPORTED.
      // NOTE(review): the logging call openers (listing lines 1383, 1391, 1404)
      // and the declaration lines are missing from this listing.
1380 #ifndef DDS_HAS_MINIMUM_BIT
1381  if (!enabled_) {
1382  if (DCPS_debug_level > 0) {
1384  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
1385  ACE_TEXT(" Entity is not enabled.\n")));
1386  }
1387  return DDS::RETCODE_NOT_ENABLED;
1388  }
1389 
      // NOTE(review): the handle is formatted with %d here, while the other
      // ignore_* functions in this file format it with %x.
1390  if (DCPS_debug_level >= 4) {
1392  ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_subscription: ")
1393  ACE_TEXT("%C ignoring handle %d.\n"),
1394  LogGuid(dp_id_).c_str(),
1395  handle));
1396  }
1397 
1398  GUID_t ignoreId = get_repoid(handle);
1399  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1400  if (!disco->ignore_subscription(domain_id_,
1401  dp_id_,
1402  ignoreId)) {
1403  if (DCPS_debug_level > 0) {
1405  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
1406  ACE_TEXT(" could not ignore subscription in discovery.\n")));
1407  }
1408  return DDS::RETCODE_ERROR;
1409  }
1410 
1411  return DDS::RETCODE_OK;
1412 #else
1413  ACE_UNUSED_ARG(handle);
1414  return DDS::RETCODE_UNSUPPORTED;
1415 #endif // !defined (DDS_HAS_MINIMUM_BIT)
1416 }
1417 
1420 {
      // Returns the stored domain id (domain_id_).
      // NOTE(review): the declaration lines are missing from this listing.
1421  return domain_id_;
1422 }
1423 
1426 {
1427  // This operation needs to only be used if the DomainParticipant contains
1428  // DataWriter entities with the LIVELINESS set to MANUAL_BY_PARTICIPANT and
1429  // it only affects the liveliness of those DataWriter entities. Otherwise,
1430  // it has no effect.
1431  // This will do nothing in current implementation since we only
1432  // support the AUTOMATIC liveliness qos for datawriter.
1433  // Add implementation here.
      // NOTE(review): the three comment lines above look stale -- the loop
      // below does call assert_liveliness_by_participant() on every publisher.
      // NOTE(review): listing lines 1435 and 1438 (guard macro opener and its
      // last argument) and 1445 are missing from this listing, as are the
      // declaration lines.
1434 
1436  tao_mon,
1437  this->publishers_protector_,
1439 
1440  for (PublisherSet::iterator it(publishers_.begin());
1441  it != publishers_.end(); ++it) {
1442  it->svt_->assert_liveliness_by_participant();
1443  }
1444 
1446 
1447  return DDS::RETCODE_OK;
1448 }
1449 
1452  const DDS::PublisherQos & qos)
1453 {
      // Stores `qos` as default_publisher_qos_ when it passes the
      // Qos_Helper::valid and Qos_Helper::consistent checks.
      // NOTE(review): listing line 1459 (the statement of the else branch) and
      // the declaration lines are missing from this listing.
1454  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1455  default_publisher_qos_ = qos;
1456  return DDS::RETCODE_OK;
1457 
1458  } else {
1460  }
1461 }
1462 
1465  DDS::PublisherQos & qos)
1466 {
      // Copies default_publisher_qos_ into the caller's `qos`; always returns
      // RETCODE_OK.
1467  qos = default_publisher_qos_;
1468  return DDS::RETCODE_OK;
1469 }
1470 
1473  const DDS::SubscriberQos & qos)
1474 {
      // Returns RETCODE_OK when `qos` passes the Qos_Helper::valid and
      // Qos_Helper::consistent checks.
      // NOTE(review): listing lines 1476 (presumably the assignment of the
      // default subscriber QoS) and 1480 (the else-branch statement) are
      // missing from this listing, as are the declaration lines.
1475  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1477  return DDS::RETCODE_OK;
1478 
1479  } else {
1481  }
1482 }
1483 
1486  DDS::SubscriberQos & qos)
1487 {
      // NOTE(review): listing line 1488 (presumably the copy into `qos`) is
      // missing from this listing; only the RETCODE_OK return is visible.
1489  return DDS::RETCODE_OK;
1490 }
1491 
1494  const DDS::TopicQos & qos)
1495 {
      // Stores `qos` as default_topic_qos_ when it passes the Qos_Helper::valid
      // and Qos_Helper::consistent checks.
      // NOTE(review): listing line 1501 (the statement of the else branch) and
      // the declaration lines are missing from this listing.
1496  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1497  default_topic_qos_ = qos;
1498  return DDS::RETCODE_OK;
1499 
1500  } else {
1502  }
1503 }
1504 
1507  DDS::TopicQos & qos)
1508 {
      // Copies default_topic_qos_ into the caller's `qos`; always returns
      // RETCODE_OK.
1509  qos = default_topic_qos_;
1510  return DDS::RETCODE_OK;
1511 }
1512 
1515 {
      // Writes SystemTimePoint::now(), converted with to_dds_time(), into
      // current_time and returns RETCODE_OK.
      // NOTE(review): the declaration lines are missing from this listing.
1516  current_time = SystemTimePoint::now().to_dds_time();
1517  return DDS::RETCODE_OK;
1518 }
1519 
1520 #if !defined (DDS_HAS_MINIMUM_BIT)
1521 
1524 {
      // Appends to participant_handles the handle of every entry in handles_
      // whose GUID has entity kind KIND_PARTICIPANT, excluding this participant
      // (dp_id_) and anything listed in ignored_participants_.
      // NOTE(review): listing line 1525 (presumably a lock guard) and the
      // declaration lines are missing from this listing.
1526 
1527  const CountedHandleMap::const_iterator itEnd = handles_.end();
1528  for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1529  GuidConverter converter(iter->first);
1530 
1531  if (converter.entityKind() == KIND_PARTICIPANT) {
1532  // skip itself and the ignored participant
1533  if (iter->first == dp_id_ || ignored_participants_.count(iter->first)) {
1534  continue;
1535  }
1536 
1537  push_back(participant_handles, iter->second.first);
1538  }
1539  }
1540 
1541  return DDS::RETCODE_OK;
1542 }
1543 
1546  DDS::InstanceHandle_t participant_handle)
1547 {
      // Checks that participant_handle is mapped in handles_ to a GUID of kind
      // KIND_PARTICIPANT, then delegates to
      // bit_subscriber_->get_discovered_participant_data().
      // NOTE(review): listing lines 1549 (presumably a lock guard) and 1564
      // (the statement executed when nothing was found) are missing, as are
      // the declaration lines.
1548  {
1550 
1551  bool found = false;
1552  const CountedHandleMap::const_iterator itEnd = handles_.end();
1553  for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1554  GuidConverter converter(iter->first);
1555 
1556  if (participant_handle == iter->second.first
1557  && converter.entityKind() == KIND_PARTICIPANT) {
1558  found = true;
1559  break;
1560  }
1561  }
1562 
1563  if (!found)
1565  }
1566 
1567  return bit_subscriber_->get_discovered_participant_data(participant_data, participant_handle);
1568 }
1569 
1572 {
      // Appends to topic_handles the handle of every entry in handles_ whose
      // GUID converter reports isTopic(), excluding anything in ignored_topics_.
      // NOTE(review): listing line 1573 (presumably a lock guard) and the
      // declaration lines are missing from this listing.
1574 
1575  const CountedHandleMap::const_iterator itEnd = handles_.end();
1576  for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1577  GuidConverter converter(iter->first);
1578  if (converter.isTopic()) {
1579  if (ignored_topics_.count(iter->first)) {
1580  continue;
1581  }
1582 
1583  push_back(topic_handles, iter->second.first);
1584  }
1585  }
1586 
1587  return DDS::RETCODE_OK;
1588 }
1589 
1592  DDS::InstanceHandle_t topic_handle)
1593 {
      // Checks that topic_handle is mapped in handles_ to a GUID whose
      // converter reports isTopic(), then delegates to
      // bit_subscriber_->get_discovered_topic_data().
      // NOTE(review): listing lines 1595 (presumably a lock guard) and 1608
      // (the statement executed when nothing was found) are missing, as are
      // the declaration lines.
1594  {
1596 
1597  bool found = false;
1598  const CountedHandleMap::const_iterator itEnd = handles_.end();
1599  for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1600  GuidConverter converter(iter->first);
1601  if (topic_handle == iter->second.first && converter.isTopic()) {
1602  found = true;
1603  break;
1604  }
1605  }
1606 
1607  if (!found)
1609  }
1610 
1611  return bit_subscriber_->get_discovered_topic_data(topic_data, topic_handle);
1612 }
1613 
1614 #endif
1615 
1618 {
      // Enables the participant (named enable in the log text below).
      // Visible flow: return OK if already enabled; pick a security
      // configuration when security is on; look up discovery for domain_id_;
      // either run the security validation chain and call
      // add_domain_participant_secure(), or call add_domain_participant();
      // store the returned id and federated flag; report to the monitors; call
      // set_enabled(); initialise the built-in-topic subscriber unless
      // TheTransientKludge is enabled; finally enable the contained topics,
      // publishers and subscribers.
      // NOTE(review): many lines are missing from this listing (logging call
      // openers; declarations at 1667-1668 and 1707; the statements at 1679,
      // 1693, 1704, 1727 that follow failed security checks; the conditions at
      // 1733 and 1805; the else statement at 1744; argument line 1748). Do not
      // infer control flow across those gaps.
1619  // According to the spec:
1620  // - Calling enable on an already enabled Entity returns OK and has no
1621  // effect.
1622  // - Calling enable on an Entity whose factory is not enabled will fail
1623  // and return PRECONDITION_NOT_MET.
1624 
1625  if (this->is_enabled()) {
1626  return DDS::RETCODE_OK;
1627  }
1628 
1629 #ifdef OPENDDS_SECURITY
      // Fall back from the registry's default configuration to its built-in
      // one, and install the built-in one as the default when it is used.
1630  if (!security_config_ && TheServiceParticipant->get_security()) {
1631  security_config_ = TheSecurityRegistry->default_config();
1632  if (!security_config_) {
1633  security_config_ = TheSecurityRegistry->builtin_config();
1634  TheSecurityRegistry->default_config(security_config_);
1635  }
1636  }
1637 #endif
1638 
1639  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1640 
1641  if (disco.is_nil()) {
1642  if (DCPS_debug_level > 0) {
1644  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1645  ACE_TEXT("no discovery found for domain id: %d.\n"), domain_id_));
1646  }
1647  return DDS::RETCODE_ERROR;
1648  }
1649 
1650 #ifdef OPENDDS_SECURITY
1651  if (TheServiceParticipant->get_security() && !security_config_) {
1652  if (DCPS::security_debug.new_entity_error) {
1654  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1655  ACE_TEXT("DCPSSecurity flag is set, but unable to load security plugin configuration.\n")));
1656  }
1657  return DDS::RETCODE_ERROR;
1658  }
1659 #endif
1660 
1661  AddDomainStatus value = {GUID_UNKNOWN, false};
1662 
1663 #ifdef OPENDDS_SECURITY
      // Secure path: taken only when security is on and the QoS implies it.
1664  if (TheServiceParticipant->get_security() && security_config_->qos_implies_security(qos_)) {
1665  Security::Authentication_var auth = security_config_->get_authentication();
1666 
1669  auth->validate_local_identity(id_handle_, dp_id_, domain_id_, qos_, disco->generate_participant_guid(), se);
1670 
1671  /* TODO - Handle VALIDATION_PENDING_RETRY */
1672  if (val_res != DDS::Security::VALIDATION_OK) {
1673  if (DCPS::security_debug.new_entity_error) {
1675  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1676  ACE_TEXT("Unable to validate local identity. SecurityException[%d.%d]: %C\n"),
1677  se.code, se.minor_code, se.message.in()));
1678  }
1680  }
1681 
1682  Security::AccessControl_var access = security_config_->get_access_control();
1683 
1684  perm_handle_ = access->validate_local_permissions(auth, id_handle_, domain_id_, qos_, se);
1685 
1686  if (perm_handle_ == DDS::HANDLE_NIL) {
1687  if (DCPS::security_debug.new_entity_error) {
1689  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1690  ACE_TEXT("Unable to validate local permissions. SecurityException[%d.%d]: %C\n"),
1691  se.code, se.minor_code, se.message.in()));
1692  }
1694  }
1695 
1696  const bool check_create = access->check_create_participant(perm_handle_, domain_id_, qos_, se);
1697  if (!check_create) {
1698  if (DCPS::security_debug.new_entity_error) {
1700  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1701  ACE_TEXT("Unable to create participant. SecurityException[%d.%d]: %C\n"),
1702  se.code, se.minor_code, se.message.in()));
1703  }
1705  }
1706 
1708  const bool check_part_sec_attr = access->get_participant_sec_attributes(perm_handle_, part_sec_attr, se);
1709 
1710  if (!check_part_sec_attr) {
1711  if (DCPS::security_debug.new_entity_error) {
1713  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable,")
1714  ACE_TEXT("Unable to get participant security attributes. SecurityException[%d.%d]: %C\n"),
1715  se.code, se.minor_code, se.message.in()));
1716  }
1717  return DDS::RETCODE_ERROR;
1718  }
1719 
1720  if (part_sec_attr.is_rtps_protected) { // DDS-Security v1.1 8.4.2.4 Table 27 is_rtps_protected
1721  if (part_sec_attr.allow_unauthenticated_participants) {
1722  if (DCPS::security_debug.new_entity_error) {
1724  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1725  ACE_TEXT("allow_unauthenticated_participants is not possible with is_rtps_protected\n")));
1726  }
1728  }
1729 
      // Only QoS properties whose name starts with "dds.sec.crypto." are
      // passed to the crypto key factory.
1730  const Security::CryptoKeyFactory_var crypto = security_config_->get_crypto_key_factory();
1731  part_crypto_handle_ = crypto->register_local_participant(id_handle_, perm_handle_,
1732  Util::filter_properties(qos_.property.value, "dds.sec.crypto."), part_sec_attr, se);
1734  if (DCPS::security_debug.new_entity_error) {
1736  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1737  ACE_TEXT("Unable to register local participant. SecurityException[%d.%d]: %C\n"),
1738  se.code, se.minor_code, se.message.in()));
1739  }
1740  return DDS::RETCODE_ERROR;
1741  }
1742 
1743  } else {
1745  }
1746 
1747  value = disco->add_domain_participant_secure(domain_id_, qos_, type_lookup_service_,
1749 
1750  if (value.id == GUID_UNKNOWN) {
1751  if (DCPS::security_debug.new_entity_error) {
1753  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1754  ACE_TEXT("add_domain_participant_secure returned invalid id.\n")));
1755  }
1756  return DDS::RETCODE_ERROR;
1757  }
1758 
1759  } else {
1760 #endif
1761 
      // Non-secure path (also the only path when OPENDDS_SECURITY is off).
1762  value = disco->add_domain_participant(domain_id_, qos_, type_lookup_service_);
1763 
1764  if (value.id == GUID_UNKNOWN) {
1765  if (DCPS_debug_level > 0) {
1767  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1768  ACE_TEXT("add_domain_participant returned invalid id.\n")));
1769  }
1770  return DDS::RETCODE_ERROR;
1771  }
1772 
1773 #ifdef OPENDDS_SECURITY
1774  }
1775 #endif
1776 
1777  dp_id_ = value.id;
1778  federated_ = value.federated;
1779 
1780  if (monitor_) {
1781  monitor_->report();
1782  }
1783 
1784  if (TheServiceParticipant->monitor_) {
1785  TheServiceParticipant->monitor_->report();
1786  }
1787 
1788  const DDS::ReturnCode_t ret = this->set_enabled();
1789 
      // This message is logged before `ret` is checked, so it is also printed
      // when set_enabled() did not return RETCODE_OK.
1790  if (DCPS_debug_level > 1) {
1791  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DomainParticipantImpl::enable: ")
1792  ACE_TEXT("enabled participant %C in domain %d\n"),
1793  LogGuid(dp_id_).c_str(), domain_id_));
1794  }
1795 
1796  if (ret == DDS::RETCODE_OK && !TheTransientKludge->is_enabled()) {
1797  Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
1798  this->bit_subscriber_ = disc->init_bit(this);
1799  }
1800 
1801  if (ret != DDS::RETCODE_OK) {
1802  return ret;
1803  }
1804 
      // NOTE(review): the condition opening this scope (listing line 1805) is
      // missing; the return values of the contained enable() calls are ignored.
1806 
1807  for (TopicMap::iterator it = topics_.begin(); it != topics_.end(); ++it) {
1808  it->second.pair_.svt_->enable();
1809  }
1810 
1811  for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
1812  it->svt_->enable();
1813  }
1814 
1815  for (SubscriberSet::iterator it = subscribers_.begin(); it != subscribers_.end(); ++it) {
1816  it->svt_->enable();
1817  }
1818  }
1819 
1820  return DDS::RETCODE_OK;
1821 }
1822 
1823 GUID_t
1825 {
      // Returns this participant's GUID (dp_id_).
      // NOTE(review): listing line 1824 (function name) is missing.
1826  return dp_id_;
1827 }
1828 
1831 {
1833 }
1834 
1835 
1838 {
1840 }
1841 
1843 {
      // Produces an InstanceHandle for `id` (named assign_handle in the log
      // text below). Three visible cases:
      //  - id == GUID_UNKNOWN: a fresh handle is returned and not recorded in
      //    handles_/repoIds_;
      //  - id not yet in handles_: a fresh handle is recorded in both maps with
      //    a reference count of 1;
      //  - id already in handles_: its reference count is incremented and the
      //    existing handle is returned.
      // NOTE(review): listing lines 1844 (presumably a lock guard), 1847 and
      // 1858 (the expressions that produce the new handle) and 1866 are
      // missing, as are the declaration lines.
1845  if (id == GUID_UNKNOWN) {
1846  const DDS::InstanceHandle_t ih =
1848  if (DCPS_debug_level > 5) {
1849  ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
1850  "New unmapped InstanceHandle %d\n", ih));
1851  }
1852  return ih;
1853  }
1854 
1855  const CountedHandleMap::iterator location = handles_.find(id);
1856  if (location == handles_.end()) {
1857  const DDS::InstanceHandle_t handle =
1859  if (DCPS_debug_level > 5) {
1860  ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
1861  "New mapped InstanceHandle %d for %C\n",
1862  handle, LogGuid(id).c_str()));
1863  }
1864  handles_[id] = std::make_pair(handle, 1);
1865  repoIds_[handle] = id;
1867  return handle;
1868  }
1869 
1870  HandleWithCounter& mapped = location->second;
1871  ++mapped.second;
1872  if (DCPS_debug_level > 5) {
1873  ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
1874  "Incremented refcount for InstanceHandle %d to %d\n",
1875  mapped.first, mapped.second));
1876  }
1877  return mapped.first;
1878 }
1879 
1881  TimeDuration max_wait) const
1882 {
      // Waits on handle_waiters_ until `id` appears in handles_ or the wait
      // status stops being CvStatus_NoTimeout. A zero max_wait selects the
      // untimed wait(); otherwise wait_until(expire_at) is used. Returns the
      // mapped handle, or DDS::HANDLE_NIL if `id` is still absent.
      // NOTE(review): listing lines 1884 (presumably a lock guard) and 1886
      // (the declaration of `res`) are missing, as is the first declaration
      // line.
1883  MonotonicTimePoint expire_at = MonotonicTimePoint::now() + max_wait;
1885  CountedHandleMap::const_iterator iter = handles_.find(id);
1887  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1888  while (res == CvStatus_NoTimeout && iter == handles_.end()) {
1889  res = max_wait.is_zero() ? handle_waiters_.wait(thread_status_manager) : handle_waiters_.wait_until(expire_at, thread_status_manager);
      // Look the id up again after every wake-up.
1890  iter = handles_.find(id);
1891  }
1892  return iter == handles_.end() ? DDS::HANDLE_NIL : iter->second.first;
1893 }
1894 
1896 {
      // Non-blocking lookup: returns the handle mapped to `id` in handles_, or
      // DDS::HANDLE_NIL when there is none.
      // NOTE(review): listing line 1897 (presumably a lock guard) and the
      // declaration line are missing from this listing.
1898  const CountedHandleMap::const_iterator iter = handles_.find(id);
1899  return iter == handles_.end() ? DDS::HANDLE_NIL : iter->second.first;
1900 }
1901 
1903 {
      // Releases one reference to `handle` (named return_handle in the log
      // text below). A handle that is not in repoIds_ goes straight to
      // reusable_handles_. Otherwise the count stored in handles_ is
      // decremented and, when it reaches zero, both map entries are erased and
      // the handle is added to reusable_handles_.
      // NOTE(review): listing line 1904 (presumably a lock guard) and the
      // declaration line are missing from this listing.
1905  const RepoIdMap::iterator r_iter = repoIds_.find(handle);
1906  if (r_iter == repoIds_.end()) {
1907  reusable_handles_.add(handle);
1908  if (DCPS_debug_level > 5) {
1909  ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::return_handle: "
1910  "Returned unmapped InstanceHandle %d\n", handle));
1911  }
1912  return;
1913  }
1914 
      // repoIds_ has an entry but handles_ does not: nothing is changed.
1915  const CountedHandleMap::iterator h_iter = handles_.find(r_iter->second);
1916  if (h_iter == handles_.end()) {
1917  return;
1918  }
1919 
      // The count logged here is the value before the decrement below.
1920  if (DCPS_debug_level > 5) {
1921  ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::return_handle: "
1922  "Returned mapped InstanceHandle %d refcount %d\n",
1923  handle, h_iter->second.second));
1924  }
1925 
1926  HandleWithCounter& mapped = h_iter->second;
1927  if (--mapped.second == 0) {
1928  handles_.erase(h_iter);
1929  repoIds_.erase(r_iter);
1930  reusable_handles_.add(handle);
1931  }
1932 }
1933 
1935 {
      // Reverse lookup: returns the GUID mapped to `handle` in repoIds_, or
      // GUID_UNKNOWN when there is none.
      // NOTE(review): listing line 1936 (presumably a lock guard) and the
      // declaration line are missing from this listing.
1937  const RepoIdMap::const_iterator location = repoIds_.find(handle);
1938  return location == repoIds_.end() ? GUID_UNKNOWN : location->second;
1939 }
1940 
1941 DDS::Topic_ptr
1943  const char * topic_name,
1944  const char * type_name,
1945  const DDS::TopicQos & qos,
1946  DDS::TopicListener_ptr a_listener,
1947  const DDS::StatusMask & mask,
1948  OpenDDS::DCPS::TypeSupport_ptr type_support)
1949 {
      // Creates a TopicImpl for topic_name/type_name, stores it in topics_ and
      // returns a duplicated reference (named create_new_topic in the log text
      // below). With OPENDDS_SECURITY and a non-zero perm_handle_, non-built-in
      // topics must first pass the access-control checks; on any failure a nil
      // topic is returned.
      // NOTE(review): listing lines 1942 (function name), 1950 (guard macro
      // opener), 1959 and 1961 (declarations of sec_attr and se), the logging
      // call openers, and 1999 (the condition around the enable() call) are
      // missing from this listing.
1951  tao_mon,
1952  this->topics_protector_,
1953  DDS::Topic::_nil());
1954 
1955 #ifdef OPENDDS_SECURITY
1956  if (perm_handle_ && !topicIsBIT(topic_name, type_name)) {
1957  Security::AccessControl_var access = security_config_->get_access_control();
1958 
1960 
1962  if (!access->get_topic_sec_attributes(perm_handle_, topic_name, sec_attr, se)) {
1963  if (DCPS::security_debug.new_entity_warn) {
1965  ACE_TEXT("(%P|%t) WARNING: ")
1966  ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
1967  ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
1968  topic_name, se.code, se.minor_code, se.message.in()));
1969  }
1970  return DDS::Topic::_nil();
1971  }
1972 
      // The create-topic permission check only runs for topics that are read-
      // or write-protected.
1973  if ((sec_attr.is_write_protected || sec_attr.is_read_protected) &&
1974  !access->check_create_topic(perm_handle_, domain_id_, topic_name, qos, se)) {
1975  if (DCPS::security_debug.new_entity_warn) {
1977  ACE_TEXT("(%P|%t) WARNING: ")
1978  ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
1979  ACE_TEXT("Permissions check failed to create new topic '%C'. SecurityException[%d.%d]: %C\n"),
1980  topic_name, se.code, se.minor_code, se.message.in()));
1981  }
1982  return DDS::Topic::_nil();
1983  }
1984  }
1985 #endif
1986 
1987  TopicImpl* topic_servant = 0;
1988 
1989  ACE_NEW_RETURN(topic_servant,
1990  TopicImpl(topic_name,
1991  type_name,
1992  type_support,
1993  qos,
1994  a_listener,
1995  mask,
1996  this),
1997  DDS::Topic::_nil());
1998 
      // NOTE(review): when enable() fails, the early return below happens
      // before topic_servant is handed to refCounted_topic (described further
      // down as the leak guard) -- confirm whether the servant is released on
      // this path.
2000  const DDS::ReturnCode_t ret = topic_servant->enable();
2001 
2002  if (ret != DDS::RETCODE_OK) {
2004  ACE_TEXT("(%P|%t) WARNING: ")
2005  ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
2006  ACE_TEXT("enable failed.\n")));
2007  return DDS::Topic::_nil();
2008  }
2009  }
2010 
2011  DDS::Topic_ptr obj(topic_servant);
2012 
2013  // this object will also act as a guard against leaking the new TopicImpl
2014  RefCounted_Topic refCounted_topic(Topic_Pair(topic_servant, obj, false));
2015  topics_.insert(std::make_pair(topic_name, refCounted_topic));
2016 
2017  if (this->monitor_) {
2018  this->monitor_->report();
2019  }
2020 
2021  // the topics_ map has one reference and we duplicate to give
2022  // the caller another reference.
2023  return DDS::Topic::_duplicate(refCounted_topic.pair_.obj_.in());
2024 }
2025 
2026 bool DomainParticipantImpl::is_clean(String* leftover_entities) const
2027 {
2028  if (leftover_entities) {
2029  leftover_entities->clear();
2030  }
2031 
2032  // check that the only remaining topics are built-in topics
2033  size_t topic_count = 0;
2034  for (TopicMap::const_iterator it = topics_.begin(); it != topics_.end(); ++it) {
2035  if (!topicIsBIT(it->second.pair_.svt_->topic_name(), it->second.pair_.svt_->type_name())) {
2036  ++topic_count;
2037  }
2038  }
2039  if (topic_count) {
2040  *leftover_entities += to_dds_string(topic_count) + " topic(s)";
2041  }
2042 
2043  size_t sub_count = subscribers_.size();
2044  if (!TheTransientKludge->is_enabled()) {
2045  // There are built-in topics and built-in topic subscribers left.
2046  sub_count = sub_count <= 1 ? 0 : sub_count;
2047  }
2048  if (leftover_entities && sub_count) {
2049  if (leftover_entities->size()) {
2050  *leftover_entities += ", ";
2051  }
2052  *leftover_entities += to_dds_string(sub_count) + " subscriber(s)";
2053  }
2054 
2055  const size_t pub_count = publishers_.size();
2056  if (leftover_entities && pub_count) {
2057  if (leftover_entities->size()) {
2058  *leftover_entities += ", ";
2059  }
2060  *leftover_entities += to_dds_string(pub_count) + " publisher(s)";
2061  }
2062 
2063  return topic_count == 0 && sub_count == 0 && pub_count == 0;
2064 }
2065 
2066 DDS::DomainParticipantListener_ptr
2068 {
      // Returns a duplicated reference to listener_ when it is non-nil and
      // listener_mask_ has a bit in common with `kind`; otherwise returns nil.
      // NOTE(review): listing lines 2067 (function name and parameter) and
      // 2069 (presumably a lock guard) are missing from this listing.
2070  if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
2071  return DDS::DomainParticipantListener::_nil ();
2072  } else {
2073  return DDS::DomainParticipantListener::_duplicate(listener_.in());
2074  }
2075 }
2076 
2077 void
2079 {
      // Appends the id (get_id()) of every topic in topics_ to `topics`,
      // reserving topics_.size() elements first.
      // NOTE(review): listing lines 2078 (function name and parameter) and
      // 2080 (guard macro opener) are missing from this listing.
2081  guard,
2082  this->topics_protector_);
2083 
2084  topics.reserve(topics_.size());
2085  for (TopicMap::iterator it(topics_.begin());
2086  it != topics_.end(); ++it) {
2087  topics.push_back(it->second.pair_.svt_->get_id());
2088  }
2089 }
2090 
2091 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2092 
2095 {
      // Returns the address of the owner_man_ member (named ownership_manager
      // in the log text below). Unless DDS_HAS_MINIMUM_BIT is defined it first
      // calls bit_subscriber_->bit_pub_listener_hack(this), or logs a warning
      // when bit_subscriber_ is null.
      // NOTE(review): the declaration lines and listing line 2101 (logging
      // call opener) are missing from this listing.
2096 #if !defined (DDS_HAS_MINIMUM_BIT)
2097  if (bit_subscriber_) {
2098  bit_subscriber_->bit_pub_listener_hack(this);
2099  } else {
2100  if (log_level >= LogLevel::Warning) {
2102  "(%P|%t) WARNING: DomainParticipantImpl::ownership_manager: bit_subscriber_ is null"));
2103  }
2104  }
2105 #endif
2106  return &owner_man_;
2107 }
2108 
2109 void
2111  const CORBA::Long& ownership_strength)
2112 {
      // Forwards (pub_id, ownership_strength) to update_ownership_strength()
      // on every subscriber in subscribers_; does nothing once get_deleted()
      // reports true.
      // NOTE(review): listing lines 2110 (function name and first parameter)
      // and 2113 (guard macro opener) are missing from this listing.
2114  tao_mon,
2115  this->subscribers_protector_);
2116 
2117  if (this->get_deleted ())
2118  return;
2119 
2120  for (SubscriberSet::iterator it(this->subscribers_.begin());
2121  it != this->subscribers_.end(); ++it) {
2122  it->svt_->update_ownership_strength(pub_id, ownership_strength);
2123  }
2124 }
2125 
2126 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2127 
2129  base_(base),
2130  serial_(0),
2131  builder_(base_)
2132 {
      // Empty body: the initializer list above copies `base` into base_,
      // starts serial_ at 0 and constructs builder_ from base_.
      // NOTE(review): the constructor's declaration line (listing line 2128)
      // is missing from this listing.
2133 }
2134 
2135 GUID_t
2137 {
      // Returns builder_ converted to GUID_t.
      // NOTE(review): listing lines 2136 (function name) and 2138 (presumably
      // the statement that updates builder_ before it is returned) are missing
      // from this listing.
2139  return builder_;
2140 }
2141 
2142 
2143 ////////////////////////////////////////////////////////////////
2144 
2145 
2146 bool
2148 {
      // Substitutes the participant's default publisher QoS when pub_qos
      // equals PUBLISHER_QOS_DEFAULT, then returns whether the result passes
      // the Qos_Helper::valid and Qos_Helper::consistent checks.
      // NOTE(review): listing lines 2147 (function name and parameter), 2153
      // and 2157 (logging call opener) are missing from this listing.
2149  if (pub_qos == PUBLISHER_QOS_DEFAULT) {
2150  this->get_default_publisher_qos(pub_qos);
2151  }
2152 
2154 
2155  if (!Qos_Helper::valid(pub_qos) || !Qos_Helper::consistent(pub_qos)) {
2156  if (DCPS_debug_level > 0) {
2158  ACE_TEXT("(%P|%t) ERROR: ")
2159  ACE_TEXT("DomainParticipantImpl::validate_publisher_qos, ")
2160  ACE_TEXT("invalid qos.\n")));
2161  }
2162  return false;
2163  }
2164 
2165  return true;
2166 }
2167 
2168 bool
2170 {
      // Substitutes the participant's default subscriber QoS when
      // subscriber_qos equals SUBSCRIBER_QOS_DEFAULT, then returns whether the
      // result passes the Qos_Helper::valid and Qos_Helper::consistent checks.
      // NOTE(review): listing lines 2169 (function name and parameter), 2175
      // and 2179 (logging call opener) are missing from this listing.
2171  if (subscriber_qos == SUBSCRIBER_QOS_DEFAULT) {
2172  this->get_default_subscriber_qos(subscriber_qos);
2173  }
2174 
2176 
2177  if (!Qos_Helper::valid(subscriber_qos) || !Qos_Helper::consistent(subscriber_qos)) {
2178  if (DCPS_debug_level > 0) {
2180  ACE_TEXT("(%P|%t) ERROR: ")
2181  ACE_TEXT("DomainParticipantImpl::validate_subscriber_qos, ")
2182  ACE_TEXT("invalid qos.\n")));
2183  }
2184  return false;
2185  }
2186 
2187 
2188  return true;
2189 }
2190 
2193  const DDS::SubscriberQos& subscriber_qos,
2194  const DDS::DataReaderQos& datareader_qos,
2195  const RecorderListener_rch& a_listener,
2196  DDS::StatusMask mask)
2197 {
      // Creates a RecorderImpl for a_topic, stores it in recorders_ and
      // returns it (named create_recorder in the log text below). Returns 0
      // when a_topic is nil or when the subscriber/data-reader QoS validation
      // fails.
      // NOTE(review): the first declaration lines, listing line 2200 (logging
      // call opener), 2226 (the condition around recorder->enable()) and 2230
      // (presumably a lock guard) are missing from this listing.
2198  if (CORBA::is_nil(a_topic)) {
2199  if (DCPS_debug_level > 0) {
2201  ACE_TEXT("(%P|%t) ERROR: ")
2202  ACE_TEXT("DomainParticipantImpl::create_recorder, ")
2203  ACE_TEXT("topic desc is nil.\n")));
2204  }
2205  return 0;
2206  }
2207 
      // Work on a local copy because validate_subscriber_qos() may replace the
      // QoS it is given; dr_qos receives the validated reader QoS.
2208  DDS::SubscriberQos sub_qos = subscriber_qos;
2209  DDS::DataReaderQos dr_qos;
2210 
2211  if (! this->validate_subscriber_qos(sub_qos) ||
2212  ! SubscriberImpl::validate_datareader_qos(datareader_qos,
2213  TheServiceParticipant->initial_DataReaderQos(),
2214  a_topic,
2215  dr_qos, false) ) {
2216  return 0;
2217  }
2218 
2219  RecorderImpl* recorder(new RecorderImpl);
2220  Recorder_var result(recorder);
2221 
      // NOTE(review): the dynamic_cast result is passed on unchecked -- confirm
      // that RecorderImpl::init tolerates a null topic description.
2222  recorder->init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
2223  dr_qos, a_listener,
2224  mask, this, sub_qos);
2225 
      // The return value of enable() is not inspected here.
2227  recorder->enable();
2228  }
2229 
2231  recorders_.insert(result);
2232 
2233  return result._retn();
2234 }
2235 
2238  const DDS::PublisherQos& publisher_qos,
2239  const DDS::DataWriterQos& datawriter_qos,
2240  const ReplayerListener_rch& a_listener,
2241  DDS::StatusMask mask)
2242 {
      // Creates a ReplayerImpl for a_topic, stores it in replayers_ and
      // returns it (named create_replayer in the log text below). Returns 0
      // when a_topic is nil, when the publisher/data-writer QoS validation
      // fails, or when enabling the replayer fails.
      // NOTE(review): the first declaration lines, listing lines 2245 and 2276
      // (logging call openers), 2271 (the condition around replayer->enable())
      // and 2285 (presumably a lock guard) are missing from this listing.
2243  if (CORBA::is_nil(a_topic)) {
2244  if (DCPS_debug_level > 0) {
2246  ACE_TEXT("(%P|%t) ERROR: ")
2247  ACE_TEXT("DomainParticipantImpl::create_replayer, ")
2248  ACE_TEXT("topic desc is nil.\n")));
2249  }
2250  return 0;
2251  }
2252 
      // Work on a local copy because validate_publisher_qos() may replace the
      // QoS it is given; dw_qos receives the validated writer QoS.
2253  DDS::PublisherQos pub_qos = publisher_qos;
2254  DDS::DataWriterQos dw_qos;
2255 
2256  if (! this->validate_publisher_qos(pub_qos) ||
2257  ! PublisherImpl::validate_datawriter_qos(datawriter_qos,
2258  TheServiceParticipant->initial_DataWriterQos(),
2259  a_topic,
2260  dw_qos)) {
2261  return 0;
2262  }
2263 
      // NOTE(review): the dynamic_cast result is passed on unchecked -- confirm
      // that ReplayerImpl::init tolerates a null topic_servant.
2264  TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
2265 
2266  ReplayerImpl* replayer(new ReplayerImpl);
2267  Replayer_var result(replayer);
2268 
2269  replayer->init(a_topic, topic_servant, dw_qos, a_listener, mask, this, pub_qos);
2270 
2272  const DDS::ReturnCode_t ret = replayer->enable();
2273 
2274  if (ret != DDS::RETCODE_OK) {
2275  if (DCPS_debug_level > 0) {
2277  ACE_TEXT("(%P|%t) ERROR: ")
2278  ACE_TEXT("DomainParticipantImpl::create_replayer, ")
2279  ACE_TEXT("enable failed.\n")));
2280  }
2281  return 0;
2282  }
2283  }
2284 
2286  replayers_.insert(result);
2287  return result._retn();
2288 }
2289 
2290 void
2292 {
      // Erases `recorder` from recorders_, using a duplicated reference as the
      // lookup key.
      // NOTE(review): listing lines 2291 (function name and parameter) and
      // 2294 (presumably a lock guard) are missing from this listing.
2293  const Recorder_var recvar(Recorder::_duplicate(recorder));
2295  recorders_.erase(recvar);
2296 }
2297 
2298 void
2300 {
      // Erases `replayer` from replayers_, using a duplicated reference as the
      // lookup key.
      // NOTE(review): listing lines 2299 (function name and parameter) and
      // 2302 (presumably a lock guard) are missing from this listing.
2301  const Replayer_var repvar(Replayer::_duplicate(replayer));
2303  replayers_.erase(repvar);
2304 }
2305 
2306 void
2308 {
      // Passes `writer` to add_adjust() on both the automatic and the
      // participant liveliness timers.
      // NOTE(review): listing line 2307 (function name and parameter) is
      // missing from this listing.
2309  automatic_liveliness_timer_->add_adjust(writer);
2310  participant_liveliness_timer_->add_adjust(writer);
2311 }
2312 
2313 void
2315 {
      // Calls remove_adjust() on both the automatic and the participant
      // liveliness timers.
      // NOTE(review): listing line 2314 (function name) is missing from this
      // listing.
2316  automatic_liveliness_timer_->remove_adjust();
2317  participant_liveliness_timer_->remove_adjust();
2318 }
2319 
2322  : impl_(impl)
2323  , kind_(kind)
2324  , interval_(TimeDuration::max_value)
2325  , recalculate_interval_(false)
2326  , scheduled_(false)
      // Empty body: the timer starts with the maximum interval, no pending
      // recalculation and nothing scheduled.
      // NOTE(review): the constructor's declaration lines (listing lines
      // 2320-2321) are missing from this listing.
2327 { }
2328 
2330 {
2331 }
2332 
2333 void
2335 {
      // Takes a new writer into account: adopts the smaller of the current
      // interval_ and the writer's liveliness check interval for kind_, then
      // (re)schedules the timer when needed. Runs with lock_ held.
      // NOTE(review): listing lines 2334 (function name and parameter) and
      // 2338 (the declaration of `now`) are missing from this listing.
2336  ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
2337 
2339 
2340  // Calculate the time remaining to liveliness check.
2341  const TimeDuration remaining = interval_ - (now - last_liveliness_check_);
2342 
2343  // Adopt a smaller interval.
2344  interval_ = std::min(interval_, writer->liveliness_check_interval(kind_));
2345 
2346  // Reschedule or schedule a timer if necessary.
      // `remaining` was computed from the interval_ value before the update
      // above; an already scheduled timer is restarted only if the new
      // interval is shorter than that remaining time.
2347  if (scheduled_ && interval_ < remaining) {
2348  cancel();
2349  schedule(interval_);
2350  } else if (!scheduled_) {
2351  schedule(interval_);
2352  scheduled_ = true;
2353  last_liveliness_check_ = now;
2354  }
2355 }
2356 
2357 void
2359 {
      // Only sets recalculate_interval_ under lock_; the flag is consumed by
      // the `if (recalculate_interval_)` block that appears later in this file.
      // NOTE(review): listing line 2358 (function name) is missing from this
      // listing.
2360  ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
2361 
2362  recalculate_interval_ = true;
2363 }
2364 
2366 {
      // Timer callback body. Under lock_: if a recalculation was requested,
      // refreshes interval_; then, unless interval_ is the maximum value,
      // calls dispatch(now), records the check time and marks the timer as
      // scheduled again.
      // NOTE(review): the declaration line (2365), the declarations of
      // rev_lock and `interval` (2370-2371), the statement at 2375 inside the
      // reverse-locked section (presumably the interval recomputation) and the
      // statement at 2385 (presumably the reschedule) are missing from this
      // listing.
2367  ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
2368 
2369  if (recalculate_interval_) {
      // Loop until no new recalculation request arrives while lock_ is
      // temporarily released through the reverse guard.
2372  while (recalculate_interval_) {
2373  recalculate_interval_ = false;
2374  ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rev_guard, rev_lock);
2376  }
2377  interval_ = interval;
2378  }
2379 
2380  scheduled_ = false;
2381 
2382  if (!interval_.is_max()) {
2383  dispatch(now);
2384  last_liveliness_check_ = now;
2386  scheduled_ = true;
2387  }
2388 }
2389 
2392 { }
2393 
2394 void
2396 {
2398 }
2399 
2402 { }
2403 
2404 void
2406 {
2409  }
2410 }
2411 
2414 {
      // Returns the minimum of `tv`'s initial value and every publisher's
      // liveliness_check_interval(kind).
      // NOTE(review): the declaration lines, listing line 2415 (the
      // declaration and initial value of `tv`), 2417 (guard macro opener) and
      // 2419 (the guarded lock argument) are missing from this listing.
2416 
2418  tao_mon,
2420  tv);
2421 
2422  for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
2423  tv = std::min(tv, it->svt_->liveliness_check_interval(kind));
2424  }
2425 
2426  return tv;
2427 }
2428 
2429 bool
2431 {
      // Returns true if last_liveliness_activity_ is later than `tv`, or if
      // any publisher reports participant_liveliness_activity_after(tv).
      // NOTE(review): listing lines 2430 (function name and parameter) and
      // 2436 (presumably a lock guard) are missing from this listing.
2432  if (last_liveliness_activity_ > tv) {
2433  return true;
2434  }
2435 
2437 
2438  for (PublisherSet::iterator it(publishers_.begin());
2439  it != publishers_.end(); ++it) {
2440  if (it->svt_->participant_liveliness_activity_after(tv)) {
2441  return true;
2442  }
2443  }
2444 
2445  return false;
2446 }
2447 
// NOTE(review): the signature (listing line 2449) is absent from this dump;
// `kind` is presumably its parameter.
//
// Forwards the liveliness signal to the discovery object obtained from
// TheServiceParticipant for domain_id_, passing domain_id_, get_id() and kind.
// The pointer returned by get_discovery() is used without a null check.
2448 void
2450 {
2451  TheServiceParticipant->get_discovery(domain_id_)->signal_liveliness (domain_id_, get_id(), kind);
2452 }
2453 
// NOTE(review): many lines are absent from this dump (gaps in the listing
// numbers, e.g. 2455, 2459, 2462, 2466, 2469, 2535, 2551, 2585, 2588-2589).
// The signature, the declaration of `ret`, the per-item `result` in the
// recorder/replayer loops and the opening lines of each guard macro are
// presumably on those lines.
//
// Shutdown sequence visible here, in order:
//   1. cancel automatic_liveliness_timer_;
//   2. for every publisher: delete_contained_entities(), then delete_publisher();
//   3. for every subscriber: delete_contained_entities(), then delete_subscriber();
//   4. cleanup() every recorder and replayer, then clear both collections;
//   5. delete every topic through delete_topic_i(topic, true);
//   6. store the accumulated code in shutdown_result_ and set shutdown_complete_.
// Failures do not stop the sequence: each non-OK code overwrites `ret`, so
// the last failure is the one that is stored. The function itself returns 0.
2454 int
2456 {
2457  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2458 
2460 
2461  automatic_liveliness_timer_->cancel();
2463 
2464  // delete publishers
2465  {
2467  tao_mon,
2468  this->publishers_protector_,
2470 
2471  PublisherSet::iterator pubIter = publishers_.begin();
2472  DDS::Publisher_ptr pubPtr;
2473  size_t pubsize = publishers_.size();
2474 
     // The loop is driven by the size captured above, not by comparing against end().
2475  while (pubsize > 0) {
     // Take the pointer and step the iterator before the delete call below;
     // presumably delete_publisher() removes the element from publishers_,
     // which would invalidate an iterator still pointing at it - verify.
2476  pubPtr = (*pubIter).obj_.in();
2477  ++pubIter;
2478 
2479  DDS::ReturnCode_t result = pubPtr->delete_contained_entities();
2480  if (result != DDS::RETCODE_OK) {
2481  ret = result;
2482  }
2483 
2484  result = delete_publisher(pubPtr);
2485 
2486  if (result != DDS::RETCODE_OK) {
2487  ret = result;
2488  }
2489 
2490  --pubsize;
2491  }
2492 
2493  }
2494 
2495  // delete subscribers (same pattern as the publishers above)
2496  {
2498  tao_mon,
2499  this->subscribers_protector_,
2501 
2502  SubscriberSet::iterator subIter = subscribers_.begin();
2503  DDS::Subscriber_ptr subPtr;
2504  size_t subsize = subscribers_.size();
2505 
2506  while (subsize > 0) {
2507  subPtr = (*subIter).obj_.in();
2508  ++subIter;
2509 
2510  DDS::ReturnCode_t result = subPtr->delete_contained_entities();
2511 
2512  if (result != DDS::RETCODE_OK) {
2513  ret = result;
2514  }
2515 
2516  result = delete_subscriber(subPtr);
2517 
2518  if (result != DDS::RETCODE_OK) {
2519  ret = result;
2520  }
2521 
2522  --subsize;
2523  }
2524  }
2525 
     // clean up recorders, then drop them all
2526  {
2528  tao_mon,
2529  this->recorders_protector_,
2531 
2532  RecorderSet::iterator it = recorders_.begin();
2533  for (; it != recorders_.end(); ++it ){
     // dynamic_cast: impl is null when the element is not a RecorderImpl,
     // in which case cleanup() is skipped.
2534  RecorderImpl* impl = dynamic_cast<RecorderImpl* >(it->in());
2536  if (impl) result = impl->cleanup();
2537  if (result != DDS::RETCODE_OK) ret = result;
2538  }
2539  recorders_.clear();
2540  }
2541 
     // clean up replayers, then drop them all
2542  {
2544  tao_mon,
2545  this->replayers_protector_,
2547 
2548  ReplayerSet::iterator it = replayers_.begin();
2549  for (; it != replayers_.end(); ++it ){
     // NOTE(review): this uses static_cast where the recorder loop uses
     // dynamic_cast, so `if (impl)` can only reject a null it->in(), never an
     // element of another dynamic type - confirm the asymmetry is intended.
2550  ReplayerImpl* impl = static_cast<ReplayerImpl* >(it->in());
2552  if (impl) result = impl->cleanup();
2553  if (result != DDS::RETCODE_OK) ret = result;
2554 
2555  }
2556 
2557  replayers_.clear();
2558  }
2559 
2560  // delete topics
2561  {
2563  tao_mon,
2564  this->topics_protector_,
2566 
2567  TopicMap::iterator topicIter = topics_.begin();
2568  DDS::Topic_ptr topicPtr;
2569  size_t topicsize = topics_.size();
2570 
2571  while (topicsize > 0) {
     // As above: fetch the pointer and advance before the element may be removed.
2572  topicPtr = topicIter->second.pair_.obj_.in();
2573  ++topicIter;
2574 
2575  // Delete the topic; `true` is passed as the second argument of delete_topic_i.
2576  const DDS::ReturnCode_t result = this->delete_topic_i(topicPtr, true);
2577 
2578  if (result != DDS::RETCODE_OK) {
2579  ret = result;
2580  }
2581  --topicsize;
2582  }
2583  }
2584 
     // Publish the outcome; the missing lines around here presumably take a
     // lock and wake any waiter - not visible in this dump.
2586  shutdown_result_ = ret;
2587  shutdown_complete_ = true;
2590 
2591  return 0;
2592 }
2593 
// NOTE(review): listing lines 2594 (signature) and 2596 are absent from this
// dump; 2596 presumably takes a lock before publishers_ is walked.
//
// Calls prepare_to_delete_datawriters() on every entry of publishers_ and
// returns true only if every call returned true.
2595 {
2597  bool result = true;
2598  const PublisherSet::iterator end = publishers_.end();
2599  for (PublisherSet::iterator i = publishers_.begin(); i != end; ++i) {
     // `&=` rather than `&&`: the call is made for every publisher even after
     // an earlier one has returned false.
2600  result &= i->svt_->prepare_to_delete_datawriters();
2601  }
2602  return result;
2603 }
2604 
// NOTE(review): listing lines 2605 (signature) and 2607 are absent from this
// dump; `deadline` is presumably the parameter, and 2607 presumably takes a
// lock before publishers_ is walked.
//
// Passes `deadline` to set_wait_pending_deadline() on every entry of
// publishers_ and returns true only if every call returned true.
2606 {
2608  bool result = true;
2609  const PublisherSet::iterator end = publishers_.end();
2610  for (PublisherSet::iterator i = publishers_.begin(); i != end; ++i) {
     // `&=` rather than `&&`: every publisher receives the deadline even after
     // an earlier call has returned false.
2611  result &= i->svt_->set_wait_pending_deadline(deadline);
2612  }
2613  return result;
2614 }
2615 
2616 #ifndef OPENDDS_SAFETY_PROFILE
// NOTE(review): listing lines 2617 (start of the signature), 2629, 2637 and
// 2680 are absent from this dump. 2629 presumably holds the condition guarding
// the RETCODE_NO_DATA branch and 2637 presumably declares `ctid`.
//
// Produces a DynamicType for the entity identified by `key` and stores it in
// `type`.
//
// Visible outcomes:
//   RETCODE_UNSUPPORTED - type_lookup_service_ is not set;
//   RETCODE_NO_DATA     - the (not visible) check on the type info fails;
//   rc of cond.wait()   - requesting the remote complete TypeObjects failed;
//   RETCODE_ERROR       - the lookup service still lacks the complete
//                         TypeObject after a successful request, or the
//                         resulting DynamicType is not valid;
//   RETCODE_OK          - `type` has been assigned.
// `type` is written only on the success path.
2618  DDS::DynamicType_var& type, const DDS::BuiltinTopicKey_t& key)
2619 {
2620  if (!type_lookup_service_) {
2621  if (log_level >= LogLevel::Notice) {
2622  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2623  "Can't get a DynamicType, no type lookup service\n"));
2624  }
2625  return DDS::RETCODE_UNSUPPORTED;
2626  }
2627 
2628  XTypes::TypeInformation ti = type_lookup_service_->get_type_info(key);
2630  if (log_level >= LogLevel::Notice) {
2631  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2632  "Can't get a DynamicType, type info is missing complete\n"));
2633  }
2634  return DDS::RETCODE_NO_DATA;
2635  }
2636 
2638  const GUID_t entity = bit_key_to_guid(key);
2639  if (!type_lookup_service_->has_complete(ctid)) {
2640  // We don't have it; try asking the remote for the complete
2641  // TypeObjects.
2642  if (DCPS_debug_level >= 4) {
2643  ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::get_dynamic_type: "
2644  "requesting remote complete TypeObject from %C\n", LogGuid(entity).c_str()));
2645  }
2646  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
2647  TypeObjReqCond cond;
2648  disco->request_remote_complete_type_objects(domain_id_, dp_id_, entity, ti, cond);
     // Blocks until the request is answered; a non-OK code is returned as is.
2649  const DDS::ReturnCode_t rc = cond.wait();
2650  if (rc != DDS::RETCODE_OK) {
2651  if (log_level >= LogLevel::Notice) {
2652  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2653  "Couldn't get remote complete type object: %C\n", retcode_to_string(rc)));
2654  }
2655  return rc;
2656  }
2657 
     // Re-check: the request reported success, so the lookup service should now have it.
2658  if (!type_lookup_service_->has_complete(ctid)) {
2659  if (log_level >= LogLevel::Notice) {
2660  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2661  "request_remote_complete_type_objects succeeded, but type lookup service still says it "
2662  "doesn't have the complete TypeObject?\n"));
2663  }
2664  return DDS::RETCODE_ERROR;
2665  }
2666  }
2667 
     // Build into a local first so `type` is untouched if validation fails.
2668  DDS::DynamicType_var got_type = type_lookup_service_->type_identifier_to_dynamic(ctid, entity);
2669  if (!XTypes::dynamic_type_is_valid(got_type)) {
2670  if (log_level >= LogLevel::Notice) {
2671  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2672  "Got an invalid DynamicType\n"));
2673  }
2674  return DDS::RETCODE_ERROR;
2675  }
2676  type = got_type;
2677 
     // NOTE(review): `impl` is dereferenced without a null check. dynamic_cast
     // yields a null pointer if the object is not a DynamicTypeImpl - confirm
     // that type_identifier_to_dynamic() can only return that implementation,
     // or guard the calls below.
2678  XTypes::DynamicTypeImpl* impl = dynamic_cast<XTypes::DynamicTypeImpl*>(type.in());
2679  impl->set_complete_type_identifier(ctid);
2681  impl->set_preset_type_info(ti);
2682 
2683  return DDS::RETCODE_OK;
2684 }
2685 #endif
2686 
2687 } // namespace DCPS
2688 } // namespace OpenDDS
2689 
virtual CORBA::Boolean contains_entity(DDS::InstanceHandle_t a_handle)
virtual DDS::InstanceHandle_t get_instance_handle()
PublisherSet publishers_
Collection of publishers.
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
RcHandle< ParticipantLivelinessTimer > participant_liveliness_timer_
DDS::Topic_ptr create_topic_i(const char *topic_name, const char *type_name, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, DDS::StatusMask mask, int topic_mask)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
DDS::SubscriberQos default_subscriber_qos_
The default subscriber qos.
DDS::Security::ParticipantCryptoHandle part_crypto_handle_
This participant crypto handle given by crypto.
Send raw data samples in the system.
Definition: Replayer.h:60
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual DDS::Topic_ptr find_topic(const char *topic_name, const DDS::Duration_t &timeout)
ACE_Recursive_Thread_Mutex publishers_protector_
Protect the publisher collection.
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
const LogLevel::Value value
Definition: debug.cpp:61
virtual DDS::ReturnCode_t get_default_subscriber_qos(DDS::SubscriberQos &qos)
LivelinessTimer(DomainParticipantImpl &impl, DDS::LivelinessQosPolicyKind kind)
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
virtual DDS::ReturnCode_t assert_liveliness()
const InstanceHandle_t HANDLE_NIL
virtual DDS::ReturnCode_t set_default_subscriber_qos(const DDS::SubscriberQos &qos)
std::string String
EntityFactoryQosPolicy entity_factory
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
virtual void init(DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos)
virtual DDS::ReturnCode_t get_discovered_topics(DDS::InstanceHandleSeq &topic_handles)
void add_adjust_liveliness_timers(DataWriterImpl *writer)
DDS::TopicQos default_topic_qos_
The default topic qos.
DDS::PropertySeq filter_properties(const DDS::PropertySeq &properties, const std::string &prefix)
int access(const char *path, int amode)
void signal_liveliness(DDS::LivelinessQosPolicyKind kind)
void update_ownership_strength(const GUID_t &pub_id, const CORBA::Long &ownership_strength)
ENTITYKIND_BUILTIN_PARTICIPANT.
Definition: GuidUtils.h:68
virtual DDS::ReturnCode_t delete_contained_entities()
const DDS::DomainId_t domain_id_
The id of the domain that creates this participant.
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
SubscriberSet subscribers_
Collection of subscribers.
DDS::PublisherQos default_publisher_qos_
The default publisher qos.
virtual DDS::ReturnCode_t delete_multitopic(DDS::MultiTopic_ptr a_multitopic)
sequence< octet > key
bool participant_liveliness_activity_after(const MonotonicTimePoint &tv)
DDS::Security::IdentityHandle id_handle_
This participant id handle given by authentication.
String to_dds_string(unsigned short to_convert)
character_type *& out(void)
virtual void schedule(const TimeDuration &interval)=0
virtual DDS::ReturnCode_t enable()
virtual DDS::ReturnCode_t ignore_publication(DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t set_qos(const DDS::DomainParticipantQos &qos)
ACE_Recursive_Thread_Mutex recorders_protector_
Protect the recorders collection.
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
virtual DDS::ReturnCode_t delete_contentfilteredtopic(DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
virtual DDS::Subscriber_ptr create_subscriber(const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
void set_deleted(bool state)
Definition: EntityImpl.cpp:83
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
HandleMap ignored_participants_
Collection of ignored participants.
bool dynamic_type_is_valid(DDS::DynamicType_ptr type)
Definition: XTypes/Utils.h:20
Implementation of Recorder functionality.
Definition: RecorderImpl.h:45
int acquire(void)
virtual DDS::ReturnCode_t ignore_participant(DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t set_default_topic_qos(const DDS::TopicQos &qos)
virtual DDS::ReturnCode_t get_discovered_participants(DDS::InstanceHandleSeq &participant_handles)
virtual DDS::ReturnCode_t delete_publisher(DDS::Publisher_ptr p)
int sleep(u_int seconds)
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
Objref_Servant_Pair< TopicImpl, DDS::Topic, DDS::Topic_ptr, DDS::Topic_var > Topic_Pair
const ACE_Time_Value & value() const
Security::SecurityConfig_rch security_config_
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
bool topicIsBIT(const char *name, const char *type)
void return_handle(DDS::InstanceHandle_t handle)
DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind)
DisjointSequence::OrderedRanges< DDS::InstanceHandle_t > reusable_handles_
Keep track of handles that can be reused (use handle_protector_)
ACE_Recursive_Thread_Mutex topics_protector_
Protect the topic collection.
EntityKind entityKind() const
Extract the EntityKind value.
Implementation of Replayer functionality.
Definition: ReplayerImpl.h:61
GUID_t get_repoid(DDS::InstanceHandle_t id) const
ACE_Recursive_Thread_Mutex subscribers_protector_
Protect the subscriber collection.
int release(void)
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
const DDS::StatusMask DEFAULT_STATUS_MASK
virtual DDS::Topic_ptr create_topic(const char *topic_name, const char *type_name, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::ReturnCode_t delete_contained_entities()
Implements the OpenDDS::DCPS::Publisher interfaces.
Definition: PublisherImpl.h:38
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
#define OPENDDS_STRING
RcHandle< DCPS::BitSubscriber > get_builtin_subscriber_proxy()
bool validate_subscriber_qos(DDS::SubscriberQos &subscriber_qos)
DOMAINID_TYPE_NATIVE DomainId_t
CountedHandleMap handles_
Instance handles assigned which are mapped to GUIDs (use handle_protector_)
LM_DEBUG
#define Registered_Data_Types
virtual DDS::ReturnCode_t delete_topic(DDS::Topic_ptr a_topic)
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
DDS::ReturnCode_t wait()
Definition: Discovery.cpp:29
ACE_Recursive_Thread_Mutex replayers_protector_
Protect the replayers collection.
RcHandle< FilterEvaluator > get_filter_eval(const char *filter)
virtual DDS::ReturnCode_t get_current_time(DDS::Time_t &current_time)
ACE_CDR::ULong ULong
static bool validate_datawriter_qos(const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)
virtual DDS::ReturnCode_t get_default_publisher_qos(DDS::PublisherQos &qos)
RcHandle< AutomaticLivelinessTimer > automatic_liveliness_timer_
RepoIdSequence pub_id_gen_
Publisher ID generator.
TypeIdentifierWithDependencies complete
Definition: TypeObject.h:3374
virtual DDS::DomainParticipantListener_ptr get_listener()
ACE_Thread_Mutex handle_protector_
Protect the handle collection.
ACE_CDR::Boolean Boolean
DDS::ReturnCode_t get_dynamic_type(DDS::DynamicType_var &type, const DDS::BuiltinTopicKey_t &key)
virtual DDS::ReturnCode_t set_default_publisher_qos(const DDS::PublisherQos &qos)
DDS::ReturnCode_t cleanup()
ConditionVariable< ACE_Thread_Mutex > shutdown_condition_
#define TheTransientKludge
TopicMap topics_
Collection of topics.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
static ACE_thread_t self(void)
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
virtual DDS::Subscriber_ptr get_builtin_subscriber()
virtual DDS::ReturnCode_t delete_contained_entities()
DDS::ReturnCode_t delete_topic_i(DDS::Topic_ptr a_topic, bool remove_objref)
DDS::ReturnCode_t enable()
virtual DDS::ReturnCode_t get_discovered_participant_data(DDS::ParticipantBuiltinTopicData &participant_data, DDS::InstanceHandle_t participant_handle)
LM_NOTICE
GUID_t dp_id_
This participant id given by discovery.
std::pair< DDS::InstanceHandle_t, unsigned int > HandleWithCounter
ACE_Thread_Mutex shutdown_mutex_
Protect the shutdown.
#define TheSecurityRegistry
bool is_clean(String *leftover_entities=0) const
Replayer_ptr create_replayer(DDS::Topic_ptr a_topic, const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos, const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
DDS::ReturnCode_t enable()
time_t sec(void) const
DDS::Topic_ptr create_typeless_topic(const char *topic_name, const char *type_name, bool type_has_keys, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
DDS::PropertySeq PropertySeq
Definition: RtpsCore.idl:49
DDS::InstanceHandle_t await_handle(const GUID_t &id, TimeDuration max_wait=TimeDuration::zero_value) const
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
#define PUBLISHER_QOS_DEFAULT
InstanceHandleGenerator & participant_handles_
Get instance handles from DomainParticipantFactory (use handle_protector_)
ConditionVariable< ACE_Thread_Mutex > handle_waiters_
_retn_type _retn(void)
virtual DDS::ReturnCode_t get_discovered_topic_data(DDS::TopicBuiltinTopicData &topic_data, DDS::InstanceHandle_t topic_handle)
LM_WARNING
bool is_clean(String *leftover_entities=0) const
void set_complete_type_identifier(const TypeIdentifier &ti)
const char * topicstatus_to_string(TopicStatus value)
Definition: DCPS_Utils.cpp:70
DDS::DomainParticipantQos qos_
The qos of this DomainParticipant.
The End User API.
bool isTopic() const
Returns true if the GUID represents a topic entity.
OPENDDS_STRING uniqueParticipantId() const
virtual DDS::ReturnCode_t set_listener(DDS::DomainParticipantListener_ptr a_listener, DDS::StatusMask mask)
bool validate_publisher_qos(DDS::PublisherQos &publisher_qos)
int strcmp(const char *s, const char *t)
TopicDescriptionMap topic_descrs_
Collection of TopicDescriptions which are not also Topics.
bool notify_all()
Unblock all of the threads waiting on this condition.
virtual DDS::ReturnCode_t enable()
Definition: TopicImpl.cpp:127
const char *const name
Definition: debug.cpp:60
void entityKey(long entityKey)
Definition: GuidBuilder.cpp:96
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, RecorderListener_rch a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, DDS::SubscriberQos subqos)
ACE_TEXT("TCP_Factory")
XTypes::TypeLookupService_rch type_lookup_service_
unsigned long StatusMask
const ReturnCode_t RETCODE_NOT_ENABLED
#define SUBSCRIBER_QOS_DEFAULT
const ReturnCode_t RETCODE_NO_DATA
virtual DDS::ReturnCode_t ignore_topic(DDS::InstanceHandle_t handle)
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS_Dcps_Export LogLevel log_level
static Recorder_ptr _duplicate(Recorder_ptr obj)
Definition: Recorder.cpp:26
static Replayer_ptr _duplicate(Replayer_ptr obj)
Definition: Replayer.cpp:29
void set_minimal_type_identifier(const TypeIdentifier &ti)
DDS::ReturnCode_t cleanup()
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
Implements the DDS::TopicDescription interface.
HandleMap ignored_topics_
Collection of ignored topics.
void set_preset_type_info(const TypeInformation &type_info)
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
RepoIdMap repoIds_
By-handle lookup of instance handles assigned to GUIDs (use handle_protector_)
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
const ReturnCode_t RETCODE_ERROR
virtual void dispatch(const MonotonicTimePoint &tv)=0
virtual DDS::MultiTopic_ptr create_multitopic(const char *name, const char *type_name, const char *subscription_expression, const DDS::StringSeq &expression_parameters)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int find(OpenDDS::DCPS::DomainParticipantImpl::TopicMap &c, const Key &key, OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type *&value)
This struct holds both object reference and the corresponding servant.
Definition: Definitions.h:99
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
const ReturnCode_t RETCODE_OK
virtual DDS::ReturnCode_t get_default_topic_qos(DDS::TopicQos &qos)
const ReturnCode_t RETCODE_UNSUPPORTED
static bool validate_datareader_qos(const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt)
RcHandle< BitSubscriber > bit_subscriber_
The built in topic subscriber.
const ReturnCode_t RETCODE_NOT_ALLOWED_BY_SECURITY
virtual DDS::ReturnCode_t get_qos(DDS::DomainParticipantQos &qos)
Recorder_ptr create_recorder(DDS::Topic_ptr a_topic, const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos, const RecorderListener_rch &a_listener, DDS::StatusMask mask)
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
virtual DDS::ReturnCode_t delete_subscriber(DDS::Subscriber_ptr s)
void add_adjust(OpenDDS::DCPS::DataWriterImpl *writer)
The wait has returned because it was woken up.
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
const character_type * in(void) const
unsigned long StatusKind
OpenDDS_Dcps_Export GUID_t bit_key_to_guid(const DDS::BuiltinTopicKey_t &key)
Definition: GuidUtils.h:251
virtual DDS::ReturnCode_t ignore_subscription(DDS::InstanceHandle_t handle)
DDS::Security::PermissionsHandle perm_handle_
This participant permissions handle given by access control.
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
std::pair< TopicMapIterator, TopicMapIterator > TopicMapIteratorPair
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define TheServiceParticipant
virtual DDS::ReturnCode_t enable()
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
virtual DDS::Publisher_ptr create_publisher(const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
bool is_clean(String *leftover_entities=0) const
virtual DDS::TopicDescription_ptr lookup_topicdescription(const char *name)
virtual DDS::ContentFilteredTopic_ptr create_contentfilteredtopic(const char *name, DDS::Topic_ptr related_topic, const char *filter_expression, const DDS::StringSeq &expression_parameters)
TypeIdentifierWithDependencies minimal
Definition: TypeObject.h:3373
bool set_wait_pending_deadline(const MonotonicTimePoint &deadline)
virtual DDS::DomainParticipant_ptr get_participant()
LivelinessQosPolicyKind
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
Boolean is_nil(T x)
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
DDS::DomainParticipantListener_var listener_
Used to notify the entity for relevant events.
PropertyQosPolicy property
GUID_t get_id() const
Definition: TopicImpl.cpp:172
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
DDS::Topic_ptr create_new_topic(const char *topic_name, const char *type_name, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::TypeSupport_ptr type_support)
#define TOPIC_QOS_DEFAULT
static const TimeDuration max_value
Definition: TimeDuration.h:32