Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 :
8 : #include "DomainParticipantImpl.h"
9 :
10 : #include "FeatureDisabledQosCheck.h"
11 : #include "Service_Participant.h"
12 : #include "Qos_Helper.h"
13 : #include "GuidConverter.h"
14 : #include "PublisherImpl.h"
15 : #include "SubscriberImpl.h"
16 : #include "DataWriterImpl.h"
17 : #include "Marked_Default_Qos.h"
18 : #include "Registered_Data_Types.h"
19 : #include "Transient_Kludge.h"
20 : #include "DomainParticipantFactoryImpl.h"
21 : #include "Util.h"
22 : #include "DCPS_Utils.h"
23 : #include "MonitorFactory.h"
24 : #include "ContentFilteredTopicImpl.h"
25 : #include "MultiTopicImpl.h"
26 : #include "Service_Participant.h"
27 : #include "RecorderImpl.h"
28 : #include "ReplayerImpl.h"
29 : #include "BuiltInTopicUtils.h"
30 : #include "transport/framework/TransportRegistry.h"
31 : #include "transport/framework/TransportExceptions.h"
32 : #ifdef OPENDDS_SECURITY
33 : # include "security/framework/SecurityRegistry.h"
34 : # include "security/framework/SecurityConfig.h"
35 : # include "security/framework/Properties.h"
36 : #endif
37 : #include "XTypes/Utils.h"
38 :
39 : #include <dds/DdsDcpsGuidC.h>
40 : #ifndef DDS_HAS_MINIMUM_BIT
41 : # include <dds/DdsDcpsCoreTypeSupportImpl.h>
42 : #endif
43 :
44 : #include <ace/Reactor.h>
45 : #include <ace/OS_NS_unistd.h>
46 :
47 : namespace Util {
48 :
49 : template <typename Key>
50 0 : int find(
51 : OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
52 : const Key& key,
53 : OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*& value)
54 : {
55 : OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
56 0 : c.find(key);
57 :
58 0 : if (iter == c.end()) {
59 0 : return -1;
60 : }
61 :
62 0 : value = &iter->second;
63 0 : return 0;
64 : }
65 :
66 0 : DDS::PropertySeq filter_properties(const DDS::PropertySeq& properties, const std::string& prefix)
67 : {
68 0 : DDS::PropertySeq result(properties.length());
69 0 : result.length(properties.length());
70 0 : unsigned int count = 0;
71 0 : for (unsigned int i = 0; i < properties.length(); ++i) {
72 0 : if (std::string(properties[i].name.in()).find(prefix) == 0) {
73 0 : result[count++] = properties[i];
74 : }
75 : }
76 0 : result.length(count);
77 0 : return result;
78 0 : }
79 :
80 : } // namespace Util
81 :
82 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
83 :
84 : namespace OpenDDS {
85 : namespace DCPS {
86 :
87 : //TBD - add check for enabled in most methods.
88 : // Currently this is not needed because auto_enable_created_entities
89 : // cannot be false.
90 :
91 : // Implementation skeleton constructor
92 0 : DomainParticipantImpl::DomainParticipantImpl(
93 : InstanceHandleGenerator& handle_generator,
94 : const DDS::DomainId_t& domain_id,
95 : const DDS::DomainParticipantQos& qos,
96 : DDS::DomainParticipantListener_ptr a_listener,
97 0 : const DDS::StatusMask& mask)
98 0 : : default_topic_qos_(TheServiceParticipant->initial_TopicQos())
99 0 : , default_publisher_qos_(TheServiceParticipant->initial_PublisherQos())
100 0 : , default_subscriber_qos_(TheServiceParticipant->initial_SubscriberQos())
101 0 : , qos_(qos)
102 : #ifdef OPENDDS_SECURITY
103 0 : , id_handle_(DDS::HANDLE_NIL)
104 0 : , perm_handle_(DDS::HANDLE_NIL)
105 0 : , part_crypto_handle_(DDS::HANDLE_NIL)
106 : #endif
107 0 : , domain_id_(domain_id)
108 0 : , dp_id_(GUID_UNKNOWN)
109 0 : , federated_(false)
110 0 : , handle_waiters_(handle_protector_)
111 0 : , shutdown_condition_(shutdown_mutex_)
112 0 : , shutdown_complete_(false)
113 0 : , participant_handles_(handle_generator)
114 0 : , pub_id_gen_(dp_id_)
115 0 : , automatic_liveliness_timer_(make_rch<AutomaticLivelinessTimer>(ref(*this)))
116 0 : , automatic_liveliness_task_(make_rch<AutomaticLivelinessTask>(
117 : TheServiceParticipant->time_source(),
118 0 : TheServiceParticipant->interceptor(),
119 0 : automatic_liveliness_timer_,
120 0 : &LivelinessTimer::execute))
121 0 : , participant_liveliness_timer_(make_rch<ParticipantLivelinessTimer>(ref(*this)))
122 0 : , participant_liveliness_task_(make_rch<ParticipantLivelinessTask>(
123 : TheServiceParticipant->time_source(),
124 0 : TheServiceParticipant->interceptor(),
125 0 : participant_liveliness_timer_,
126 0 : &LivelinessTimer::execute))
127 : {
128 0 : (void) this->set_listener(a_listener, mask);
129 0 : monitor_.reset(TheServiceParticipant->monitor_factory_->create_dp_monitor(this));
130 0 : type_lookup_service_ = make_rch<XTypes::TypeLookupService>();
131 0 : }
132 :
133 0 : DomainParticipantImpl::~DomainParticipantImpl()
134 : {
135 : #ifdef OPENDDS_SECURITY
136 0 : if (security_config_ && perm_handle_ != DDS::HANDLE_NIL) {
137 0 : Security::AccessControl_var access = security_config_->get_access_control();
138 0 : DDS::Security::SecurityException se;
139 0 : if (!access->return_permissions_handle(perm_handle_, se)) {
140 0 : if (DCPS::security_debug.auth_warn) {
141 0 : ACE_ERROR((LM_ERROR,
142 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::~DomainParticipantImpl: ")
143 : ACE_TEXT("Unable to return permissions handle. SecurityException[%d.%d]: %C\n"),
144 : se.code, se.minor_code, se.message.in()));
145 : }
146 : }
147 0 : }
148 : #endif
149 :
150 0 : }
151 :
152 : DDS::Publisher_ptr
153 0 : DomainParticipantImpl::create_publisher(
154 : const DDS::PublisherQos & qos,
155 : DDS::PublisherListener_ptr a_listener,
156 : DDS::StatusMask mask)
157 : {
158 0 : DDS::PublisherQos pub_qos = qos;
159 :
160 0 : if (! this->validate_publisher_qos(pub_qos))
161 0 : return DDS::Publisher::_nil();
162 :
163 : // Although Publisher entities have GUIDs assigned (see pub_id_gen_),
164 : // these are not GUIDs from the RTPS spec and
165 : // so the handle doesn't need to correlate to the GUID.
166 0 : const DDS::InstanceHandle_t handle = assign_handle();
167 :
168 0 : PublisherImpl* pub = 0;
169 0 : ACE_NEW_RETURN(pub,
170 : PublisherImpl(handle,
171 : pub_id_gen_.next(),
172 : pub_qos,
173 : a_listener,
174 : mask,
175 : this),
176 : DDS::Publisher::_nil());
177 :
178 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
179 0 : pub->enable();
180 : }
181 :
182 0 : DDS::Publisher_ptr pub_obj(pub);
183 :
184 : // this object will also act as the guard for leaking Publisher Impl
185 0 : Publisher_Pair pair(pub, pub_obj, false);
186 :
187 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
188 : tao_mon,
189 : this->publishers_protector_,
190 : DDS::Publisher::_nil());
191 :
192 0 : if (OpenDDS::DCPS::insert(publishers_, pair) == -1) {
193 0 : if (DCPS_debug_level > 0) {
194 0 : ACE_ERROR((LM_ERROR,
195 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
196 : ACE_TEXT("%p\n"),
197 : ACE_TEXT("insert")));
198 : }
199 0 : return DDS::Publisher::_nil();
200 : }
201 :
202 0 : return DDS::Publisher::_duplicate(pub_obj);
203 0 : }
204 :
205 : DDS::ReturnCode_t
206 0 : DomainParticipantImpl::delete_publisher(
207 : DDS::Publisher_ptr p)
208 : {
209 : // The servant's ref count should be 2 at this point,
210 : // one referenced by poa, one referenced by the publisher
211 : // set.
212 0 : PublisherImpl* the_servant = dynamic_cast<PublisherImpl*>(p);
213 0 : if (!the_servant) {
214 0 : if (log_level >= LogLevel::Notice) {
215 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
216 : "Failed to obtain PublisherImpl\n"));
217 : }
218 0 : return DDS::RETCODE_ERROR;
219 : }
220 :
221 0 : const Publisher_Pair pub_pair(the_servant, p, true);
222 :
223 : {
224 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
225 : publishers_protector_, DDS::RETCODE_ERROR);
226 0 : if (publishers_.count(pub_pair) == 0) {
227 0 : if (log_level >= LogLevel::Notice) {
228 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
229 : "This publisher doesn't belong to this participant\n"));
230 : }
231 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
232 : }
233 0 : }
234 :
235 0 : String leftover_entities;
236 0 : if (!the_servant->is_clean(&leftover_entities)) {
237 0 : if (log_level >= LogLevel::Notice) {
238 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
239 : "The publisher is not empty. %C leftover\n",
240 : leftover_entities.c_str()));
241 : }
242 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
243 : }
244 :
245 0 : const DDS::ReturnCode_t ret = the_servant->delete_contained_entities();
246 0 : if (ret != DDS::RETCODE_OK) {
247 0 : if (log_level >= LogLevel::Notice) {
248 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
249 : "Failed to delete contained entities: %C\n", retcode_to_string(ret)));
250 : }
251 0 : return ret;
252 : }
253 :
254 : {
255 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
256 : publishers_protector_, DDS::RETCODE_ERROR);
257 0 : if (remove(publishers_, pub_pair) == -1) {
258 0 : if (log_level >= LogLevel::Notice) {
259 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
260 : "publisher not found\n"));
261 : }
262 0 : return DDS::RETCODE_ERROR;
263 : }
264 0 : }
265 :
266 0 : return DDS::RETCODE_OK;
267 0 : }
268 :
269 : DDS::Subscriber_ptr
270 0 : DomainParticipantImpl::create_subscriber(
271 : const DDS::SubscriberQos & qos,
272 : DDS::SubscriberListener_ptr a_listener,
273 : DDS::StatusMask mask)
274 : {
275 0 : DDS::SubscriberQos sub_qos = qos;
276 :
277 0 : if (! this->validate_subscriber_qos(sub_qos)) {
278 0 : return DDS::Subscriber::_nil();
279 : }
280 :
281 0 : const DDS::InstanceHandle_t handle = assign_handle();
282 :
283 0 : SubscriberImpl* sub = 0;
284 0 : ACE_NEW_RETURN(sub,
285 : SubscriberImpl(handle,
286 : sub_qos,
287 : a_listener,
288 : mask,
289 : this),
290 : DDS::Subscriber::_nil());
291 :
292 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
293 0 : sub->enable();
294 : }
295 :
296 0 : DDS::Subscriber_ptr sub_obj(sub);
297 :
298 0 : Subscriber_Pair pair(sub, sub_obj, false);
299 :
300 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
301 : tao_mon,
302 : this->subscribers_protector_,
303 : DDS::Subscriber::_nil());
304 :
305 0 : if (OpenDDS::DCPS::insert(subscribers_, pair) == -1) {
306 0 : if (DCPS_debug_level > 0) {
307 0 : ACE_ERROR((LM_ERROR,
308 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
309 : ACE_TEXT("%p\n"),
310 : ACE_TEXT("insert")));
311 : }
312 0 : return DDS::Subscriber::_nil();
313 : }
314 :
315 0 : return DDS::Subscriber::_duplicate(sub_obj);
316 0 : }
317 :
318 : DDS::ReturnCode_t
319 0 : DomainParticipantImpl::delete_subscriber(
320 : DDS::Subscriber_ptr s)
321 : {
322 : // The servant's ref count should be 2 at this point,
323 : // one referenced by poa, one referenced by the subscriber
324 : // set.
325 0 : SubscriberImpl* const the_servant = dynamic_cast<SubscriberImpl*>(s);
326 0 : if (!the_servant) {
327 0 : if (log_level >= LogLevel::Notice) {
328 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
329 : "Failed to obtain SubscriberImpl\n"));
330 : }
331 0 : return DDS::RETCODE_ERROR;
332 : }
333 :
334 0 : const Subscriber_Pair sub_pair(the_servant, s, true);
335 :
336 : {
337 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
338 : subscribers_protector_, DDS::RETCODE_ERROR);
339 0 : if (subscribers_.count(sub_pair) == 0) {
340 0 : if (log_level >= LogLevel::Notice) {
341 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
342 : "This subscriber doesn't belong to this participant\n"));
343 : }
344 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
345 : }
346 0 : }
347 :
348 0 : String leftover_entities;
349 0 : if (!the_servant->is_clean(&leftover_entities)) {
350 0 : if (log_level >= LogLevel::Notice) {
351 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
352 : "The subscriber is not empty. %C leftover\n",
353 : leftover_entities.c_str()));
354 : }
355 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
356 : }
357 :
358 0 : const DDS::ReturnCode_t ret = the_servant->delete_contained_entities();
359 0 : if (ret != DDS::RETCODE_OK) {
360 0 : if (log_level >= LogLevel::Notice) {
361 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
362 : "Failed to delete contained entities: %C\n", retcode_to_string(ret)));
363 : }
364 0 : return ret;
365 : }
366 :
367 : {
368 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
369 : subscribers_protector_, DDS::RETCODE_ERROR);
370 0 : if (remove(subscribers_, sub_pair) == -1) {
371 0 : if (log_level >= LogLevel::Notice) {
372 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
373 : "subscriber not found\n"));
374 : }
375 0 : return DDS::RETCODE_ERROR;
376 : }
377 0 : }
378 :
379 0 : return DDS::RETCODE_OK;
380 0 : }
381 :
382 : DDS::Subscriber_ptr
383 0 : DomainParticipantImpl::get_builtin_subscriber()
384 : {
385 0 : return bit_subscriber_->get();
386 : }
387 :
388 : RcHandle<BitSubscriber>
389 0 : DomainParticipantImpl::get_builtin_subscriber_proxy()
390 : {
391 0 : return bit_subscriber_;
392 : }
393 :
394 : DDS::Topic_ptr
395 0 : DomainParticipantImpl::create_topic(
396 : const char * topic_name,
397 : const char * type_name,
398 : const DDS::TopicQos & qos,
399 : DDS::TopicListener_ptr a_listener,
400 : DDS::StatusMask mask)
401 : {
402 0 : return create_topic_i(topic_name,
403 : type_name,
404 : qos,
405 : a_listener,
406 : mask,
407 0 : 0);
408 : }
409 :
410 : DDS::Topic_ptr
411 0 : DomainParticipantImpl::create_typeless_topic(
412 : const char * topic_name,
413 : const char * type_name,
414 : bool type_has_keys,
415 : const DDS::TopicQos & qos,
416 : DDS::TopicListener_ptr a_listener,
417 : DDS::StatusMask mask)
418 : {
419 0 : int topic_mask = (type_has_keys ? TOPIC_TYPE_HAS_KEYS : 0 ) | TOPIC_TYPELESS;
420 :
421 0 : return create_topic_i(topic_name,
422 : type_name,
423 : qos,
424 : a_listener,
425 : mask,
426 0 : topic_mask);
427 : }
428 :
429 :
430 : DDS::Topic_ptr
431 0 : DomainParticipantImpl::create_topic_i(
432 : const char * topic_name,
433 : const char * type_name,
434 : const DDS::TopicQos & qos,
435 : DDS::TopicListener_ptr a_listener,
436 : DDS::StatusMask mask,
437 : int topic_mask)
438 : {
439 0 : DDS::TopicQos topic_qos;
440 :
441 0 : if (qos == TOPIC_QOS_DEFAULT) {
442 0 : this->get_default_topic_qos(topic_qos);
443 :
444 : } else {
445 0 : topic_qos = qos;
446 : }
447 :
448 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
449 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
450 : OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
451 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
452 :
453 0 : if (!Qos_Helper::valid(topic_qos)) {
454 0 : if (DCPS_debug_level > 0) {
455 0 : ACE_ERROR((LM_ERROR,
456 : ACE_TEXT("(%P|%t) ERROR: ")
457 : ACE_TEXT("DomainParticipantImpl::create_topic, ")
458 : ACE_TEXT("invalid qos.\n")));
459 : }
460 0 : return DDS::Topic::_nil();
461 : }
462 :
463 0 : if (!Qos_Helper::consistent(topic_qos)) {
464 0 : if (DCPS_debug_level > 0) {
465 0 : ACE_ERROR((LM_ERROR,
466 : ACE_TEXT("(%P|%t) ERROR: ")
467 : ACE_TEXT("DomainParticipantImpl::create_topic, ")
468 : ACE_TEXT("inconsistent qos.\n")));
469 : }
470 0 : return DDS::Topic::_nil();
471 : }
472 :
473 : // See if there is a Topic with the same name.
474 0 : TopicMap::mapped_type* entry = 0;
475 0 : bool found = false;
476 : {
477 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
478 : tao_mon,
479 : this->topics_protector_,
480 : DDS::Topic::_nil());
481 :
482 : #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
483 0 : if (topic_descrs_.count(topic_name)) {
484 0 : if (DCPS_debug_level > 3) {
485 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
486 : ACE_TEXT("DomainParticipantImpl::create_topic, ")
487 : ACE_TEXT("can't create a Topic due to name \"%C\" already in use ")
488 : ACE_TEXT("by a TopicDescription.\n"), topic_name));
489 : }
490 0 : return 0;
491 : }
492 : #endif
493 :
494 0 : if (Util::find(topics_, topic_name, entry) == 0) {
495 0 : found = true;
496 : }
497 0 : }
498 :
499 : /*
500 : * If there is a topic with the same name, return the topic if it has the
501 : * same type name and QoS, else it is an error.
502 : */
503 0 : if (found) {
504 0 : CORBA::String_var found_type = entry->pair_.svt_->get_type_name();
505 0 : if (ACE_OS::strcmp(type_name, found_type) == 0) {
506 0 : DDS::TopicQos found_qos;
507 0 : entry->pair_.svt_->get_qos(found_qos);
508 :
509 0 : if (topic_qos == found_qos) { // match type name, qos
510 : {
511 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
512 : tao_mon,
513 : this->topics_protector_,
514 : DDS::Topic::_nil());
515 0 : ++entry->client_refs_;
516 0 : }
517 0 : return DDS::Topic::_duplicate(entry->pair_.obj_.in());
518 :
519 : } else { // Same Name and Type, Different QoS
520 0 : if (DCPS_debug_level >= 1) {
521 0 : ACE_ERROR((LM_ERROR,
522 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
523 : ACE_TEXT("topic with name \"%C\" and type %C already exists, ")
524 : ACE_TEXT("but the QoS doesn't match.\n"),
525 : topic_name, type_name));
526 : }
527 :
528 0 : return DDS::Topic::_nil();
529 : }
530 :
531 0 : } else { // Same Name, Different Type
532 0 : if (DCPS_debug_level >= 1) {
533 0 : ACE_ERROR((LM_ERROR,
534 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
535 : ACE_TEXT("topic with name \"%C\" already exists, but its type, %C ")
536 : ACE_TEXT("is not the same as %C.\n"),
537 : topic_name, found_type.in(), type_name));
538 : }
539 :
540 0 : return DDS::Topic::_nil();
541 : }
542 :
543 0 : } else {
544 :
545 0 : OpenDDS::DCPS::TypeSupport_var type_support;
546 :
547 0 : if (0 == topic_mask) {
548 : // creating a topic with compile time type
549 0 : type_support = Registered_Data_Types->lookup(this, type_name);
550 0 : if (CORBA::is_nil(type_support)) {
551 0 : if (DCPS_debug_level >= 1) {
552 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
553 : ACE_TEXT("DomainParticipantImpl::create_topic, ")
554 : ACE_TEXT("can't create a topic=%C type_name=%C ")
555 : ACE_TEXT("is not registered.\n"),
556 : topic_name, type_name));
557 : }
558 0 : return DDS::Topic::_nil();
559 : }
560 : }
561 :
562 0 : DDS::Topic_var new_topic = create_new_topic(topic_name,
563 : type_name,
564 : topic_qos,
565 : a_listener,
566 : mask,
567 0 : type_support);
568 :
569 0 : if (!new_topic) {
570 0 : if (DCPS_debug_level > 0) {
571 0 : ACE_ERROR((LM_WARNING,
572 : ACE_TEXT("(%P|%t) WARNING: ")
573 : ACE_TEXT("DomainParticipantImpl::create_topic, ")
574 : ACE_TEXT("create_new_topic failed.\n")));
575 : }
576 0 : return DDS::Topic::_nil();
577 : }
578 :
579 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
580 0 : if (new_topic->enable() != DDS::RETCODE_OK) {
581 0 : if (DCPS_debug_level > 0) {
582 0 : ACE_ERROR((LM_WARNING,
583 : ACE_TEXT("(%P|%t) WARNING: ")
584 : ACE_TEXT("DomainParticipantImpl::create_topic, ")
585 : ACE_TEXT("enable failed.\n")));
586 : }
587 0 : return DDS::Topic::_nil();
588 : }
589 : }
590 0 : return new_topic._retn();
591 0 : }
592 0 : }
593 :
594 : DDS::ReturnCode_t
595 0 : DomainParticipantImpl::delete_topic(
596 : DDS::Topic_ptr a_topic)
597 : {
598 0 : return delete_topic_i(a_topic, false);
599 : }
600 :
601 0 : DDS::ReturnCode_t DomainParticipantImpl::delete_topic_i(
602 : DDS::Topic_ptr a_topic,
603 : bool remove_objref)
604 : {
605 0 : DDS::ReturnCode_t ret = DDS::RETCODE_OK;
606 :
607 : try {
608 : // The servant's ref count should be greater than 2 at this point,
609 : // one referenced by poa, one referenced by the topic map and
610 : // others referenced by the datareader/datawriter.
611 0 : TopicImpl* the_topic_servant = dynamic_cast<TopicImpl*>(a_topic);
612 :
613 0 : if (!the_topic_servant) {
614 0 : if (log_level >= LogLevel::Notice) {
615 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: %p\n"
616 : "failed to obtain TopicImpl."));
617 : }
618 0 : return DDS::RETCODE_ERROR;
619 : }
620 :
621 0 : DDS::DomainParticipant_var dp = the_topic_servant->get_participant();
622 :
623 : DomainParticipantImpl* the_dp_servant =
624 0 : dynamic_cast<DomainParticipantImpl*>(dp.in());
625 :
626 0 : if (the_dp_servant != this) {
627 0 : if (log_level >= LogLevel::Notice) {
628 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: "
629 : "will return PRECONDITION_NOT_MET because this is not the "
630 : "participant that owns this topic\n"));
631 : }
632 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
633 : }
634 0 : if (!remove_objref && the_topic_servant->has_entity_refs()) {
635 : // If entity_refs is true (nonzero), then some reader or writer is using
636 : // this topic and the spec requires delete_topic() to fail with the error:
637 0 : if (log_level >= LogLevel::Notice) {
638 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: "
639 : "will return PRECONDITION_NOT_MET because there are still "
640 : "outstanding references to this topic\n"));
641 : }
642 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
643 : }
644 :
645 : {
646 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
647 : tao_mon,
648 : this->topics_protector_,
649 : DDS::RETCODE_ERROR);
650 :
651 0 : CORBA::String_var topic_name = the_topic_servant->get_name();
652 0 : TopicMap::mapped_type* entry = 0;
653 :
654 0 : TopicMapIteratorPair iters = topics_.equal_range(topic_name.in());
655 0 : TopicMapIterator iter;
656 0 : for (iter = iters.first; iter != iters.second; ++iter) {
657 0 : if (iter->second.pair_.svt_ == the_topic_servant) {
658 0 : entry = &iter->second;
659 0 : break;
660 : }
661 : }
662 0 : if (entry == 0) {
663 0 : if (log_level >= LogLevel::Notice) {
664 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: not found\n"));
665 : }
666 0 : return DDS::RETCODE_ERROR;
667 : }
668 :
669 0 : const CORBA::ULong client_refs = --entry->client_refs_;
670 :
671 0 : if (remove_objref || 0 == client_refs) {
672 0 : const GUID_t topicId = the_topic_servant->get_id();
673 0 : topics_.erase(iter);
674 :
675 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
676 0 : TopicStatus status = disco->remove_topic(
677 0 : the_dp_servant->get_domain_id(), the_dp_servant->get_id(), topicId);
678 :
679 0 : if (status != REMOVED) {
680 0 : if (log_level >= LogLevel::Notice) {
681 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i, "
682 : "remove_topic failed with return value <%C>\n", topicstatus_to_string(status)));
683 : }
684 0 : return DDS::RETCODE_ERROR;
685 : }
686 :
687 0 : return DDS::RETCODE_OK;
688 :
689 0 : } else {
690 0 : if (DCPS_debug_level > 4) {
691 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::delete_topic_i: "
692 : "Didn't remove topic from the map, remove_objref %d client_refs %d\n",
693 : remove_objref, client_refs));
694 : }
695 : }
696 0 : }
697 :
698 0 : } catch (...) {
699 0 : if (log_level >= LogLevel::Notice) {
700 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i, "
701 : " Caught Unknown Exception\n"));
702 : }
703 0 : ret = DDS::RETCODE_ERROR;
704 0 : }
705 :
706 0 : return ret;
707 : }
708 :
709 : DDS::Topic_ptr
710 0 : DomainParticipantImpl::find_topic(
711 : const char* topic_name,
712 : const DDS::Duration_t& timeout)
713 : {
714 0 : const MonotonicTimePoint timeout_at(MonotonicTimePoint::now() + TimeDuration(timeout));
715 :
716 0 : bool first_time = true;
717 0 : while (first_time || MonotonicTimePoint::now() < timeout_at) {
718 0 : if (first_time) {
719 0 : first_time = false;
720 : }
721 :
722 : GUID_t topic_id;
723 0 : CORBA::String_var type_name;
724 0 : DDS::TopicQos_var qos;
725 :
726 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
727 0 : TopicStatus status = disco->find_topic(domain_id_,
728 0 : get_id(),
729 : topic_name,
730 0 : type_name.out(),
731 : qos.out(),
732 : topic_id);
733 :
734 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
735 0 : if (status == FOUND) {
736 : OpenDDS::DCPS::TypeSupport_var type_support =
737 0 : Registered_Data_Types->lookup(this, type_name.in());
738 0 : if (CORBA::is_nil(type_support)) {
739 0 : if (DCPS_debug_level) {
740 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
741 : ACE_TEXT("DomainParticipantImpl::find_topic, ")
742 : ACE_TEXT("can't create a Topic: type_name \"%C\" ")
743 : ACE_TEXT("is not registered.\n"), type_name.in()));
744 : }
745 :
746 0 : return DDS::Topic::_nil();
747 : }
748 :
749 0 : DDS::Topic_ptr new_topic = create_new_topic(topic_name,
750 : type_name,
751 : qos,
752 : DDS::TopicListener::_nil(),
753 : OpenDDS::DCPS::DEFAULT_STATUS_MASK,
754 : type_support);
755 0 : return new_topic;
756 :
757 0 : } else if (status == INTERNAL_ERROR) {
758 0 : if (DCPS_debug_level > 0) {
759 0 : ACE_ERROR((LM_ERROR,
760 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
761 : ACE_TEXT("topic not found, discovery returned INTERNAL_ERROR!\n")));
762 : }
763 0 : return DDS::Topic::_nil();
764 0 : } else if (now < timeout_at) {
765 0 : const TimeDuration remaining = timeout_at - now;
766 :
767 0 : if (remaining.value().sec() >= 1) {
768 0 : ACE_OS::sleep(1);
769 :
770 : } else {
771 0 : ACE_OS::sleep(remaining.value());
772 : }
773 0 : }
774 0 : }
775 :
776 0 : if (DCPS_debug_level >= 1) {
777 : // timed out
778 0 : ACE_DEBUG((LM_DEBUG,
779 : ACE_TEXT("(%P|%t) DomainParticipantImpl::find_topic, ")
780 : ACE_TEXT("timed out.\n")));
781 : }
782 :
783 0 : return DDS::Topic::_nil();
784 0 : }
785 :
786 : DDS::TopicDescription_ptr
787 0 : DomainParticipantImpl::lookup_topicdescription(const char* name)
788 : {
789 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
790 : tao_mon,
791 : this->topics_protector_,
792 : DDS::Topic::_nil());
793 :
794 0 : TopicMap::mapped_type* entry = 0;
795 :
796 0 : if (Util::find(topics_, name, entry) == -1) {
797 : #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
798 0 : TopicDescriptionMap::iterator iter = topic_descrs_.find(name);
799 0 : if (iter != topic_descrs_.end()) {
800 0 : return DDS::TopicDescription::_duplicate(iter->second);
801 : }
802 : #endif
803 0 : return DDS::TopicDescription::_nil();
804 :
805 : } else {
806 0 : return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
807 : }
808 0 : }
809 :
810 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
811 :
812 : DDS::ContentFilteredTopic_ptr
813 0 : DomainParticipantImpl::create_contentfilteredtopic(
814 : const char* name,
815 : DDS::Topic_ptr related_topic,
816 : const char* filter_expression,
817 : const DDS::StringSeq& expression_parameters)
818 : {
819 0 : if (CORBA::is_nil(related_topic)) {
820 0 : if (DCPS_debug_level > 3) {
821 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
822 : ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
823 : ACE_TEXT("can't create a content-filtered topic due to null related ")
824 : ACE_TEXT("topic.\n")));
825 : }
826 0 : return 0;
827 : }
828 :
829 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
830 :
831 0 : if (topics_.count(name)) {
832 0 : if (DCPS_debug_level > 3) {
833 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
834 : ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
835 : ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
836 : ACE_TEXT("already in use by a Topic.\n"), name));
837 : }
838 0 : return 0;
839 : }
840 :
841 0 : if (topic_descrs_.count(name)) {
842 0 : if (DCPS_debug_level > 3) {
843 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
844 : ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
845 : ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
846 : ACE_TEXT("already in use by a TopicDescription.\n"), name));
847 : }
848 0 : return 0;
849 : }
850 :
851 0 : DDS::ContentFilteredTopic_var cft;
852 : try {
853 : // Create the cft in two steps so that we only have one place to
854 : // check the expression parameters
855 0 : cft = new ContentFilteredTopicImpl(name, related_topic, filter_expression, this);
856 0 : if (cft->set_expression_parameters(expression_parameters) != DDS::RETCODE_OK) {
857 0 : return 0;
858 : }
859 0 : } catch (const std::exception& e) {
860 0 : if (DCPS_debug_level) {
861 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
862 : ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
863 : ACE_TEXT("can't create a content-filtered topic due to runtime error: ")
864 : ACE_TEXT("%C.\n"), e.what()));
865 : }
866 0 : return 0;
867 0 : }
868 0 : DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
869 0 : topic_descrs_[name] = td;
870 0 : return cft._retn();
871 0 : }
872 :
873 0 : DDS::ReturnCode_t DomainParticipantImpl::delete_contentfilteredtopic(
874 : DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
875 : {
876 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
877 : DDS::RETCODE_OUT_OF_RESOURCES);
878 : DDS::ContentFilteredTopic_var cft =
879 0 : DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
880 0 : CORBA::String_var name = cft->get_name();
881 0 : TopicDescriptionMap::iterator iter = topic_descrs_.find(name.in());
882 0 : if (iter == topic_descrs_.end()) {
883 0 : if (DCPS_debug_level > 3) {
884 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
885 : ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
886 : ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
887 : ACE_TEXT("because it is not in the set.\n"), name.in ()));
888 : }
889 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
890 : }
891 :
892 0 : TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
893 :
894 0 : if (!tdi) {
895 0 : if (DCPS_debug_level > 3) {
896 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
897 : ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
898 : ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
899 : ACE_TEXT("failed to obtain TopicDescriptionImpl\n"), name.in()));
900 : }
901 0 : return DDS::RETCODE_ERROR;
902 : }
903 :
904 0 : if (tdi->has_entity_refs()) {
905 0 : if (DCPS_debug_level > 3) {
906 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
907 : ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
908 : ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
909 : ACE_TEXT("because it is used by a datareader\n"), name.in ()));
910 : }
911 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
912 : }
913 0 : topic_descrs_.erase(iter);
914 0 : return DDS::RETCODE_OK;
915 0 : }
916 :
917 : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
918 :
919 : #ifndef OPENDDS_NO_MULTI_TOPIC
920 :
921 0 : DDS::MultiTopic_ptr DomainParticipantImpl::create_multitopic(
922 : const char* name, const char* type_name,
923 : const char* subscription_expression,
924 : const DDS::StringSeq& expression_parameters)
925 : {
926 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
927 :
928 0 : if (topics_.count(name)) {
929 0 : if (DCPS_debug_level > 3) {
930 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
931 : ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
932 : ACE_TEXT("can't create a multi topic due to name \"%C\" ")
933 : ACE_TEXT("already in use by a Topic.\n"), name));
934 : }
935 0 : return 0;
936 : }
937 :
938 0 : if (topic_descrs_.count(name)) {
939 0 : if (DCPS_debug_level > 3) {
940 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
941 : ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
942 : ACE_TEXT("can't create a multi topic due to name \"%C\" ")
943 : ACE_TEXT("already in use by a TopicDescription.\n"), name));
944 : }
945 0 : return 0;
946 : }
947 :
948 0 : DDS::MultiTopic_var mt;
949 : try {
950 0 : mt = new MultiTopicImpl(name, type_name, subscription_expression,
951 0 : expression_parameters, this);
952 0 : } catch (const std::exception& e) {
953 0 : if (DCPS_debug_level) {
954 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
955 : ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
956 : ACE_TEXT("can't create a multi topic due to runtime error: ")
957 : ACE_TEXT("%C.\n"), e.what()));
958 : }
959 0 : return 0;
960 0 : }
961 0 : DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
962 0 : topic_descrs_[name] = td;
963 0 : return mt._retn();
964 0 : }
965 :
966 0 : DDS::ReturnCode_t DomainParticipantImpl::delete_multitopic(
967 : DDS::MultiTopic_ptr a_multitopic)
968 : {
969 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
970 : DDS::RETCODE_OUT_OF_RESOURCES);
971 0 : DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
972 0 : CORBA::String_var mt_name = mt->get_name();
973 0 : TopicDescriptionMap::iterator iter = topic_descrs_.find(mt_name.in());
974 0 : if (iter == topic_descrs_.end()) {
975 0 : if (DCPS_debug_level > 3) {
976 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
977 : ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
978 : ACE_TEXT("can't delete a multitopic \"%C\" ")
979 : ACE_TEXT("because it is not in the set.\n"), mt_name.in ()));
980 : }
981 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
982 : }
983 :
984 0 : TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
985 :
986 0 : if (!tdi) {
987 0 : if (DCPS_debug_level > 3) {
988 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
989 : ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
990 : ACE_TEXT("can't delete a multitopic topic \"%C\" ")
991 : ACE_TEXT("failed to obtain TopicDescriptionImpl.\n"),
992 : mt_name.in()));
993 : }
994 0 : return DDS::RETCODE_ERROR;
995 : }
996 :
997 0 : if (tdi->has_entity_refs()) {
998 0 : if (DCPS_debug_level > 3) {
999 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
1000 : ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
1001 : ACE_TEXT("can't delete a multitopic topic \"%C\" ")
1002 : ACE_TEXT("because it is used by a datareader.\n"), mt_name.in ()));
1003 : }
1004 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1005 : }
1006 0 : topic_descrs_.erase(iter);
1007 0 : return DDS::RETCODE_OK;
1008 0 : }
1009 :
1010 : #endif // OPENDDS_NO_MULTI_TOPIC
1011 :
1012 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
1013 :
1014 : RcHandle<FilterEvaluator>
1015 0 : DomainParticipantImpl::get_filter_eval(const char* filter)
1016 : {
1017 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, filter_cache_lock_,
1018 : RcHandle<FilterEvaluator>());
1019 :
1020 0 : RcHandle<FilterEvaluator>& result = filter_cache_[filter];
1021 0 : if (!result) {
1022 : try {
1023 0 : result = make_rch<FilterEvaluator>(filter, false);
1024 0 : } catch (const std::exception& e) {
1025 0 : filter_cache_.erase(filter);
1026 0 : if (DCPS_debug_level) {
1027 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
1028 : ACE_TEXT("DomainParticipantImpl::get_filter_eval, ")
1029 : ACE_TEXT("can't create a writer-side content filter due to ")
1030 : ACE_TEXT("runtime error: %C.\n"), e.what()));
1031 : }
1032 0 : }
1033 : }
1034 0 : return result;
1035 0 : }
1036 :
1037 : void
1038 0 : DomainParticipantImpl::deref_filter_eval(const char* filter)
1039 : {
1040 0 : ACE_GUARD(ACE_Thread_Mutex, guard, filter_cache_lock_);
1041 : typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
1042 0 : Map::iterator iter = filter_cache_.find(filter);
1043 0 : if (iter != filter_cache_.end()) {
1044 0 : if (iter->second->ref_count() == 1) {
1045 0 : filter_cache_.erase(iter);
1046 : }
1047 : }
1048 0 : }
1049 :
1050 : #endif
1051 :
1052 : DDS::ReturnCode_t
1053 0 : DomainParticipantImpl::delete_contained_entities()
1054 : {
1055 0 : if (!get_deleted()) {
1056 : // mark that the entity is being deleted
1057 0 : set_deleted(true);
1058 :
1059 0 : if (!prepare_to_delete_datawriters()) {
1060 0 : return DDS::RETCODE_ERROR;
1061 : }
1062 0 : if (!set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline())) {
1063 0 : return DDS::RETCODE_ERROR;
1064 : }
1065 : }
1066 :
1067 : // BIT subscriber and data readers will be deleted with the
1068 : // rest of the entities, so need to report to discovery that
1069 : // BIT is no longer available
1070 0 : Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
1071 0 : if (disc)
1072 0 : disc->fini_bit(this);
1073 :
1074 0 : if (ACE_OS::thr_equal(TheServiceParticipant->reactor_owner(),
1075 : ACE_Thread::self())) {
1076 0 : handle_exception(0);
1077 :
1078 : } else {
1079 0 : TheServiceParticipant->reactor()->notify(this);
1080 :
1081 0 : shutdown_mutex_.acquire();
1082 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1083 0 : while (!shutdown_complete_) {
1084 0 : shutdown_condition_.wait(thread_status_manager);
1085 : }
1086 0 : shutdown_complete_ = false;
1087 0 : shutdown_mutex_.release();
1088 : }
1089 :
1090 0 : bit_subscriber_.reset();
1091 :
1092 0 : Registered_Data_Types->unregister_participant(this);
1093 :
1094 : // the participant can now start creating new contained entities
1095 0 : set_deleted(false);
1096 0 : return shutdown_result_;
1097 0 : }
1098 :
1099 : CORBA::Boolean
1100 0 : DomainParticipantImpl::contains_entity(DDS::InstanceHandle_t a_handle)
1101 : {
1102 : /// Check top-level containers for Topic, Subscriber,
1103 : /// and Publisher instances.
1104 : {
1105 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1106 : guard,
1107 : this->topics_protector_,
1108 : false);
1109 :
1110 0 : for (TopicMap::iterator it(topics_.begin());
1111 0 : it != topics_.end(); ++it) {
1112 0 : if (a_handle == it->second.pair_.svt_->get_instance_handle())
1113 0 : return true;
1114 : }
1115 0 : }
1116 :
1117 : {
1118 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1119 : guard,
1120 : this->subscribers_protector_,
1121 : false);
1122 :
1123 0 : for (SubscriberSet::iterator it(subscribers_.begin());
1124 0 : it != subscribers_.end(); ++it) {
1125 0 : if (a_handle == it->svt_->get_instance_handle())
1126 0 : return true;
1127 : }
1128 0 : }
1129 :
1130 : {
1131 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1132 : guard,
1133 : this->publishers_protector_,
1134 : false);
1135 :
1136 0 : for (PublisherSet::iterator it(publishers_.begin());
1137 0 : it != publishers_.end(); ++it) {
1138 0 : if (a_handle == it->svt_->get_instance_handle())
1139 0 : return true;
1140 : }
1141 0 : }
1142 :
1143 : /// Recurse into SubscriberImpl and PublisherImpl for
1144 : /// DataReader and DataWriter instances respectively.
1145 0 : for (SubscriberSet::iterator it(subscribers_.begin());
1146 0 : it != subscribers_.end(); ++it) {
1147 0 : if (it->svt_->contains_reader(a_handle))
1148 0 : return true;
1149 : }
1150 :
1151 0 : for (PublisherSet::iterator it(publishers_.begin());
1152 0 : it != publishers_.end(); ++it) {
1153 0 : if (it->svt_->contains_writer(a_handle))
1154 0 : return true;
1155 : }
1156 :
1157 0 : return false;
1158 : }
1159 :
1160 : DDS::ReturnCode_t
1161 0 : DomainParticipantImpl::set_qos(
1162 : const DDS::DomainParticipantQos & qos)
1163 : {
1164 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1165 0 : if (qos_ == qos)
1166 0 : return DDS::RETCODE_OK;
1167 :
1168 : // for the not changeable qos, it can be changed before enable
1169 0 : if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
1170 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
1171 :
1172 : } else {
1173 0 : qos_ = qos;
1174 :
1175 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1176 : const bool status =
1177 0 : disco->update_domain_participant_qos(domain_id_,
1178 0 : dp_id_,
1179 0 : qos_);
1180 :
1181 0 : if (!status) {
1182 0 : if (DCPS_debug_level > 0) {
1183 0 : ACE_ERROR((LM_ERROR,
1184 : ACE_TEXT("(%P|%t) DomainParticipantImpl::set_qos, ")
1185 : ACE_TEXT("failed on compatibility check.\n")));
1186 : }
1187 0 : return DDS::RETCODE_ERROR;
1188 : }
1189 0 : }
1190 :
1191 0 : return DDS::RETCODE_OK;
1192 :
1193 : } else {
1194 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
1195 : }
1196 : }
1197 :
1198 : DDS::ReturnCode_t
1199 0 : DomainParticipantImpl::get_qos(
1200 : DDS::DomainParticipantQos & qos)
1201 : {
1202 0 : qos = qos_;
1203 0 : return DDS::RETCODE_OK;
1204 : }
1205 :
1206 : DDS::ReturnCode_t
1207 0 : DomainParticipantImpl::set_listener(
1208 : DDS::DomainParticipantListener_ptr a_listener,
1209 : DDS::StatusMask mask)
1210 : {
1211 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
1212 0 : listener_mask_ = mask;
1213 : //note: OK to duplicate a nil object ref
1214 0 : listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
1215 0 : return DDS::RETCODE_OK;
1216 0 : }
1217 :
1218 : DDS::DomainParticipantListener_ptr
1219 0 : DomainParticipantImpl::get_listener()
1220 : {
1221 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
1222 0 : return DDS::DomainParticipantListener::_duplicate(listener_.in());
1223 0 : }
1224 :
1225 : DDS::ReturnCode_t
1226 0 : DomainParticipantImpl::ignore_participant(
1227 : DDS::InstanceHandle_t handle)
1228 : {
1229 : #ifndef DDS_HAS_MINIMUM_BIT
1230 0 : if (!enabled_) {
1231 0 : if (DCPS_debug_level > 0) {
1232 0 : ACE_ERROR((LM_ERROR,
1233 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
1234 : ACE_TEXT("Entity is not enabled.\n")));
1235 : }
1236 0 : return DDS::RETCODE_NOT_ENABLED;
1237 : }
1238 :
1239 0 : GUID_t ignoreId = get_repoid(handle);
1240 0 : HandleMap::const_iterator location = this->ignored_participants_.find(ignoreId);
1241 :
1242 0 : if (location == this->ignored_participants_.end()) {
1243 0 : this->ignored_participants_[ ignoreId] = handle;
1244 : }
1245 : else {// ignore same participant again, just return ok.
1246 0 : return DDS::RETCODE_OK;
1247 : }
1248 :
1249 0 : if (DCPS_debug_level >= 4) {
1250 0 : ACE_DEBUG((LM_DEBUG,
1251 : ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
1252 : ACE_TEXT("%C ignoring handle %x.\n"),
1253 : LogGuid(dp_id_).c_str(),
1254 : handle));
1255 : }
1256 :
1257 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1258 0 : if (!disco->ignore_domain_participant(domain_id_,
1259 0 : dp_id_,
1260 : ignoreId)) {
1261 0 : if (DCPS_debug_level > 0) {
1262 0 : ACE_ERROR((LM_ERROR,
1263 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
1264 : ACE_TEXT("Could not ignore domain participant.\n")));
1265 : }
1266 0 : return DDS::RETCODE_ERROR;
1267 : }
1268 :
1269 :
1270 0 : if (DCPS_debug_level >= 4) {
1271 0 : ACE_DEBUG((LM_DEBUG,
1272 : ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
1273 : ACE_TEXT("%C repo call returned.\n"),
1274 : LogGuid(dp_id_).c_str()));
1275 : }
1276 :
1277 0 : return DDS::RETCODE_OK;
1278 : #else
1279 : ACE_UNUSED_ARG(handle);
1280 : return DDS::RETCODE_UNSUPPORTED;
1281 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
1282 0 : }
1283 :
1284 : DDS::ReturnCode_t
1285 0 : DomainParticipantImpl::ignore_topic(
1286 : DDS::InstanceHandle_t handle)
1287 : {
1288 : #ifndef DDS_HAS_MINIMUM_BIT
1289 0 : if (!enabled_) {
1290 0 : if (DCPS_debug_level > 0) {
1291 0 : ACE_ERROR((LM_ERROR,
1292 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
1293 : ACE_TEXT(" Entity is not enabled.\n")));
1294 : }
1295 0 : return DDS::RETCODE_NOT_ENABLED;
1296 : }
1297 :
1298 0 : GUID_t ignoreId = get_repoid(handle);
1299 0 : HandleMap::const_iterator location = this->ignored_topics_.find(ignoreId);
1300 :
1301 0 : if (location == this->ignored_topics_.end()) {
1302 0 : this->ignored_topics_[ ignoreId] = handle;
1303 : }
1304 : else { // ignore same topic again, just return ok.
1305 0 : return DDS::RETCODE_OK;
1306 : }
1307 :
1308 0 : if (DCPS_debug_level >= 4) {
1309 0 : ACE_DEBUG((LM_DEBUG,
1310 : ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_topic: ")
1311 : ACE_TEXT("%C ignoring handle %x.\n"),
1312 : LogGuid(dp_id_).c_str(),
1313 : handle));
1314 : }
1315 :
1316 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1317 0 : if (!disco->ignore_topic(domain_id_,
1318 0 : dp_id_,
1319 : ignoreId)) {
1320 0 : if (DCPS_debug_level > 0) {
1321 0 : ACE_ERROR((LM_ERROR,
1322 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
1323 : ACE_TEXT(" Could not ignore topic.\n")));
1324 : }
1325 : }
1326 :
1327 0 : return DDS::RETCODE_OK;
1328 : #else
1329 : ACE_UNUSED_ARG(handle);
1330 : return DDS::RETCODE_UNSUPPORTED;
1331 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
1332 0 : }
1333 :
1334 : DDS::ReturnCode_t
1335 0 : DomainParticipantImpl::ignore_publication(
1336 : DDS::InstanceHandle_t handle)
1337 : {
1338 : #ifndef DDS_HAS_MINIMUM_BIT
1339 0 : if (!enabled_) {
1340 0 : if (DCPS_debug_level > 0) {
1341 0 : ACE_ERROR((LM_ERROR,
1342 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
1343 : ACE_TEXT(" Entity is not enabled.\n")));
1344 : }
1345 0 : return DDS::RETCODE_NOT_ENABLED;
1346 : }
1347 :
1348 0 : if (DCPS_debug_level >= 4) {
1349 0 : ACE_DEBUG((LM_DEBUG,
1350 : ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_publication: ")
1351 : ACE_TEXT("%C ignoring handle %x.\n"),
1352 : LogGuid(dp_id_).c_str(),
1353 : handle));
1354 : }
1355 :
1356 0 : GUID_t ignoreId = get_repoid(handle);
1357 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1358 0 : if (!disco->ignore_publication(domain_id_,
1359 0 : dp_id_,
1360 : ignoreId)) {
1361 0 : if (DCPS_debug_level > 0) {
1362 0 : ACE_ERROR((LM_ERROR,
1363 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
1364 : ACE_TEXT(" could not ignore publication in discovery.\n")));
1365 : }
1366 0 : return DDS::RETCODE_ERROR;
1367 : }
1368 :
1369 0 : return DDS::RETCODE_OK;
1370 : #else
1371 : ACE_UNUSED_ARG(handle);
1372 : return DDS::RETCODE_UNSUPPORTED;
1373 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
1374 0 : }
1375 :
1376 : DDS::ReturnCode_t
1377 0 : DomainParticipantImpl::ignore_subscription(
1378 : DDS::InstanceHandle_t handle)
1379 : {
1380 : #ifndef DDS_HAS_MINIMUM_BIT
1381 0 : if (!enabled_) {
1382 0 : if (DCPS_debug_level > 0) {
1383 0 : ACE_ERROR((LM_ERROR,
1384 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
1385 : ACE_TEXT(" Entity is not enabled.\n")));
1386 : }
1387 0 : return DDS::RETCODE_NOT_ENABLED;
1388 : }
1389 :
1390 0 : if (DCPS_debug_level >= 4) {
1391 0 : ACE_DEBUG((LM_DEBUG,
1392 : ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_subscription: ")
1393 : ACE_TEXT("%C ignoring handle %d.\n"),
1394 : LogGuid(dp_id_).c_str(),
1395 : handle));
1396 : }
1397 :
1398 0 : GUID_t ignoreId = get_repoid(handle);
1399 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1400 0 : if (!disco->ignore_subscription(domain_id_,
1401 0 : dp_id_,
1402 : ignoreId)) {
1403 0 : if (DCPS_debug_level > 0) {
1404 0 : ACE_ERROR((LM_ERROR,
1405 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
1406 : ACE_TEXT(" could not ignore subscription in discovery.\n")));
1407 : }
1408 0 : return DDS::RETCODE_ERROR;
1409 : }
1410 :
1411 0 : return DDS::RETCODE_OK;
1412 : #else
1413 : ACE_UNUSED_ARG(handle);
1414 : return DDS::RETCODE_UNSUPPORTED;
1415 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
1416 0 : }
1417 :
1418 : DDS::DomainId_t
1419 0 : DomainParticipantImpl::get_domain_id()
1420 : {
1421 0 : return domain_id_;
1422 : }
1423 :
1424 : DDS::ReturnCode_t
1425 0 : DomainParticipantImpl::assert_liveliness()
1426 : {
1427 : // This operation needs to only be used if the DomainParticipant contains
1428 : // DataWriter entities with the LIVELINESS set to MANUAL_BY_PARTICIPANT and
1429 : // it only affects the liveliness of those DataWriter entities. Otherwise,
1430 : // it has no effect.
1431 : // This will do nothing in current implementation since we only
1432 : // support the AUTOMATIC liveliness qos for datawriter.
1433 : // Add implementation here.
1434 :
1435 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1436 : tao_mon,
1437 : this->publishers_protector_,
1438 : DDS::RETCODE_ERROR);
1439 :
1440 0 : for (PublisherSet::iterator it(publishers_.begin());
1441 0 : it != publishers_.end(); ++it) {
1442 0 : it->svt_->assert_liveliness_by_participant();
1443 : }
1444 :
1445 0 : last_liveliness_activity_.set_to_now();
1446 :
1447 0 : return DDS::RETCODE_OK;
1448 0 : }
1449 :
1450 : DDS::ReturnCode_t
1451 0 : DomainParticipantImpl::set_default_publisher_qos(
1452 : const DDS::PublisherQos & qos)
1453 : {
1454 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1455 0 : default_publisher_qos_ = qos;
1456 0 : return DDS::RETCODE_OK;
1457 :
1458 : } else {
1459 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
1460 : }
1461 : }
1462 :
1463 : DDS::ReturnCode_t
1464 0 : DomainParticipantImpl::get_default_publisher_qos(
1465 : DDS::PublisherQos & qos)
1466 : {
1467 0 : qos = default_publisher_qos_;
1468 0 : return DDS::RETCODE_OK;
1469 : }
1470 :
1471 : DDS::ReturnCode_t
1472 0 : DomainParticipantImpl::set_default_subscriber_qos(
1473 : const DDS::SubscriberQos & qos)
1474 : {
1475 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1476 0 : default_subscriber_qos_ = qos;
1477 0 : return DDS::RETCODE_OK;
1478 :
1479 : } else {
1480 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
1481 : }
1482 : }
1483 :
1484 : DDS::ReturnCode_t
1485 0 : DomainParticipantImpl::get_default_subscriber_qos(
1486 : DDS::SubscriberQos & qos)
1487 : {
1488 0 : qos = default_subscriber_qos_;
1489 0 : return DDS::RETCODE_OK;
1490 : }
1491 :
1492 : DDS::ReturnCode_t
1493 0 : DomainParticipantImpl::set_default_topic_qos(
1494 : const DDS::TopicQos & qos)
1495 : {
1496 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
1497 0 : default_topic_qos_ = qos;
1498 0 : return DDS::RETCODE_OK;
1499 :
1500 : } else {
1501 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
1502 : }
1503 : }
1504 :
1505 : DDS::ReturnCode_t
1506 0 : DomainParticipantImpl::get_default_topic_qos(
1507 : DDS::TopicQos & qos)
1508 : {
1509 0 : qos = default_topic_qos_;
1510 0 : return DDS::RETCODE_OK;
1511 : }
1512 :
1513 : DDS::ReturnCode_t
1514 0 : DomainParticipantImpl::get_current_time(DDS::Time_t& current_time)
1515 : {
1516 0 : current_time = SystemTimePoint::now().to_dds_time();
1517 0 : return DDS::RETCODE_OK;
1518 : }
1519 :
1520 : #if !defined (DDS_HAS_MINIMUM_BIT)
1521 :
1522 : DDS::ReturnCode_t
1523 0 : DomainParticipantImpl::get_discovered_participants(DDS::InstanceHandleSeq& participant_handles)
1524 : {
1525 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
1526 :
1527 0 : const CountedHandleMap::const_iterator itEnd = handles_.end();
1528 0 : for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1529 0 : GuidConverter converter(iter->first);
1530 :
1531 0 : if (converter.entityKind() == KIND_PARTICIPANT) {
1532 : // skip itself and the ignored participant
1533 0 : if (iter->first == dp_id_ || ignored_participants_.count(iter->first)) {
1534 0 : continue;
1535 : }
1536 :
1537 0 : push_back(participant_handles, iter->second.first);
1538 : }
1539 0 : }
1540 :
1541 0 : return DDS::RETCODE_OK;
1542 0 : }
1543 :
1544 : DDS::ReturnCode_t
1545 0 : DomainParticipantImpl::get_discovered_participant_data(DDS::ParticipantBuiltinTopicData& participant_data,
1546 : DDS::InstanceHandle_t participant_handle)
1547 : {
1548 : {
1549 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
1550 :
1551 0 : bool found = false;
1552 0 : const CountedHandleMap::const_iterator itEnd = handles_.end();
1553 0 : for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1554 0 : GuidConverter converter(iter->first);
1555 :
1556 0 : if (participant_handle == iter->second.first
1557 0 : && converter.entityKind() == KIND_PARTICIPANT) {
1558 0 : found = true;
1559 0 : break;
1560 : }
1561 0 : }
1562 :
1563 0 : if (!found)
1564 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1565 0 : }
1566 :
1567 0 : return bit_subscriber_->get_discovered_participant_data(participant_data, participant_handle);
1568 : }
1569 :
1570 : DDS::ReturnCode_t
1571 0 : DomainParticipantImpl::get_discovered_topics(DDS::InstanceHandleSeq& topic_handles)
1572 : {
1573 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
1574 :
1575 0 : const CountedHandleMap::const_iterator itEnd = handles_.end();
1576 0 : for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1577 0 : GuidConverter converter(iter->first);
1578 0 : if (converter.isTopic()) {
1579 0 : if (ignored_topics_.count(iter->first)) {
1580 0 : continue;
1581 : }
1582 :
1583 0 : push_back(topic_handles, iter->second.first);
1584 : }
1585 0 : }
1586 :
1587 0 : return DDS::RETCODE_OK;
1588 0 : }
1589 :
1590 : DDS::ReturnCode_t
1591 0 : DomainParticipantImpl::get_discovered_topic_data(DDS::TopicBuiltinTopicData& topic_data,
1592 : DDS::InstanceHandle_t topic_handle)
1593 : {
1594 : {
1595 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
1596 :
1597 0 : bool found = false;
1598 0 : const CountedHandleMap::const_iterator itEnd = handles_.end();
1599 0 : for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
1600 0 : GuidConverter converter(iter->first);
1601 0 : if (topic_handle == iter->second.first && converter.isTopic()) {
1602 0 : found = true;
1603 0 : break;
1604 : }
1605 0 : }
1606 :
1607 0 : if (!found)
1608 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1609 0 : }
1610 :
1611 0 : return bit_subscriber_->get_discovered_topic_data(topic_data, topic_handle);
1612 : }
1613 :
1614 : #endif
1615 :
1616 : DDS::ReturnCode_t
1617 0 : DomainParticipantImpl::enable()
1618 : {
1619 : //According spec:
1620 : // - Calling enable on an already enabled Entity returns OK and has no
1621 : // effect.
1622 : // - Calling enable on an Entity whose factory is not enabled will fail
1623 : // and return PRECONDITION_NOT_MET.
1624 :
1625 0 : if (this->is_enabled()) {
1626 0 : return DDS::RETCODE_OK;
1627 : }
1628 :
1629 : #ifdef OPENDDS_SECURITY
1630 0 : if (!security_config_ && TheServiceParticipant->get_security()) {
1631 0 : security_config_ = TheSecurityRegistry->default_config();
1632 0 : if (!security_config_) {
1633 0 : security_config_ = TheSecurityRegistry->builtin_config();
1634 0 : TheSecurityRegistry->default_config(security_config_);
1635 : }
1636 : }
1637 : #endif
1638 :
1639 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1640 :
1641 0 : if (disco.is_nil()) {
1642 0 : if (DCPS_debug_level > 0) {
1643 0 : ACE_ERROR((LM_ERROR,
1644 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1645 : ACE_TEXT("no discovery found for domain id: %d.\n"), domain_id_));
1646 : }
1647 0 : return DDS::RETCODE_ERROR;
1648 : }
1649 :
1650 : #ifdef OPENDDS_SECURITY
1651 0 : if (TheServiceParticipant->get_security() && !security_config_) {
1652 0 : if (DCPS::security_debug.new_entity_error) {
1653 0 : ACE_ERROR((LM_ERROR,
1654 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1655 : ACE_TEXT("DCPSSecurity flag is set, but unable to load security plugin configuration.\n")));
1656 : }
1657 0 : return DDS::RETCODE_ERROR;
1658 : }
1659 : #endif
1660 :
1661 0 : AddDomainStatus value = {GUID_UNKNOWN, false};
1662 :
1663 : #ifdef OPENDDS_SECURITY
1664 0 : if (TheServiceParticipant->get_security() && security_config_->qos_implies_security(qos_)) {
1665 0 : Security::Authentication_var auth = security_config_->get_authentication();
1666 :
1667 0 : DDS::Security::SecurityException se;
1668 : DDS::Security::ValidationResult_t val_res =
1669 0 : auth->validate_local_identity(id_handle_, dp_id_, domain_id_, qos_, disco->generate_participant_guid(), se);
1670 :
1671 : /* TODO - Handle VALIDATION_PENDING_RETRY */
1672 0 : if (val_res != DDS::Security::VALIDATION_OK) {
1673 0 : if (DCPS::security_debug.new_entity_error) {
1674 0 : ACE_ERROR((LM_ERROR,
1675 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1676 : ACE_TEXT("Unable to validate local identity. SecurityException[%d.%d]: %C\n"),
1677 : se.code, se.minor_code, se.message.in()));
1678 : }
1679 0 : return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
1680 : }
1681 :
1682 0 : Security::AccessControl_var access = security_config_->get_access_control();
1683 :
1684 0 : perm_handle_ = access->validate_local_permissions(auth, id_handle_, domain_id_, qos_, se);
1685 :
1686 0 : if (perm_handle_ == DDS::HANDLE_NIL) {
1687 0 : if (DCPS::security_debug.new_entity_error) {
1688 0 : ACE_ERROR((LM_ERROR,
1689 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1690 : ACE_TEXT("Unable to validate local permissions. SecurityException[%d.%d]: %C\n"),
1691 : se.code, se.minor_code, se.message.in()));
1692 : }
1693 0 : return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
1694 : }
1695 :
1696 0 : const bool check_create = access->check_create_participant(perm_handle_, domain_id_, qos_, se);
1697 0 : if (!check_create) {
1698 0 : if (DCPS::security_debug.new_entity_error) {
1699 0 : ACE_ERROR((LM_ERROR,
1700 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1701 : ACE_TEXT("Unable to create participant. SecurityException[%d.%d]: %C\n"),
1702 : se.code, se.minor_code, se.message.in()));
1703 : }
1704 0 : return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
1705 : }
1706 :
1707 0 : DDS::Security::ParticipantSecurityAttributes part_sec_attr;
1708 0 : const bool check_part_sec_attr = access->get_participant_sec_attributes(perm_handle_, part_sec_attr, se);
1709 :
1710 0 : if (!check_part_sec_attr) {
1711 0 : if (DCPS::security_debug.new_entity_error) {
1712 0 : ACE_ERROR((LM_ERROR,
1713 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable,")
1714 : ACE_TEXT("Unable to get participant security attributes. SecurityException[%d.%d]: %C\n"),
1715 : se.code, se.minor_code, se.message.in()));
1716 : }
1717 0 : return DDS::RETCODE_ERROR;
1718 : }
1719 :
1720 0 : if (part_sec_attr.is_rtps_protected) { // DDS-Security v1.1 8.4.2.4 Table 27 is_rtps_protected
1721 0 : if (part_sec_attr.allow_unauthenticated_participants) {
1722 0 : if (DCPS::security_debug.new_entity_error) {
1723 0 : ACE_ERROR((LM_ERROR,
1724 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1725 : ACE_TEXT("allow_unauthenticated_participants is not possible with is_rtps_protected\n")));
1726 : }
1727 0 : return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
1728 : }
1729 :
1730 0 : const Security::CryptoKeyFactory_var crypto = security_config_->get_crypto_key_factory();
1731 0 : part_crypto_handle_ = crypto->register_local_participant(id_handle_, perm_handle_,
1732 0 : Util::filter_properties(qos_.property.value, "dds.sec.crypto."), part_sec_attr, se);
1733 0 : if (part_crypto_handle_ == DDS::HANDLE_NIL) {
1734 0 : if (DCPS::security_debug.new_entity_error) {
1735 0 : ACE_ERROR((LM_ERROR,
1736 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1737 : ACE_TEXT("Unable to register local participant. SecurityException[%d.%d]: %C\n"),
1738 : se.code, se.minor_code, se.message.in()));
1739 : }
1740 0 : return DDS::RETCODE_ERROR;
1741 : }
1742 :
1743 0 : } else {
1744 0 : part_crypto_handle_ = DDS::HANDLE_NIL;
1745 : }
1746 :
1747 0 : value = disco->add_domain_participant_secure(domain_id_, qos_, type_lookup_service_,
1748 0 : dp_id_, id_handle_, perm_handle_, part_crypto_handle_);
1749 :
1750 0 : if (value.id == GUID_UNKNOWN) {
1751 0 : if (DCPS::security_debug.new_entity_error) {
1752 0 : ACE_ERROR((LM_ERROR,
1753 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1754 : ACE_TEXT("add_domain_participant_secure returned invalid id.\n")));
1755 : }
1756 0 : return DDS::RETCODE_ERROR;
1757 : }
1758 :
1759 0 : } else {
1760 : #endif
1761 :
1762 0 : value = disco->add_domain_participant(domain_id_, qos_, type_lookup_service_);
1763 :
1764 0 : if (value.id == GUID_UNKNOWN) {
1765 0 : if (DCPS_debug_level > 0) {
1766 0 : ACE_ERROR((LM_ERROR,
1767 : ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1768 : ACE_TEXT("add_domain_participant returned invalid id.\n")));
1769 : }
1770 0 : return DDS::RETCODE_ERROR;
1771 : }
1772 :
1773 : #ifdef OPENDDS_SECURITY
1774 : }
1775 : #endif
1776 :
1777 0 : dp_id_ = value.id;
1778 0 : federated_ = value.federated;
1779 :
1780 0 : if (monitor_) {
1781 0 : monitor_->report();
1782 : }
1783 :
1784 0 : if (TheServiceParticipant->monitor_) {
1785 0 : TheServiceParticipant->monitor_->report();
1786 : }
1787 :
1788 0 : const DDS::ReturnCode_t ret = this->set_enabled();
1789 :
1790 0 : if (DCPS_debug_level > 1) {
1791 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DomainParticipantImpl::enable: ")
1792 : ACE_TEXT("enabled participant %C in domain %d\n"),
1793 : LogGuid(dp_id_).c_str(), domain_id_));
1794 : }
1795 :
1796 0 : if (ret == DDS::RETCODE_OK && !TheTransientKludge->is_enabled()) {
1797 0 : Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
1798 0 : this->bit_subscriber_ = disc->init_bit(this);
1799 0 : }
1800 :
1801 0 : if (ret != DDS::RETCODE_OK) {
1802 0 : return ret;
1803 : }
1804 :
1805 0 : if (qos_.entity_factory.autoenable_created_entities) {
1806 :
1807 0 : for (TopicMap::iterator it = topics_.begin(); it != topics_.end(); ++it) {
1808 0 : it->second.pair_.svt_->enable();
1809 : }
1810 :
1811 0 : for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
1812 0 : it->svt_->enable();
1813 : }
1814 :
1815 0 : for (SubscriberSet::iterator it = subscribers_.begin(); it != subscribers_.end(); ++it) {
1816 0 : it->svt_->enable();
1817 : }
1818 : }
1819 :
1820 0 : return DDS::RETCODE_OK;
1821 0 : }
1822 :
1823 : GUID_t
1824 0 : DomainParticipantImpl::get_id() const
1825 : {
1826 0 : return dp_id_;
1827 : }
1828 :
1829 : OPENDDS_STRING
1830 0 : DomainParticipantImpl::get_unique_id()
1831 : {
1832 0 : return GuidConverter(dp_id_).uniqueParticipantId();
1833 : }
1834 :
1835 :
1836 : DDS::InstanceHandle_t
1837 0 : DomainParticipantImpl::get_instance_handle()
1838 : {
1839 0 : return get_entity_instance_handle(dp_id_, rchandle_from(this));
1840 : }
1841 :
1842 0 : DDS::InstanceHandle_t DomainParticipantImpl::assign_handle(const GUID_t& id)
1843 : {
1844 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::HANDLE_NIL);
1845 0 : if (id == GUID_UNKNOWN) {
1846 : const DDS::InstanceHandle_t ih =
1847 0 : reusable_handles_.empty() ? participant_handles_.next() : reusable_handles_.pop_front();
1848 0 : if (DCPS_debug_level > 5) {
1849 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
1850 : "New unmapped InstanceHandle %d\n", ih));
1851 : }
1852 0 : return ih;
1853 : }
1854 :
1855 0 : const CountedHandleMap::iterator location = handles_.find(id);
1856 0 : if (location == handles_.end()) {
1857 : const DDS::InstanceHandle_t handle =
1858 0 : reusable_handles_.empty() ? participant_handles_.next() : reusable_handles_.pop_front();
1859 0 : if (DCPS_debug_level > 5) {
1860 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
1861 : "New mapped InstanceHandle %d for %C\n",
1862 : handle, LogGuid(id).c_str()));
1863 : }
1864 0 : handles_[id] = std::make_pair(handle, 1);
1865 0 : repoIds_[handle] = id;
1866 0 : handle_waiters_.notify_all();
1867 0 : return handle;
1868 : }
1869 :
1870 0 : HandleWithCounter& mapped = location->second;
1871 0 : ++mapped.second;
1872 0 : if (DCPS_debug_level > 5) {
1873 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
1874 : "Incremented refcount for InstanceHandle %d to %d\n",
1875 : mapped.first, mapped.second));
1876 : }
1877 0 : return mapped.first;
1878 0 : }
1879 :
1880 0 : DDS::InstanceHandle_t DomainParticipantImpl::await_handle(const GUID_t& id,
1881 : TimeDuration max_wait) const
1882 : {
1883 0 : MonotonicTimePoint expire_at = MonotonicTimePoint::now() + max_wait;
1884 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::HANDLE_NIL);
1885 0 : CountedHandleMap::const_iterator iter = handles_.find(id);
1886 0 : CvStatus res = CvStatus_NoTimeout;
1887 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1888 0 : while (res == CvStatus_NoTimeout && iter == handles_.end()) {
1889 0 : res = max_wait.is_zero() ? handle_waiters_.wait(thread_status_manager) : handle_waiters_.wait_until(expire_at, thread_status_manager);
1890 0 : iter = handles_.find(id);
1891 : }
1892 0 : return iter == handles_.end() ? DDS::HANDLE_NIL : iter->second.first;
1893 0 : }
1894 :
1895 0 : DDS::InstanceHandle_t DomainParticipantImpl::lookup_handle(const GUID_t& id) const
1896 : {
1897 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::HANDLE_NIL);
1898 0 : const CountedHandleMap::const_iterator iter = handles_.find(id);
1899 0 : return iter == handles_.end() ? DDS::HANDLE_NIL : iter->second.first;
1900 0 : }
1901 :
1902 0 : void DomainParticipantImpl::return_handle(DDS::InstanceHandle_t handle)
1903 : {
1904 0 : ACE_GUARD(ACE_Thread_Mutex, guard, handle_protector_);
1905 0 : const RepoIdMap::iterator r_iter = repoIds_.find(handle);
1906 0 : if (r_iter == repoIds_.end()) {
1907 0 : reusable_handles_.add(handle);
1908 0 : if (DCPS_debug_level > 5) {
1909 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::return_handle: "
1910 : "Returned unmapped InstanceHandle %d\n", handle));
1911 : }
1912 0 : return;
1913 : }
1914 :
1915 0 : const CountedHandleMap::iterator h_iter = handles_.find(r_iter->second);
1916 0 : if (h_iter == handles_.end()) {
1917 0 : return;
1918 : }
1919 :
1920 0 : if (DCPS_debug_level > 5) {
1921 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::return_handle: "
1922 : "Returned mapped InstanceHandle %d refcount %d\n",
1923 : handle, h_iter->second.second));
1924 : }
1925 :
1926 0 : HandleWithCounter& mapped = h_iter->second;
1927 0 : if (--mapped.second == 0) {
1928 0 : handles_.erase(h_iter);
1929 0 : repoIds_.erase(r_iter);
1930 0 : reusable_handles_.add(handle);
1931 : }
1932 0 : }
1933 :
1934 0 : GUID_t DomainParticipantImpl::get_repoid(DDS::InstanceHandle_t handle) const
1935 : {
1936 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, GUID_UNKNOWN);
1937 0 : const RepoIdMap::const_iterator location = repoIds_.find(handle);
1938 0 : return location == repoIds_.end() ? GUID_UNKNOWN : location->second;
1939 0 : }
1940 :
1941 : DDS::Topic_ptr
1942 0 : DomainParticipantImpl::create_new_topic(
1943 : const char * topic_name,
1944 : const char * type_name,
1945 : const DDS::TopicQos & qos,
1946 : DDS::TopicListener_ptr a_listener,
1947 : const DDS::StatusMask & mask,
1948 : OpenDDS::DCPS::TypeSupport_ptr type_support)
1949 : {
1950 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1951 : tao_mon,
1952 : this->topics_protector_,
1953 : DDS::Topic::_nil());
1954 :
1955 : #ifdef OPENDDS_SECURITY
1956 0 : if (perm_handle_ && !topicIsBIT(topic_name, type_name)) {
1957 0 : Security::AccessControl_var access = security_config_->get_access_control();
1958 :
1959 0 : DDS::Security::SecurityException se;
1960 :
1961 : DDS::Security::TopicSecurityAttributes sec_attr;
1962 0 : if (!access->get_topic_sec_attributes(perm_handle_, topic_name, sec_attr, se)) {
1963 0 : if (DCPS::security_debug.new_entity_warn) {
1964 0 : ACE_ERROR((LM_WARNING,
1965 : ACE_TEXT("(%P|%t) WARNING: ")
1966 : ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
1967 : ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
1968 : topic_name, se.code, se.minor_code, se.message.in()));
1969 : }
1970 0 : return DDS::Topic::_nil();
1971 : }
1972 :
1973 0 : if ((sec_attr.is_write_protected || sec_attr.is_read_protected) &&
1974 0 : !access->check_create_topic(perm_handle_, domain_id_, topic_name, qos, se)) {
1975 0 : if (DCPS::security_debug.new_entity_warn) {
1976 0 : ACE_ERROR((LM_WARNING,
1977 : ACE_TEXT("(%P|%t) WARNING: ")
1978 : ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
1979 : ACE_TEXT("Permissions check failed to create new topic '%C'. SecurityException[%d.%d]: %C\n"),
1980 : topic_name, se.code, se.minor_code, se.message.in()));
1981 : }
1982 0 : return DDS::Topic::_nil();
1983 : }
1984 0 : }
1985 : #endif
1986 :
1987 0 : TopicImpl* topic_servant = 0;
1988 :
1989 0 : ACE_NEW_RETURN(topic_servant,
1990 : TopicImpl(topic_name,
1991 : type_name,
1992 : type_support,
1993 : qos,
1994 : a_listener,
1995 : mask,
1996 : this),
1997 : DDS::Topic::_nil());
1998 :
1999 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
2000 0 : const DDS::ReturnCode_t ret = topic_servant->enable();
2001 :
2002 0 : if (ret != DDS::RETCODE_OK) {
2003 0 : ACE_ERROR((LM_WARNING,
2004 : ACE_TEXT("(%P|%t) WARNING: ")
2005 : ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
2006 : ACE_TEXT("enable failed.\n")));
2007 0 : return DDS::Topic::_nil();
2008 : }
2009 : }
2010 :
2011 0 : DDS::Topic_ptr obj(topic_servant);
2012 :
2013 : // this object will also act as a guard against leaking the new TopicImpl
2014 0 : RefCounted_Topic refCounted_topic(Topic_Pair(topic_servant, obj, false));
2015 0 : topics_.insert(std::make_pair(topic_name, refCounted_topic));
2016 :
2017 0 : if (this->monitor_) {
2018 0 : this->monitor_->report();
2019 : }
2020 :
2021 : // the topics_ map has one reference and we duplicate to give
2022 : // the caller another reference.
2023 0 : return DDS::Topic::_duplicate(refCounted_topic.pair_.obj_.in());
2024 0 : }
2025 :
2026 0 : bool DomainParticipantImpl::is_clean(String* leftover_entities) const
2027 : {
2028 0 : if (leftover_entities) {
2029 0 : leftover_entities->clear();
2030 : }
2031 :
2032 : // check that the only remaining topics are built-in topics
2033 0 : size_t topic_count = 0;
2034 0 : for (TopicMap::const_iterator it = topics_.begin(); it != topics_.end(); ++it) {
2035 0 : if (!topicIsBIT(it->second.pair_.svt_->topic_name(), it->second.pair_.svt_->type_name())) {
2036 0 : ++topic_count;
2037 : }
2038 : }
2039 0 : if (topic_count) {
2040 0 : *leftover_entities += to_dds_string(topic_count) + " topic(s)";
2041 : }
2042 :
2043 0 : size_t sub_count = subscribers_.size();
2044 0 : if (!TheTransientKludge->is_enabled()) {
2045 : // There are built-in topics and built-in topic subscribers left.
2046 0 : sub_count = sub_count <= 1 ? 0 : sub_count;
2047 : }
2048 0 : if (leftover_entities && sub_count) {
2049 0 : if (leftover_entities->size()) {
2050 0 : *leftover_entities += ", ";
2051 : }
2052 0 : *leftover_entities += to_dds_string(sub_count) + " subscriber(s)";
2053 : }
2054 :
2055 0 : const size_t pub_count = publishers_.size();
2056 0 : if (leftover_entities && pub_count) {
2057 0 : if (leftover_entities->size()) {
2058 0 : *leftover_entities += ", ";
2059 : }
2060 0 : *leftover_entities += to_dds_string(pub_count) + " publisher(s)";
2061 : }
2062 :
2063 0 : return topic_count == 0 && sub_count == 0 && pub_count == 0;
2064 : }
2065 :
2066 : DDS::DomainParticipantListener_ptr
2067 0 : DomainParticipantImpl::listener_for(DDS::StatusKind kind)
2068 : {
2069 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
2070 0 : if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
2071 0 : return DDS::DomainParticipantListener::_nil ();
2072 : } else {
2073 0 : return DDS::DomainParticipantListener::_duplicate(listener_.in());
2074 : }
2075 0 : }
2076 :
2077 : void
2078 0 : DomainParticipantImpl::get_topic_ids(TopicIdVec& topics)
2079 : {
2080 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
2081 : guard,
2082 : this->topics_protector_);
2083 :
2084 0 : topics.reserve(topics_.size());
2085 0 : for (TopicMap::iterator it(topics_.begin());
2086 0 : it != topics_.end(); ++it) {
2087 0 : topics.push_back(it->second.pair_.svt_->get_id());
2088 : }
2089 0 : }
2090 :
2091 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2092 :
2093 : OwnershipManager*
2094 0 : DomainParticipantImpl::ownership_manager()
2095 : {
2096 : #if !defined (DDS_HAS_MINIMUM_BIT)
2097 0 : if (bit_subscriber_) {
2098 0 : bit_subscriber_->bit_pub_listener_hack(this);
2099 : } else {
2100 0 : if (log_level >= LogLevel::Warning) {
2101 0 : ACE_ERROR((LM_WARNING,
2102 : "(%P|%t) WARNING: DomainParticipantImpl::ownership_manager: bit_subscriber_ is null"));
2103 : }
2104 : }
2105 : #endif
2106 0 : return &owner_man_;
2107 : }
2108 :
2109 : void
2110 0 : DomainParticipantImpl::update_ownership_strength (const GUID_t& pub_id,
2111 : const CORBA::Long& ownership_strength)
2112 : {
2113 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
2114 : tao_mon,
2115 : this->subscribers_protector_);
2116 :
2117 0 : if (this->get_deleted ())
2118 0 : return;
2119 :
2120 0 : for (SubscriberSet::iterator it(this->subscribers_.begin());
2121 0 : it != this->subscribers_.end(); ++it) {
2122 0 : it->svt_->update_ownership_strength(pub_id, ownership_strength);
2123 : }
2124 0 : }
2125 :
2126 : #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2127 :
2128 2 : DomainParticipantImpl::RepoIdSequence::RepoIdSequence(const GUID_t& base) :
2129 2 : base_(base),
2130 2 : serial_(0),
2131 2 : builder_(base_)
2132 : {
2133 2 : }
2134 :
2135 : GUID_t
2136 7 : DomainParticipantImpl::RepoIdSequence::next()
2137 : {
2138 7 : builder_.entityKey(++serial_);
2139 7 : return builder_;
2140 : }
2141 :
2142 :
2143 : ////////////////////////////////////////////////////////////////
2144 :
2145 :
2146 : bool
2147 0 : DomainParticipantImpl::validate_publisher_qos(DDS::PublisherQos & pub_qos)
2148 : {
2149 0 : if (pub_qos == PUBLISHER_QOS_DEFAULT) {
2150 0 : this->get_default_publisher_qos(pub_qos);
2151 : }
2152 :
2153 : OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(pub_qos, false);
2154 :
2155 0 : if (!Qos_Helper::valid(pub_qos) || !Qos_Helper::consistent(pub_qos)) {
2156 0 : if (DCPS_debug_level > 0) {
2157 0 : ACE_ERROR((LM_ERROR,
2158 : ACE_TEXT("(%P|%t) ERROR: ")
2159 : ACE_TEXT("DomainParticipantImpl::validate_publisher_qos, ")
2160 : ACE_TEXT("invalid qos.\n")));
2161 : }
2162 0 : return false;
2163 : }
2164 :
2165 0 : return true;
2166 : }
2167 :
2168 : bool
2169 0 : DomainParticipantImpl::validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos)
2170 : {
2171 0 : if (subscriber_qos == SUBSCRIBER_QOS_DEFAULT) {
2172 0 : this->get_default_subscriber_qos(subscriber_qos);
2173 : }
2174 :
2175 : OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, false);
2176 :
2177 0 : if (!Qos_Helper::valid(subscriber_qos) || !Qos_Helper::consistent(subscriber_qos)) {
2178 0 : if (DCPS_debug_level > 0) {
2179 0 : ACE_ERROR((LM_ERROR,
2180 : ACE_TEXT("(%P|%t) ERROR: ")
2181 : ACE_TEXT("DomainParticipantImpl::validate_subscriber_qos, ")
2182 : ACE_TEXT("invalid qos.\n")));
2183 : }
2184 0 : return false;
2185 : }
2186 :
2187 :
2188 0 : return true;
2189 : }
2190 :
2191 : Recorder_ptr
2192 0 : DomainParticipantImpl::create_recorder(DDS::Topic_ptr a_topic,
2193 : const DDS::SubscriberQos& subscriber_qos,
2194 : const DDS::DataReaderQos& datareader_qos,
2195 : const RecorderListener_rch& a_listener,
2196 : DDS::StatusMask mask)
2197 : {
2198 0 : if (CORBA::is_nil(a_topic)) {
2199 0 : if (DCPS_debug_level > 0) {
2200 0 : ACE_ERROR((LM_ERROR,
2201 : ACE_TEXT("(%P|%t) ERROR: ")
2202 : ACE_TEXT("DomainParticipantImpl::create_recorder, ")
2203 : ACE_TEXT("topic desc is nil.\n")));
2204 : }
2205 0 : return 0;
2206 : }
2207 :
2208 0 : DDS::SubscriberQos sub_qos = subscriber_qos;
2209 0 : DDS::DataReaderQos dr_qos;
2210 :
2211 0 : if (! this->validate_subscriber_qos(sub_qos) ||
2212 0 : ! SubscriberImpl::validate_datareader_qos(datareader_qos,
2213 : TheServiceParticipant->initial_DataReaderQos(),
2214 : a_topic,
2215 : dr_qos, false) ) {
2216 0 : return 0;
2217 : }
2218 :
2219 0 : RecorderImpl* recorder(new RecorderImpl);
2220 0 : Recorder_var result(recorder);
2221 :
2222 0 : recorder->init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
2223 : dr_qos, a_listener,
2224 : mask, this, sub_qos);
2225 :
2226 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
2227 0 : recorder->enable();
2228 : }
2229 :
2230 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
2231 0 : recorders_.insert(result);
2232 :
2233 0 : return result._retn();
2234 0 : }
2235 :
2236 : Replayer_ptr
2237 0 : DomainParticipantImpl::create_replayer(DDS::Topic_ptr a_topic,
2238 : const DDS::PublisherQos& publisher_qos,
2239 : const DDS::DataWriterQos& datawriter_qos,
2240 : const ReplayerListener_rch& a_listener,
2241 : DDS::StatusMask mask)
2242 : {
2243 0 : if (CORBA::is_nil(a_topic)) {
2244 0 : if (DCPS_debug_level > 0) {
2245 0 : ACE_ERROR((LM_ERROR,
2246 : ACE_TEXT("(%P|%t) ERROR: ")
2247 : ACE_TEXT("DomainParticipantImpl::create_replayer, ")
2248 : ACE_TEXT("topic desc is nil.\n")));
2249 : }
2250 0 : return 0;
2251 : }
2252 :
2253 0 : DDS::PublisherQos pub_qos = publisher_qos;
2254 0 : DDS::DataWriterQos dw_qos;
2255 :
2256 0 : if (! this->validate_publisher_qos(pub_qos) ||
2257 0 : ! PublisherImpl::validate_datawriter_qos(datawriter_qos,
2258 : TheServiceParticipant->initial_DataWriterQos(),
2259 : a_topic,
2260 : dw_qos)) {
2261 0 : return 0;
2262 : }
2263 :
2264 0 : TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
2265 :
2266 0 : ReplayerImpl* replayer(new ReplayerImpl);
2267 0 : Replayer_var result(replayer);
2268 :
2269 0 : replayer->init(a_topic, topic_servant, dw_qos, a_listener, mask, this, pub_qos);
2270 :
2271 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
2272 0 : const DDS::ReturnCode_t ret = replayer->enable();
2273 :
2274 0 : if (ret != DDS::RETCODE_OK) {
2275 0 : if (DCPS_debug_level > 0) {
2276 0 : ACE_ERROR((LM_ERROR,
2277 : ACE_TEXT("(%P|%t) ERROR: ")
2278 : ACE_TEXT("DomainParticipantImpl::create_replayer, ")
2279 : ACE_TEXT("enable failed.\n")));
2280 : }
2281 0 : return 0;
2282 : }
2283 : }
2284 :
2285 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
2286 0 : replayers_.insert(result);
2287 0 : return result._retn();
2288 0 : }
2289 :
2290 : void
2291 0 : DomainParticipantImpl::delete_recorder(Recorder_ptr recorder)
2292 : {
2293 0 : const Recorder_var recvar(Recorder::_duplicate(recorder));
2294 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
2295 0 : recorders_.erase(recvar);
2296 0 : }
2297 :
2298 : void
2299 0 : DomainParticipantImpl::delete_replayer(Replayer_ptr replayer)
2300 : {
2301 0 : const Replayer_var repvar(Replayer::_duplicate(replayer));
2302 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
2303 0 : replayers_.erase(repvar);
2304 0 : }
2305 :
2306 : void
2307 0 : DomainParticipantImpl::add_adjust_liveliness_timers(DataWriterImpl* writer)
2308 : {
2309 0 : automatic_liveliness_timer_->add_adjust(writer);
2310 0 : participant_liveliness_timer_->add_adjust(writer);
2311 0 : }
2312 :
2313 : void
2314 0 : DomainParticipantImpl::remove_adjust_liveliness_timers()
2315 : {
2316 0 : automatic_liveliness_timer_->remove_adjust();
2317 0 : participant_liveliness_timer_->remove_adjust();
2318 0 : }
2319 :
2320 0 : DomainParticipantImpl::LivelinessTimer::LivelinessTimer(DomainParticipantImpl& impl,
2321 0 : DDS::LivelinessQosPolicyKind kind)
2322 0 : : impl_(impl)
2323 0 : , kind_(kind)
2324 0 : , interval_(TimeDuration::max_value)
2325 0 : , recalculate_interval_(false)
2326 0 : , scheduled_(false)
2327 0 : { }
2328 :
2329 0 : DomainParticipantImpl::LivelinessTimer::~LivelinessTimer()
2330 : {
2331 0 : }
2332 :
2333 : void
2334 0 : DomainParticipantImpl::LivelinessTimer::add_adjust(OpenDDS::DCPS::DataWriterImpl* writer)
2335 : {
2336 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
2337 :
2338 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
2339 :
2340 : // Calculate the time remaining to liveliness check.
2341 0 : const TimeDuration remaining = interval_ - (now - last_liveliness_check_);
2342 :
2343 : // Adopt a smaller interval.
2344 0 : interval_ = std::min(interval_, writer->liveliness_check_interval(kind_));
2345 :
2346 : // Reschedule or schedule a timer if necessary.
2347 0 : if (scheduled_ && interval_ < remaining) {
2348 0 : cancel();
2349 0 : schedule(interval_);
2350 0 : } else if (!scheduled_) {
2351 0 : schedule(interval_);
2352 0 : scheduled_ = true;
2353 0 : last_liveliness_check_ = now;
2354 : }
2355 0 : }
2356 :
2357 : void
2358 0 : DomainParticipantImpl::LivelinessTimer::remove_adjust()
2359 : {
2360 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
2361 :
2362 0 : recalculate_interval_ = true;
2363 0 : }
2364 :
2365 0 : void DomainParticipantImpl::LivelinessTimer::execute(const MonotonicTimePoint& now)
2366 : {
2367 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
2368 :
2369 0 : if (recalculate_interval_) {
2370 0 : ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
2371 0 : TimeDuration interval;
2372 0 : while (recalculate_interval_) {
2373 0 : recalculate_interval_ = false;
2374 0 : ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rev_guard, rev_lock);
2375 0 : interval = impl_.liveliness_check_interval(kind_);
2376 0 : }
2377 0 : interval_ = interval;
2378 0 : }
2379 :
2380 0 : scheduled_ = false;
2381 :
2382 0 : if (!interval_.is_max()) {
2383 0 : dispatch(now);
2384 0 : last_liveliness_check_ = now;
2385 0 : schedule(interval_);
2386 0 : scheduled_ = true;
2387 : }
2388 0 : }
2389 :
2390 0 : DomainParticipantImpl::AutomaticLivelinessTimer::AutomaticLivelinessTimer(DomainParticipantImpl& impl)
2391 0 : : LivelinessTimer (impl, DDS::AUTOMATIC_LIVELINESS_QOS)
2392 0 : { }
2393 :
2394 : void
2395 0 : DomainParticipantImpl::AutomaticLivelinessTimer::dispatch(const MonotonicTimePoint& /* tv */)
2396 : {
2397 0 : impl_.signal_liveliness(kind_);
2398 0 : }
2399 :
2400 0 : DomainParticipantImpl::ParticipantLivelinessTimer::ParticipantLivelinessTimer(DomainParticipantImpl& impl)
2401 0 : : LivelinessTimer(impl, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
2402 0 : { }
2403 :
2404 : void
2405 0 : DomainParticipantImpl::ParticipantLivelinessTimer::dispatch(const MonotonicTimePoint& tv)
2406 : {
2407 0 : if (impl_.participant_liveliness_activity_after (tv - interval())) {
2408 0 : impl_.signal_liveliness(kind_);
2409 : }
2410 0 : }
2411 :
2412 : TimeDuration
2413 0 : DomainParticipantImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
2414 : {
2415 0 : TimeDuration tv(TimeDuration::max_value);
2416 :
2417 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2418 : tao_mon,
2419 : publishers_protector_,
2420 : tv);
2421 :
2422 0 : for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
2423 0 : tv = std::min(tv, it->svt_->liveliness_check_interval(kind));
2424 : }
2425 :
2426 0 : return tv;
2427 0 : }
2428 :
2429 : bool
2430 0 : DomainParticipantImpl::participant_liveliness_activity_after(const MonotonicTimePoint& tv)
2431 : {
2432 0 : if (last_liveliness_activity_ > tv) {
2433 0 : return true;
2434 : }
2435 :
2436 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, tao_mon, this->publishers_protector_, !tv.is_zero());
2437 :
2438 0 : for (PublisherSet::iterator it(publishers_.begin());
2439 0 : it != publishers_.end(); ++it) {
2440 0 : if (it->svt_->participant_liveliness_activity_after(tv)) {
2441 0 : return true;
2442 : }
2443 : }
2444 :
2445 0 : return false;
2446 0 : }
2447 :
2448 : void
2449 0 : DomainParticipantImpl::signal_liveliness (DDS::LivelinessQosPolicyKind kind)
2450 : {
2451 0 : TheServiceParticipant->get_discovery(domain_id_)->signal_liveliness (domain_id_, get_id(), kind);
2452 0 : }
2453 :
2454 : int
2455 0 : DomainParticipantImpl::handle_exception(ACE_HANDLE /*fd*/)
2456 : {
2457 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2458 :
2459 0 : DDS::ReturnCode_t ret = DDS::RETCODE_OK;
2460 :
2461 0 : automatic_liveliness_timer_->cancel();
2462 0 : participant_liveliness_timer_->cancel();
2463 :
2464 : // delete publishers
2465 : {
2466 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2467 : tao_mon,
2468 : this->publishers_protector_,
2469 : DDS::RETCODE_ERROR);
2470 :
2471 0 : PublisherSet::iterator pubIter = publishers_.begin();
2472 : DDS::Publisher_ptr pubPtr;
2473 0 : size_t pubsize = publishers_.size();
2474 :
2475 0 : while (pubsize > 0) {
2476 0 : pubPtr = (*pubIter).obj_.in();
2477 0 : ++pubIter;
2478 :
2479 0 : DDS::ReturnCode_t result = pubPtr->delete_contained_entities();
2480 0 : if (result != DDS::RETCODE_OK) {
2481 0 : ret = result;
2482 : }
2483 :
2484 0 : result = delete_publisher(pubPtr);
2485 :
2486 0 : if (result != DDS::RETCODE_OK) {
2487 0 : ret = result;
2488 : }
2489 :
2490 0 : --pubsize;
2491 : }
2492 :
2493 0 : }
2494 :
2495 : // delete subscribers
2496 : {
2497 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2498 : tao_mon,
2499 : this->subscribers_protector_,
2500 : DDS::RETCODE_ERROR);
2501 :
2502 0 : SubscriberSet::iterator subIter = subscribers_.begin();
2503 : DDS::Subscriber_ptr subPtr;
2504 0 : size_t subsize = subscribers_.size();
2505 :
2506 0 : while (subsize > 0) {
2507 0 : subPtr = (*subIter).obj_.in();
2508 0 : ++subIter;
2509 :
2510 0 : DDS::ReturnCode_t result = subPtr->delete_contained_entities();
2511 :
2512 0 : if (result != DDS::RETCODE_OK) {
2513 0 : ret = result;
2514 : }
2515 :
2516 0 : result = delete_subscriber(subPtr);
2517 :
2518 0 : if (result != DDS::RETCODE_OK) {
2519 0 : ret = result;
2520 : }
2521 :
2522 0 : --subsize;
2523 : }
2524 0 : }
2525 :
2526 : {
2527 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2528 : tao_mon,
2529 : this->recorders_protector_,
2530 : DDS::RETCODE_ERROR);
2531 :
2532 0 : RecorderSet::iterator it = recorders_.begin();
2533 0 : for (; it != recorders_.end(); ++it ){
2534 0 : RecorderImpl* impl = dynamic_cast<RecorderImpl* >(it->in());
2535 0 : DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
2536 0 : if (impl) result = impl->cleanup();
2537 0 : if (result != DDS::RETCODE_OK) ret = result;
2538 : }
2539 0 : recorders_.clear();
2540 0 : }
2541 :
2542 : {
2543 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2544 : tao_mon,
2545 : this->replayers_protector_,
2546 : DDS::RETCODE_ERROR);
2547 :
2548 0 : ReplayerSet::iterator it = replayers_.begin();
2549 0 : for (; it != replayers_.end(); ++it ){
2550 0 : ReplayerImpl* impl = static_cast<ReplayerImpl* >(it->in());
2551 0 : DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
2552 0 : if (impl) result = impl->cleanup();
2553 0 : if (result != DDS::RETCODE_OK) ret = result;
2554 :
2555 : }
2556 :
2557 0 : replayers_.clear();
2558 0 : }
2559 :
2560 : // delete topics
2561 : {
2562 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2563 : tao_mon,
2564 : this->topics_protector_,
2565 : DDS::RETCODE_ERROR);
2566 :
2567 0 : TopicMap::iterator topicIter = topics_.begin();
2568 : DDS::Topic_ptr topicPtr;
2569 0 : size_t topicsize = topics_.size();
2570 :
2571 0 : while (topicsize > 0) {
2572 0 : topicPtr = topicIter->second.pair_.obj_.in();
2573 0 : ++topicIter;
2574 :
2575 : // Delete the topic the reference count.
2576 0 : const DDS::ReturnCode_t result = this->delete_topic_i(topicPtr, true);
2577 :
2578 0 : if (result != DDS::RETCODE_OK) {
2579 0 : ret = result;
2580 : }
2581 0 : --topicsize;
2582 : }
2583 0 : }
2584 :
2585 0 : shutdown_mutex_.acquire();
2586 0 : shutdown_result_ = ret;
2587 0 : shutdown_complete_ = true;
2588 0 : shutdown_condition_.notify_all();
2589 0 : shutdown_mutex_.release();
2590 :
2591 0 : return 0;
2592 0 : }
2593 :
2594 0 : bool DomainParticipantImpl::prepare_to_delete_datawriters()
2595 : {
2596 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, publishers_protector_, false);
2597 0 : bool result = true;
2598 0 : const PublisherSet::iterator end = publishers_.end();
2599 0 : for (PublisherSet::iterator i = publishers_.begin(); i != end; ++i) {
2600 0 : result &= i->svt_->prepare_to_delete_datawriters();
2601 : }
2602 0 : return result;
2603 0 : }
2604 :
2605 0 : bool DomainParticipantImpl::set_wait_pending_deadline(const MonotonicTimePoint& deadline)
2606 : {
2607 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, publishers_protector_, false);
2608 0 : bool result = true;
2609 0 : const PublisherSet::iterator end = publishers_.end();
2610 0 : for (PublisherSet::iterator i = publishers_.begin(); i != end; ++i) {
2611 0 : result &= i->svt_->set_wait_pending_deadline(deadline);
2612 : }
2613 0 : return result;
2614 0 : }
2615 :
2616 : #ifndef OPENDDS_SAFETY_PROFILE
2617 0 : DDS::ReturnCode_t DomainParticipantImpl::get_dynamic_type(
2618 : DDS::DynamicType_var& type, const DDS::BuiltinTopicKey_t& key)
2619 : {
2620 0 : if (!type_lookup_service_) {
2621 0 : if (log_level >= LogLevel::Notice) {
2622 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2623 : "Can't get a DynamicType, no type lookup service\n"));
2624 : }
2625 0 : return DDS::RETCODE_UNSUPPORTED;
2626 : }
2627 :
2628 0 : XTypes::TypeInformation ti = type_lookup_service_->get_type_info(key);
2629 0 : if (ti.complete.typeid_with_size.typeobject_serialized_size == 0) {
2630 0 : if (log_level >= LogLevel::Notice) {
2631 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2632 : "Can't get a DynamicType, type info is missing complete\n"));
2633 : }
2634 0 : return DDS::RETCODE_NO_DATA;
2635 : }
2636 :
2637 0 : const XTypes::TypeIdentifier& ctid = ti.complete.typeid_with_size.type_id;
2638 0 : const GUID_t entity = bit_key_to_guid(key);
2639 0 : if (!type_lookup_service_->has_complete(ctid)) {
2640 : // We don't have it, try to asking the remote for the complete
2641 : // TypeObjects.
2642 0 : if (DCPS_debug_level >= 4) {
2643 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::get_dynamic_type: "
2644 : "requesting remote complete TypeObject from %C\n", LogGuid(entity).c_str()));
2645 : }
2646 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
2647 0 : TypeObjReqCond cond;
2648 0 : disco->request_remote_complete_type_objects(domain_id_, dp_id_, entity, ti, cond);
2649 0 : const DDS::ReturnCode_t rc = cond.wait();
2650 0 : if (rc != DDS::RETCODE_OK) {
2651 0 : if (log_level >= LogLevel::Notice) {
2652 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2653 : "Couldn't get remote complete type object: %C\n", retcode_to_string(rc)));
2654 : }
2655 0 : return rc;
2656 : }
2657 :
2658 0 : if (!type_lookup_service_->has_complete(ctid)) {
2659 0 : if (log_level >= LogLevel::Notice) {
2660 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2661 : "request_remote_complete_type_objects succeeded, but type lookup service still says it "
2662 : "doesn't have the complete TypeObject?\n"));
2663 : }
2664 0 : return DDS::RETCODE_ERROR;
2665 : }
2666 0 : }
2667 :
2668 0 : DDS::DynamicType_var got_type = type_lookup_service_->type_identifier_to_dynamic(ctid, entity);
2669 0 : if (!XTypes::dynamic_type_is_valid(got_type)) {
2670 0 : if (log_level >= LogLevel::Notice) {
2671 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
2672 : "Got an invalid DynamicType\n"));
2673 : }
2674 0 : return DDS::RETCODE_ERROR;
2675 : }
2676 0 : type = got_type;
2677 :
2678 0 : XTypes::DynamicTypeImpl* impl = dynamic_cast<XTypes::DynamicTypeImpl*>(type.in());
2679 0 : impl->set_complete_type_identifier(ctid);
2680 0 : impl->set_minimal_type_identifier(ti.minimal.typeid_with_size.type_id);
2681 0 : impl->set_preset_type_info(ti);
2682 :
2683 0 : return DDS::RETCODE_OK;
2684 0 : }
2685 : #endif
2686 :
2687 : } // namespace DCPS
2688 : } // namespace OpenDDS
2689 :
2690 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|