00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DomainParticipantImpl.h"
00010 #include "dds/DdsDcpsGuidC.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "Service_Participant.h"
00013 #include "Qos_Helper.h"
00014 #include "GuidConverter.h"
00015 #include "PublisherImpl.h"
00016 #include "SubscriberImpl.h"
00017 #include "DataWriterImpl.h"
00018 #include "Marked_Default_Qos.h"
00019 #include "Registered_Data_Types.h"
00020 #include "Transient_Kludge.h"
00021 #include "DomainParticipantFactoryImpl.h"
00022 #include "Util.h"
00023 #include "MonitorFactory.h"
00024 #include "BitPubListenerImpl.h"
00025 #include "ContentFilteredTopicImpl.h"
00026 #include "MultiTopicImpl.h"
00027 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029
00030 #if defined(OPENDDS_SECURITY)
00031 #include "dds/DCPS/security/framework/SecurityRegistry.h"
00032 #endif
00033
00034 #include "RecorderImpl.h"
00035 #include "ReplayerImpl.h"
00036
00037 #if !defined (DDS_HAS_MINIMUM_BIT)
00038 #include "BuiltInTopicUtils.h"
00039 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00040 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00041
00042 #include "tao/debug.h"
00043 #include "ace/Reactor.h"
00044 #include "ace/OS_NS_unistd.h"
00045
00046 namespace Util {
00047
00048 template <typename Key>
00049 int find(
00050 OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
00051 const Key& key,
00052 OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*& value)
00053 {
00054 OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
00055 c.find(key);
00056
00057 if (iter == c.end()) {
00058 return -1;
00059 }
00060
00061 value = &iter->second;
00062 return 0;
00063 }
00064
00065 DDS::PropertySeq filter_properties(const DDS::PropertySeq& properties, const std::string& prefix)
00066 {
00067 DDS::PropertySeq result(properties.length());
00068 result.length(properties.length());
00069 size_t count = 0;
00070 for (size_t i = 0, len = properties.length(); i < len; ++i) {
00071 if (std::string(properties[i].name.in()).find(prefix) == 0) {
00072 result[count++] = properties[i];
00073 }
00074 }
00075 result.length(count);
00076 return result;
00077 }
00078
00079 }
00080
00081 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00082
00083 namespace OpenDDS {
00084 namespace DCPS {
00085
00086
00087
00088
00089
00090
00091 DomainParticipantImpl::DomainParticipantImpl(DomainParticipantFactoryImpl * factory,
00092 const DDS::DomainId_t& domain_id,
00093 const DDS::DomainParticipantQos & qos,
00094 DDS::DomainParticipantListener_ptr a_listener,
00095 const DDS::StatusMask & mask)
00096 : factory_(factory),
00097 default_topic_qos_(TheServiceParticipant->initial_TopicQos()),
00098 default_publisher_qos_(TheServiceParticipant->initial_PublisherQos()),
00099 default_subscriber_qos_(TheServiceParticipant->initial_SubscriberQos()),
00100 qos_(qos),
00101 domain_id_(domain_id),
00102 dp_id_(GUID_UNKNOWN),
00103 federated_(false),
00104 shutdown_condition_(shutdown_mutex_),
00105 shutdown_complete_(false),
00106 monitor_(0),
00107 pub_id_gen_(dp_id_),
00108 automatic_liveliness_timer_ (*this),
00109 participant_liveliness_timer_ (*this)
00110 {
00111 (void) this->set_listener(a_listener, mask);
00112 monitor_ = TheServiceParticipant->monitor_factory_->create_dp_monitor(this);
00113 }
00114
00115 DomainParticipantImpl::~DomainParticipantImpl()
00116 {
00117 }
00118
00119 DDS::Publisher_ptr
00120 DomainParticipantImpl::create_publisher(
00121 const DDS::PublisherQos & qos,
00122 DDS::PublisherListener_ptr a_listener,
00123 DDS::StatusMask mask)
00124 {
00125 DDS::PublisherQos pub_qos = qos;
00126
00127 if (! this->validate_publisher_qos(pub_qos))
00128 return DDS::Publisher::_nil();
00129
00130 PublisherImpl* pub = 0;
00131 ACE_NEW_RETURN(pub,
00132 PublisherImpl(participant_handles_.next(),
00133 pub_id_gen_.next(),
00134 pub_qos,
00135 a_listener,
00136 mask,
00137 this),
00138 DDS::Publisher::_nil());
00139
00140 if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00141 pub->enable();
00142 }
00143
00144 DDS::Publisher_ptr pub_obj(pub);
00145
00146
00147 Publisher_Pair pair(pub, pub_obj, NO_DUP);
00148
00149 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00150 tao_mon,
00151 this->publishers_protector_,
00152 DDS::Publisher::_nil());
00153
00154 if (OpenDDS::DCPS::insert(publishers_, pair) == -1) {
00155 ACE_ERROR((LM_ERROR,
00156 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
00157 ACE_TEXT("%p\n"),
00158 ACE_TEXT("insert")));
00159 return DDS::Publisher::_nil();
00160 }
00161
00162 return DDS::Publisher::_duplicate(pub_obj);
00163 }
00164
00165 DDS::ReturnCode_t
00166 DomainParticipantImpl::delete_publisher(
00167 DDS::Publisher_ptr p)
00168 {
00169
00170
00171
00172 PublisherImpl* the_servant = dynamic_cast<PublisherImpl*>(p);
00173
00174 if (!the_servant) {
00175 ACE_ERROR((LM_ERROR,
00176 ACE_TEXT("(%P|%t) ERROR: ")
00177 ACE_TEXT("DomainParticipantImpl::delete_publisher, ")
00178 ACE_TEXT("Failed to obtain PublisherImpl.\n")));
00179 return DDS::RETCODE_ERROR;
00180 }
00181
00182 if (!the_servant->is_clean()) {
00183 ACE_ERROR((LM_ERROR,
00184 ACE_TEXT("(%P|%t) ERROR: ")
00185 ACE_TEXT("DomainParticipantImpl::delete_publisher, ")
00186 ACE_TEXT("The publisher is not empty.\n")));
00187 return DDS::RETCODE_PRECONDITION_NOT_MET;
00188 }
00189
00190 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00191 tao_mon,
00192 this->publishers_protector_,
00193 DDS::RETCODE_ERROR);
00194
00195 Publisher_Pair pair(the_servant, p, DUP);
00196
00197 if (OpenDDS::DCPS::remove(publishers_, pair) == -1) {
00198 ACE_ERROR((LM_ERROR,
00199 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_publisher, ")
00200 ACE_TEXT("%p\n"),
00201 ACE_TEXT("remove")));
00202 return DDS::RETCODE_ERROR;
00203
00204 } else {
00205 return DDS::RETCODE_OK;
00206 }
00207 }
00208
00209 DDS::Subscriber_ptr
00210 DomainParticipantImpl::create_subscriber(
00211 const DDS::SubscriberQos & qos,
00212 DDS::SubscriberListener_ptr a_listener,
00213 DDS::StatusMask mask)
00214 {
00215 DDS::SubscriberQos sub_qos = qos;
00216
00217 if (! this->validate_subscriber_qos(sub_qos)) {
00218 return DDS::Subscriber::_nil();
00219 }
00220
00221 SubscriberImpl* sub = 0 ;
00222 ACE_NEW_RETURN(sub,
00223 SubscriberImpl(participant_handles_.next(),
00224 sub_qos,
00225 a_listener,
00226 mask,
00227 this),
00228 DDS::Subscriber::_nil());
00229
00230 if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00231 sub->enable();
00232 }
00233
00234 DDS::Subscriber_ptr sub_obj(sub);
00235
00236 Subscriber_Pair pair(sub, sub_obj, NO_DUP);
00237
00238 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00239 tao_mon,
00240 this->subscribers_protector_,
00241 DDS::Subscriber::_nil());
00242
00243 if (OpenDDS::DCPS::insert(subscribers_, pair) == -1) {
00244 ACE_ERROR((LM_ERROR,
00245 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
00246 ACE_TEXT("%p\n"),
00247 ACE_TEXT("insert")));
00248 return DDS::Subscriber::_nil();
00249 }
00250
00251 return DDS::Subscriber::_duplicate(sub_obj);
00252 }
00253
00254 DDS::ReturnCode_t
00255 DomainParticipantImpl::delete_subscriber(
00256 DDS::Subscriber_ptr s)
00257 {
00258
00259
00260
00261 SubscriberImpl* the_servant = dynamic_cast<SubscriberImpl*>(s);
00262
00263 if (!the_servant) {
00264 ACE_ERROR((LM_ERROR,
00265 ACE_TEXT("(%P|%t) ERROR: ")
00266 ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00267 ACE_TEXT("Failed to obtain SubscriberImpl.\n")));
00268 return DDS::RETCODE_ERROR;
00269 }
00270
00271 if (!the_servant->is_clean()) {
00272 ACE_ERROR((LM_ERROR,
00273 ACE_TEXT("(%P|%t) ERROR: ")
00274 ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00275 ACE_TEXT("The subscriber is not empty.\n")));
00276 return DDS::RETCODE_PRECONDITION_NOT_MET;
00277 }
00278
00279 DDS::ReturnCode_t ret
00280 = the_servant->delete_contained_entities();
00281
00282 if (ret != DDS::RETCODE_OK) {
00283 ACE_ERROR((LM_ERROR,
00284 ACE_TEXT("(%P|%t) ERROR: ")
00285 ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00286 ACE_TEXT("Failed to delete contained entities.\n")));
00287 return DDS::RETCODE_ERROR;
00288 }
00289
00290 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00291 tao_mon,
00292 this->subscribers_protector_,
00293 DDS::RETCODE_ERROR);
00294
00295 Subscriber_Pair pair(the_servant, s, DUP);
00296
00297 if (OpenDDS::DCPS::remove(subscribers_, pair) == -1) {
00298 ACE_ERROR((LM_ERROR,
00299 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_subscriber, ")
00300 ACE_TEXT("%p\n"),
00301 ACE_TEXT("remove")));
00302 return DDS::RETCODE_ERROR;
00303
00304 } else {
00305 return DDS::RETCODE_OK;
00306 }
00307 }
00308
00309 DDS::Subscriber_ptr
00310 DomainParticipantImpl::get_builtin_subscriber()
00311 {
00312 return DDS::Subscriber::_duplicate(bit_subscriber_.in());
00313 }
00314
00315 DDS::Topic_ptr
00316 DomainParticipantImpl::create_topic(
00317 const char * topic_name,
00318 const char * type_name,
00319 const DDS::TopicQos & qos,
00320 DDS::TopicListener_ptr a_listener,
00321 DDS::StatusMask mask)
00322 {
00323 return create_topic_i(topic_name,
00324 type_name,
00325 qos,
00326 a_listener,
00327 mask,
00328 0);
00329 }
00330
00331 DDS::Topic_ptr
00332 DomainParticipantImpl::create_typeless_topic(
00333 const char * topic_name,
00334 const char * type_name,
00335 bool type_has_keys,
00336 const DDS::TopicQos & qos,
00337 DDS::TopicListener_ptr a_listener,
00338 DDS::StatusMask mask)
00339 {
00340 int topic_mask = (type_has_keys ? TOPIC_TYPE_HAS_KEYS : 0 ) | TOPIC_TYPELESS;
00341
00342 return create_topic_i(topic_name,
00343 type_name,
00344 qos,
00345 a_listener,
00346 mask,
00347 topic_mask);
00348 }
00349
00350
00351 DDS::Topic_ptr
00352 DomainParticipantImpl::create_topic_i(
00353 const char * topic_name,
00354 const char * type_name,
00355 const DDS::TopicQos & qos,
00356 DDS::TopicListener_ptr a_listener,
00357 DDS::StatusMask mask,
00358 int topic_mask)
00359 {
00360 DDS::TopicQos topic_qos;
00361
00362 if (qos == TOPIC_QOS_DEFAULT) {
00363 this->get_default_topic_qos(topic_qos);
00364
00365 } else {
00366 topic_qos = qos;
00367 }
00368
00369 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00370 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00371 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00372 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00373
00374 if (!Qos_Helper::valid(topic_qos)) {
00375 ACE_ERROR((LM_ERROR,
00376 ACE_TEXT("(%P|%t) ERROR: ")
00377 ACE_TEXT("DomainParticipantImpl::create_topic, ")
00378 ACE_TEXT("invalid qos.\n")));
00379 return DDS::Topic::_nil();
00380 }
00381
00382 if (!Qos_Helper::consistent(topic_qos)) {
00383 ACE_ERROR((LM_ERROR,
00384 ACE_TEXT("(%P|%t) ERROR: ")
00385 ACE_TEXT("DomainParticipantImpl::create_topic, ")
00386 ACE_TEXT("inconsistent qos.\n")));
00387 return DDS::Topic::_nil();
00388 }
00389
00390 TopicMap::mapped_type* entry = 0;
00391 bool found = false;
00392 {
00393 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00394 tao_mon,
00395 this->topics_protector_,
00396 DDS::Topic::_nil());
00397
00398 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00399 if (topic_descrs_.count(topic_name)) {
00400 if (DCPS_debug_level > 3) {
00401 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00402 ACE_TEXT("DomainParticipantImpl::create_topic, ")
00403 ACE_TEXT("can't create a Topic due to name \"%C\" already in use ")
00404 ACE_TEXT("by a TopicDescription.\n"), topic_name));
00405 }
00406 return 0;
00407 }
00408 #endif
00409
00410 if (Util::find(topics_, topic_name, entry) == 0) {
00411 found = true;
00412 }
00413 }
00414
00415 if (found) {
00416 CORBA::String_var found_type
00417 = entry->pair_.svt_->get_type_name();
00418
00419 if (ACE_OS::strcmp(type_name, found_type) == 0) {
00420 DDS::TopicQos found_qos;
00421 entry->pair_.svt_->get_qos(found_qos);
00422
00423 if (topic_qos == found_qos) {
00424 {
00425 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00426 tao_mon,
00427 this->topics_protector_,
00428 DDS::Topic::_nil());
00429 ++entry->client_refs_;
00430 }
00431 return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00432
00433 } else {
00434 if (DCPS_debug_level >= 1) {
00435 ACE_DEBUG((LM_DEBUG,
00436 ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00437 ACE_TEXT("qos not match: topic_name=%C type_name=%C\n"),
00438 topic_name, type_name));
00439 }
00440
00441 return DDS::Topic::_nil();
00442 }
00443
00444 } else {
00445 if (DCPS_debug_level >= 1) {
00446 ACE_DEBUG((LM_DEBUG,
00447 ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00448 ACE_TEXT(" not match: topic_name=%C type_name=%C\n"),
00449 topic_name, type_name));
00450 }
00451
00452 return DDS::Topic::_nil();
00453 }
00454
00455 } else {
00456
00457 OpenDDS::DCPS::TypeSupport_var type_support;
00458 bool has_keys = (topic_mask & TOPIC_TYPE_HAS_KEYS);
00459
00460 if (0 == topic_mask) {
00461
00462 type_support = Registered_Data_Types->lookup(this, type_name);
00463 if (CORBA::is_nil(type_support)) {
00464 if (DCPS_debug_level >= 1) {
00465 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00466 ACE_TEXT("DomainParticipantImpl::create_topic, ")
00467 ACE_TEXT("can't create a topic=%C type_name=%C ")
00468 ACE_TEXT("is not registered.\n"),
00469 topic_name, type_name));
00470 }
00471 return DDS::Topic::_nil();
00472 }
00473 has_keys = type_support->has_dcps_key();
00474 }
00475
00476 RepoId topic_id = GUID_UNKNOWN;
00477 TopicStatus status = TOPIC_DISABLED;
00478
00479 if (is_enabled()) {
00480 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00481 status = disco->assert_topic(topic_id,
00482 domain_id_,
00483 dp_id_,
00484 topic_name,
00485 type_name,
00486 topic_qos,
00487 has_keys);
00488 }
00489
00490 if (status == CREATED || status == FOUND || status == TOPIC_DISABLED) {
00491 DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00492 topic_name,
00493 type_name,
00494 topic_qos,
00495 a_listener,
00496 mask,
00497 type_support);
00498 if (this->monitor_) {
00499 this->monitor_->report();
00500 }
00501 return new_topic;
00502
00503 } else {
00504 ACE_ERROR((LM_ERROR,
00505 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic, ")
00506 ACE_TEXT("assert_topic failed with return value %d.\n"), status));
00507 return DDS::Topic::_nil();
00508 }
00509 }
00510 }
00511
00512 DDS::ReturnCode_t
00513 DomainParticipantImpl::delete_topic(
00514 DDS::Topic_ptr a_topic)
00515 {
00516 return delete_topic_i(a_topic, false);
00517 }
00518
00519 DDS::ReturnCode_t
00520 DomainParticipantImpl::delete_topic_i(
00521 DDS::Topic_ptr a_topic,
00522 bool remove_objref)
00523 {
00524
00525 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00526
00527 try {
00528
00529
00530
00531 TopicImpl* the_topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00532
00533 if (!the_topic_servant) {
00534 ACE_ERROR_RETURN((LM_ERROR,
00535 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00536 ACE_TEXT("%p\n"),
00537 ACE_TEXT("failed to obtain TopicImpl.")),
00538 DDS::RETCODE_ERROR);
00539 }
00540
00541 CORBA::String_var topic_name = the_topic_servant->get_name();
00542
00543 DDS::DomainParticipant_var dp = the_topic_servant->get_participant();
00544
00545 DomainParticipantImpl* the_dp_servant =
00546 dynamic_cast<DomainParticipantImpl*>(dp.in());
00547
00548 if (the_dp_servant != this ||
00549 (!remove_objref && the_topic_servant->has_entity_refs())) {
00550
00551
00552 return DDS::RETCODE_PRECONDITION_NOT_MET;
00553 }
00554
00555 {
00556 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00557 tao_mon,
00558 this->topics_protector_,
00559 DDS::RETCODE_ERROR);
00560
00561 TopicMap::mapped_type* entry = 0;
00562
00563 if (Util::find(topics_, topic_name.in(), entry) == -1) {
00564 ACE_ERROR_RETURN((LM_ERROR,
00565 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00566 ACE_TEXT("%p\n"),
00567 ACE_TEXT("find")),
00568 DDS::RETCODE_ERROR);
00569 }
00570
00571 --entry->client_refs_;
00572
00573 if (remove_objref == true ||
00574 0 == entry->client_refs_) {
00575
00576
00577 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00578 TopicStatus status
00579 = disco->remove_topic(the_dp_servant->get_domain_id(),
00580 the_dp_servant->get_id(),
00581 the_topic_servant->get_id());
00582
00583 if (status != REMOVED) {
00584 ACE_ERROR_RETURN((LM_ERROR,
00585 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00586 ACE_TEXT("remove_topic failed with return value %d\n"), status),
00587 DDS::RETCODE_ERROR);
00588 }
00589
00590
00591
00592 if (topics_.erase(topic_name.in()) == 0) {
00593 ACE_ERROR_RETURN((LM_ERROR,
00594 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00595 ACE_TEXT("%p \n"),
00596 ACE_TEXT("unbind")),
00597 DDS::RETCODE_ERROR);
00598
00599 } else
00600 return DDS::RETCODE_OK;
00601
00602 }
00603 }
00604
00605 } catch (...) {
00606 ACE_ERROR((LM_ERROR,
00607 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00608 ACE_TEXT(" Caught Unknown Exception \n")));
00609 ret = DDS::RETCODE_ERROR;
00610 }
00611
00612 return ret;
00613 }
00614
00615
00616
00617 DDS::Topic_ptr
00618 DomainParticipantImpl::find_topic(
00619 const char * topic_name,
00620 const DDS::Duration_t & timeout)
00621 {
00622 ACE_Time_Value timeout_tv
00623 = ACE_OS::gettimeofday() + ACE_Time_Value(timeout.sec, timeout.nanosec/1000);
00624
00625 bool first_time = true;
00626
00627 while (first_time || ACE_OS::gettimeofday() < timeout_tv) {
00628 if (first_time) {
00629 first_time = false;
00630 }
00631
00632 TopicMap::mapped_type* entry = 0;
00633 {
00634 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00635 tao_mon,
00636 this->topics_protector_,
00637 DDS::Topic::_nil());
00638
00639 if (Util::find(topics_, topic_name, entry) == 0) {
00640 ++entry->client_refs_;
00641 return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00642 }
00643 }
00644
00645 RepoId topic_id;
00646 CORBA::String_var type_name;
00647 DDS::TopicQos_var qos;
00648
00649 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00650 TopicStatus status = disco->find_topic(domain_id_,
00651 topic_name,
00652 type_name.out(),
00653 qos.out(),
00654 topic_id);
00655
00656
00657 if (status == FOUND) {
00658 OpenDDS::DCPS::TypeSupport_var type_support =
00659 Registered_Data_Types->lookup(this, type_name.in());
00660 if (CORBA::is_nil(type_support)) {
00661 if (DCPS_debug_level) {
00662 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00663 ACE_TEXT("DomainParticipantImpl::find_topic, ")
00664 ACE_TEXT("can't create a Topic: type_name \"%C\" ")
00665 ACE_TEXT("is not registered.\n"), type_name.in()));
00666 }
00667
00668 return DDS::Topic::_nil();
00669 }
00670
00671 DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00672 topic_name,
00673 type_name,
00674 qos,
00675 DDS::TopicListener::_nil(),
00676 OpenDDS::DCPS::DEFAULT_STATUS_MASK,
00677 type_support);
00678 return new_topic;
00679
00680 } else if (status == INTERNAL_ERROR) {
00681 ACE_ERROR((LM_ERROR,
00682 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
00683 ACE_TEXT("topic not found, discovery returned INTERNAL_ERROR!\n")));
00684 return DDS::Topic::_nil();
00685 } else {
00686 ACE_Time_Value now = ACE_OS::gettimeofday();
00687
00688 if (now < timeout_tv) {
00689 ACE_Time_Value remaining = timeout_tv - now;
00690
00691 if (remaining.sec() >= 1) {
00692 ACE_OS::sleep(1);
00693
00694 } else {
00695 ACE_OS::sleep(remaining);
00696 }
00697 }
00698 }
00699 }
00700
00701 if (DCPS_debug_level >= 1) {
00702
00703 ACE_DEBUG((LM_DEBUG,
00704 ACE_TEXT("(%P|%t) DomainParticipantImpl::find_topic, ")
00705 ACE_TEXT("timed out. \n")));
00706 }
00707
00708 return DDS::Topic::_nil();
00709 }
00710
00711 DDS::TopicDescription_ptr
00712 DomainParticipantImpl::lookup_topicdescription(const char* name)
00713 {
00714 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00715 tao_mon,
00716 this->topics_protector_,
00717 DDS::Topic::_nil());
00718
00719 TopicMap::mapped_type* entry = 0;
00720
00721 if (Util::find(topics_, name, entry) == -1) {
00722 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00723 TopicDescriptionMap::iterator iter = topic_descrs_.find(name);
00724 if (iter != topic_descrs_.end()) {
00725 return DDS::TopicDescription::_duplicate(iter->second);
00726 }
00727 #endif
00728 return DDS::TopicDescription::_nil();
00729
00730 } else {
00731 return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
00732 }
00733 }
00734
00735 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00736
00737 DDS::ContentFilteredTopic_ptr
00738 DomainParticipantImpl::create_contentfilteredtopic(
00739 const char* name,
00740 DDS::Topic_ptr related_topic,
00741 const char* filter_expression,
00742 const DDS::StringSeq& expression_parameters)
00743 {
00744 if (CORBA::is_nil(related_topic)) {
00745 if (DCPS_debug_level > 3) {
00746 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00747 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00748 ACE_TEXT("can't create a content-filtered topic due to null related ")
00749 ACE_TEXT("topic.\n")));
00750 }
00751 return 0;
00752 }
00753
00754 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00755
00756 if (topics_.count(name)) {
00757 if (DCPS_debug_level > 3) {
00758 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00759 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00760 ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00761 ACE_TEXT("already in use by a Topic.\n"), name));
00762 }
00763 return 0;
00764 }
00765
00766 if (topic_descrs_.count(name)) {
00767 if (DCPS_debug_level > 3) {
00768 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00769 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00770 ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00771 ACE_TEXT("already in use by a TopicDescription.\n"), name));
00772 }
00773 return 0;
00774 }
00775
00776 DDS::ContentFilteredTopic_var cft;
00777 try {
00778
00779
00780 cft = new ContentFilteredTopicImpl(name, related_topic, filter_expression, this);
00781 if (cft->set_expression_parameters(expression_parameters) != DDS::RETCODE_OK) {
00782 return 0;
00783 }
00784 } catch (const std::exception& e) {
00785 if (DCPS_debug_level) {
00786 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00787 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00788 ACE_TEXT("can't create a content-filtered topic due to runtime error: ")
00789 ACE_TEXT("%C.\n"), e.what()));
00790 }
00791 return 0;
00792 }
00793 DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
00794 topic_descrs_[name] = td;
00795 return cft._retn();
00796 }
00797
00798 DDS::ReturnCode_t DomainParticipantImpl::delete_contentfilteredtopic(
00799 DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
00800 {
00801 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00802 DDS::RETCODE_OUT_OF_RESOURCES);
00803 DDS::ContentFilteredTopic_var cft =
00804 DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
00805 CORBA::String_var name = cft->get_name();
00806 TopicDescriptionMap::iterator iter = topic_descrs_.find(name.in());
00807 if (iter == topic_descrs_.end()) {
00808 if (DCPS_debug_level > 3) {
00809 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00810 ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00811 ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00812 ACE_TEXT("because it is not in the set.\n"), name.in ()));
00813 }
00814 return DDS::RETCODE_PRECONDITION_NOT_MET;
00815 }
00816
00817 TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
00818
00819 if (!tdi) {
00820 if (DCPS_debug_level > 3) {
00821 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00822 ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00823 ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00824 ACE_TEXT("failed to obtain TopicDescriptionImpl\n"), name.in()));
00825 }
00826 return DDS::RETCODE_ERROR;
00827 }
00828
00829 if (tdi->has_entity_refs()) {
00830 if (DCPS_debug_level > 3) {
00831 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00832 ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00833 ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00834 ACE_TEXT("because it is used by a datareader\n"), name.in ()));
00835 }
00836 return DDS::RETCODE_PRECONDITION_NOT_MET;
00837 }
00838 topic_descrs_.erase(iter);
00839 return DDS::RETCODE_OK;
00840 }
00841
00842 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00843
00844 #ifndef OPENDDS_NO_MULTI_TOPIC
00845
00846 DDS::MultiTopic_ptr DomainParticipantImpl::create_multitopic(
00847 const char* name, const char* type_name,
00848 const char* subscription_expression,
00849 const DDS::StringSeq& expression_parameters)
00850 {
00851 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00852
00853 if (topics_.count(name)) {
00854 if (DCPS_debug_level > 3) {
00855 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00856 ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00857 ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00858 ACE_TEXT("already in use by a Topic.\n"), name));
00859 }
00860 return 0;
00861 }
00862
00863 if (topic_descrs_.count(name)) {
00864 if (DCPS_debug_level > 3) {
00865 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00866 ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00867 ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00868 ACE_TEXT("already in use by a TopicDescription.\n"), name));
00869 }
00870 return 0;
00871 }
00872
00873 DDS::MultiTopic_var mt;
00874 try {
00875 mt = new MultiTopicImpl(name, type_name, subscription_expression,
00876 expression_parameters, this);
00877 } catch (const std::exception& e) {
00878 if (DCPS_debug_level) {
00879 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00880 ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00881 ACE_TEXT("can't create a multi topic due to runtime error: ")
00882 ACE_TEXT("%C.\n"), e.what()));
00883 }
00884 return 0;
00885 }
00886 DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
00887 topic_descrs_[name] = td;
00888 return mt._retn();
00889 }
00890
00891 DDS::ReturnCode_t DomainParticipantImpl::delete_multitopic(
00892 DDS::MultiTopic_ptr a_multitopic)
00893 {
00894 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00895 DDS::RETCODE_OUT_OF_RESOURCES);
00896 DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
00897 CORBA::String_var mt_name = mt->get_name();
00898 TopicDescriptionMap::iterator iter = topic_descrs_.find(mt_name.in());
00899 if (iter == topic_descrs_.end()) {
00900 if (DCPS_debug_level > 3) {
00901 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00902 ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
00903 ACE_TEXT("can't delete a multitopic \"%C\" ")
00904 ACE_TEXT("because it is not in the set.\n"), mt_name.in ()));
00905 }
00906 return DDS::RETCODE_PRECONDITION_NOT_MET;
00907 }
00908
00909 TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
00910
00911 if (!tdi) {
00912 if (DCPS_debug_level > 3) {
00913 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00914 ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
00915 ACE_TEXT("can't delete a multitopic topic \"%C\" ")
00916 ACE_TEXT("failed to obtain TopicDescriptionImpl.\n"),
00917 mt_name.in()));
00918 }
00919 return DDS::RETCODE_ERROR;
00920 }
00921
00922 if (tdi->has_entity_refs()) {
00923 if (DCPS_debug_level > 3) {
00924 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00925 ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
00926 ACE_TEXT("can't delete a multitopic topic \"%C\" ")
00927 ACE_TEXT("because it is used by a datareader.\n"), mt_name.in ()));
00928 }
00929 return DDS::RETCODE_PRECONDITION_NOT_MET;
00930 }
00931 topic_descrs_.erase(iter);
00932 return DDS::RETCODE_OK;
00933 }
00934
00935 #endif // OPENDDS_NO_MULTI_TOPIC
00936
00937 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00938
00939 RcHandle<FilterEvaluator>
00940 DomainParticipantImpl::get_filter_eval(const char* filter)
00941 {
00942 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, filter_cache_lock_,
00943 RcHandle<FilterEvaluator>());
00944
00945 RcHandle<FilterEvaluator>& result = filter_cache_[filter];
00946 if (!result) {
00947 try {
00948 result = make_rch<FilterEvaluator>(filter, false);
00949 } catch (const std::exception& e) {
00950 filter_cache_.erase(filter);
00951 if (DCPS_debug_level) {
00952 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00953 ACE_TEXT("DomainParticipantImpl::get_filter_eval, ")
00954 ACE_TEXT("can't create a writer-side content filter due to ")
00955 ACE_TEXT("runtime error: %C.\n"), e.what()));
00956 }
00957 }
00958 }
00959 return result;
00960 }
00961
00962 void
00963 DomainParticipantImpl::deref_filter_eval(const char* filter)
00964 {
00965 ACE_GUARD(ACE_Thread_Mutex, guard, filter_cache_lock_);
00966 typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
00967 Map::iterator iter = filter_cache_.find(filter);
00968 if (iter != filter_cache_.end()) {
00969 if (iter->second->ref_count() == 1) {
00970 filter_cache_.erase(iter);
00971 }
00972 }
00973 }
00974
00975 #endif
00976
00977 DDS::ReturnCode_t
00978 DomainParticipantImpl::delete_contained_entities()
00979 {
00980
00981 set_deleted(true);
00982
00983
00984
00985
00986 Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
00987 if (disc)
00988 disc->fini_bit(this);
00989
00990 if (ACE_OS::thr_equal(TheServiceParticipant->reactor_owner(),
00991 ACE_Thread::self())) {
00992 handle_exception(0);
00993
00994 } else {
00995 TheServiceParticipant->reactor()->notify(this);
00996
00997 shutdown_mutex_.acquire();
00998 while (!shutdown_complete_) {
00999 shutdown_condition_.wait();
01000 }
01001 shutdown_complete_ = false;
01002 shutdown_mutex_.release();
01003 }
01004
01005 bit_subscriber_ = DDS::Subscriber::_nil();
01006
01007 OpenDDS::DCPS::Registered_Data_Types->unregister_participant(this);
01008
01009
01010 set_deleted(false);
01011 return shutdown_result_;
01012 }
01013
01014 CORBA::Boolean
01015 DomainParticipantImpl::contains_entity(DDS::InstanceHandle_t a_handle)
01016 {
01017
01018
01019 {
01020 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01021 guard,
01022 this->topics_protector_,
01023 false);
01024
01025 for (TopicMap::iterator it(topics_.begin());
01026 it != topics_.end(); ++it) {
01027 if (a_handle == it->second.pair_.svt_->get_instance_handle())
01028 return true;
01029 }
01030 }
01031
01032 {
01033 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01034 guard,
01035 this->subscribers_protector_,
01036 false);
01037
01038 for (SubscriberSet::iterator it(subscribers_.begin());
01039 it != subscribers_.end(); ++it) {
01040 if (a_handle == it->svt_->get_instance_handle())
01041 return true;
01042 }
01043 }
01044
01045 {
01046 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01047 guard,
01048 this->publishers_protector_,
01049 false);
01050
01051 for (PublisherSet::iterator it(publishers_.begin());
01052 it != publishers_.end(); ++it) {
01053 if (a_handle == it->svt_->get_instance_handle())
01054 return true;
01055 }
01056 }
01057
01058
01059
01060 for (SubscriberSet::iterator it(subscribers_.begin());
01061 it != subscribers_.end(); ++it) {
01062 if (it->svt_->contains_reader(a_handle))
01063 return true;
01064 }
01065
01066 for (PublisherSet::iterator it(publishers_.begin());
01067 it != publishers_.end(); ++it) {
01068 if (it->svt_->contains_writer(a_handle))
01069 return true;
01070 }
01071
01072 return false;
01073 }
01074
01075 DDS::ReturnCode_t
01076 DomainParticipantImpl::set_qos(
01077 const DDS::DomainParticipantQos & qos)
01078 {
01079 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01080 if (qos_ == qos)
01081 return DDS::RETCODE_OK;
01082
01083
01084 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
01085 return DDS::RETCODE_IMMUTABLE_POLICY;
01086
01087 } else {
01088 qos_ = qos;
01089
01090 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01091 const bool status =
01092 disco->update_domain_participant_qos(domain_id_,
01093 dp_id_,
01094 qos_);
01095
01096 if (!status) {
01097 ACE_ERROR_RETURN((LM_ERROR,
01098 ACE_TEXT("(%P|%t) DomainParticipantImpl::set_qos, ")
01099 ACE_TEXT("failed on compatibility check. \n")),
01100 DDS::RETCODE_ERROR);
01101 }
01102 }
01103
01104 return DDS::RETCODE_OK;
01105
01106 } else {
01107 return DDS::RETCODE_INCONSISTENT_POLICY;
01108 }
01109 }
01110
01111 DDS::ReturnCode_t
01112 DomainParticipantImpl::get_qos(
01113 DDS::DomainParticipantQos & qos)
01114 {
01115 qos = qos_;
01116 return DDS::RETCODE_OK;
01117 }
01118
01119 DDS::ReturnCode_t
01120 DomainParticipantImpl::set_listener(
01121 DDS::DomainParticipantListener_ptr a_listener,
01122 DDS::StatusMask mask)
01123 {
01124 listener_mask_ = mask;
01125
01126 listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
01127 return DDS::RETCODE_OK;
01128 }
01129
01130 DDS::DomainParticipantListener_ptr
01131 DomainParticipantImpl::get_listener()
01132 {
01133 return DDS::DomainParticipantListener::_duplicate(listener_.in());
01134 }
01135
01136 DDS::ReturnCode_t
01137 DomainParticipantImpl::ignore_participant(
01138 DDS::InstanceHandle_t handle)
01139 {
01140 #if !defined (DDS_HAS_MINIMUM_BIT)
01141
01142 if (enabled_ == false) {
01143 ACE_ERROR_RETURN((LM_ERROR,
01144 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01145 ACE_TEXT("Entity is not enabled. \n")),
01146 DDS::RETCODE_NOT_ENABLED);
01147 }
01148
01149 RepoId ignoreId = get_repoid(handle);
01150 HandleMap::const_iterator location = this->ignored_participants_.find(ignoreId);
01151
01152 if (location == this->ignored_participants_.end()) {
01153 this->ignored_participants_[ ignoreId] = handle;
01154 }
01155 else {
01156 return DDS::RETCODE_OK;
01157 }
01158
01159 if (DCPS_debug_level >= 4) {
01160 GuidConverter converter(dp_id_);
01161 ACE_DEBUG((LM_DEBUG,
01162 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01163 ACE_TEXT("%C ignoring handle %x.\n"),
01164 OPENDDS_STRING(converter).c_str(),
01165 handle));
01166 }
01167
01168 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01169 if (!disco->ignore_domain_participant(domain_id_,
01170 dp_id_,
01171 ignoreId)) {
01172 ACE_ERROR_RETURN((LM_ERROR,
01173 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01174 ACE_TEXT("Could not ignore domain participant.\n")),
01175 DDS::RETCODE_NOT_ENABLED);
01176 return DDS::RETCODE_ERROR;
01177 }
01178
01179
01180 if (DCPS_debug_level >= 4) {
01181 GuidConverter converter(dp_id_);
01182 ACE_DEBUG((LM_DEBUG,
01183 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01184 ACE_TEXT("%C repo call returned.\n"),
01185 OPENDDS_STRING(converter).c_str()));
01186 }
01187
01188 return DDS::RETCODE_OK;
01189 #else
01190 ACE_UNUSED_ARG(handle);
01191 return DDS::RETCODE_UNSUPPORTED;
01192 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01193 }
01194
01195 DDS::ReturnCode_t
01196 DomainParticipantImpl::ignore_topic(
01197 DDS::InstanceHandle_t handle)
01198 {
01199 #if !defined (DDS_HAS_MINIMUM_BIT)
01200
01201 if (enabled_ == false) {
01202 ACE_ERROR_RETURN((LM_ERROR,
01203 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01204 ACE_TEXT(" Entity is not enabled. \n")),
01205 DDS::RETCODE_NOT_ENABLED);
01206 }
01207
01208 RepoId ignoreId = get_repoid(handle);
01209 HandleMap::const_iterator location = this->ignored_topics_.find(ignoreId);
01210
01211 if (location == this->ignored_topics_.end()) {
01212 this->ignored_topics_[ ignoreId] = handle;
01213 }
01214 else {
01215 return DDS::RETCODE_OK;
01216 }
01217
01218 if (DCPS_debug_level >= 4) {
01219 GuidConverter converter(dp_id_);
01220 ACE_DEBUG((LM_DEBUG,
01221 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_topic: ")
01222 ACE_TEXT("%C ignoring handle %x.\n"),
01223 OPENDDS_STRING(converter).c_str(),
01224 handle));
01225 }
01226
01227 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01228 if (!disco->ignore_topic(domain_id_,
01229 dp_id_,
01230 ignoreId)) {
01231 ACE_ERROR((LM_ERROR,
01232 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01233 ACE_TEXT(" Could not ignore topic.\n")));
01234 }
01235
01236 return DDS::RETCODE_OK;
01237 #else
01238 ACE_UNUSED_ARG(handle);
01239 return DDS::RETCODE_UNSUPPORTED;
01240 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01241 }
01242
01243 DDS::ReturnCode_t
01244 DomainParticipantImpl::ignore_publication(
01245 DDS::InstanceHandle_t handle)
01246 {
01247 #if !defined (DDS_HAS_MINIMUM_BIT)
01248
01249 if (enabled_ == false) {
01250 ACE_ERROR_RETURN((LM_ERROR,
01251 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01252 ACE_TEXT(" Entity is not enabled. \n")),
01253 DDS::RETCODE_NOT_ENABLED);
01254 }
01255
01256 if (DCPS_debug_level >= 4) {
01257 GuidConverter converter(dp_id_);
01258 ACE_DEBUG((LM_DEBUG,
01259 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_publication: ")
01260 ACE_TEXT("%C ignoring handle %x.\n"),
01261 OPENDDS_STRING(converter).c_str(),
01262 handle));
01263 }
01264
01265 RepoId ignoreId = get_repoid(handle);
01266 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01267 if (!disco->ignore_publication(domain_id_,
01268 dp_id_,
01269 ignoreId)) {
01270 ACE_ERROR_RETURN((LM_ERROR,
01271 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01272 ACE_TEXT(" could not ignore publication in discovery. \n")),
01273 DDS::RETCODE_ERROR);
01274 }
01275
01276 return DDS::RETCODE_OK;
01277 #else
01278 ACE_UNUSED_ARG(handle);
01279 return DDS::RETCODE_UNSUPPORTED;
01280 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01281 }
01282
01283 DDS::ReturnCode_t
01284 DomainParticipantImpl::ignore_subscription(
01285 DDS::InstanceHandle_t handle)
01286 {
01287 #if !defined (DDS_HAS_MINIMUM_BIT)
01288
01289 if (enabled_ == false) {
01290 ACE_ERROR_RETURN((LM_ERROR,
01291 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01292 ACE_TEXT(" Entity is not enabled. \n")),
01293 DDS::RETCODE_NOT_ENABLED);
01294 }
01295
01296 if (DCPS_debug_level >= 4) {
01297 GuidConverter converter(dp_id_);
01298 ACE_DEBUG((LM_DEBUG,
01299 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_subscription: ")
01300 ACE_TEXT("%C ignoring handle %d.\n"),
01301 OPENDDS_STRING(converter).c_str(),
01302 handle));
01303 }
01304
01305
01306 RepoId ignoreId = get_repoid(handle);
01307 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01308 if (!disco->ignore_subscription(domain_id_,
01309 dp_id_,
01310 ignoreId)) {
01311 ACE_ERROR_RETURN((LM_ERROR,
01312 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01313 ACE_TEXT(" could not ignore subscription in discovery. \n")),
01314 DDS::RETCODE_ERROR);
01315 }
01316
01317 return DDS::RETCODE_OK;
01318 #else
01319 ACE_UNUSED_ARG(handle);
01320 return DDS::RETCODE_UNSUPPORTED;
01321 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01322 }
01323
01324 DDS::DomainId_t
01325 DomainParticipantImpl::get_domain_id()
01326 {
01327 return domain_id_;
01328 }
01329
01330 DDS::ReturnCode_t
01331 DomainParticipantImpl::assert_liveliness()
01332 {
01333
01334
01335
01336
01337
01338
01339
01340
01341 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01342 tao_mon,
01343 this->publishers_protector_,
01344 DDS::RETCODE_ERROR);
01345
01346 for (PublisherSet::iterator it(publishers_.begin());
01347 it != publishers_.end(); ++it) {
01348 it->svt_->assert_liveliness_by_participant();
01349 }
01350
01351 last_liveliness_activity_ = ACE_OS::gettimeofday();
01352
01353 return DDS::RETCODE_OK;
01354 }
01355
01356 DDS::ReturnCode_t
01357 DomainParticipantImpl::set_default_publisher_qos(
01358 const DDS::PublisherQos & qos)
01359 {
01360 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01361 default_publisher_qos_ = qos;
01362 return DDS::RETCODE_OK;
01363
01364 } else {
01365 return DDS::RETCODE_INCONSISTENT_POLICY;
01366 }
01367 }
01368
01369 DDS::ReturnCode_t
01370 DomainParticipantImpl::get_default_publisher_qos(
01371 DDS::PublisherQos & qos)
01372 {
01373 qos = default_publisher_qos_;
01374 return DDS::RETCODE_OK;
01375 }
01376
01377 DDS::ReturnCode_t
01378 DomainParticipantImpl::set_default_subscriber_qos(
01379 const DDS::SubscriberQos & qos)
01380 {
01381 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01382 default_subscriber_qos_ = qos;
01383 return DDS::RETCODE_OK;
01384
01385 } else {
01386 return DDS::RETCODE_INCONSISTENT_POLICY;
01387 }
01388 }
01389
01390 DDS::ReturnCode_t
01391 DomainParticipantImpl::get_default_subscriber_qos(
01392 DDS::SubscriberQos & qos)
01393 {
01394 qos = default_subscriber_qos_;
01395 return DDS::RETCODE_OK;
01396 }
01397
01398 DDS::ReturnCode_t
01399 DomainParticipantImpl::set_default_topic_qos(
01400 const DDS::TopicQos & qos)
01401 {
01402 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01403 default_topic_qos_ = qos;
01404 return DDS::RETCODE_OK;
01405
01406 } else {
01407 return DDS::RETCODE_INCONSISTENT_POLICY;
01408 }
01409 }
01410
01411 DDS::ReturnCode_t
01412 DomainParticipantImpl::get_default_topic_qos(
01413 DDS::TopicQos & qos)
01414 {
01415 qos = default_topic_qos_;
01416 return DDS::RETCODE_OK;
01417 }
01418
01419 DDS::ReturnCode_t
01420 DomainParticipantImpl::get_current_time(
01421 DDS::Time_t & current_time)
01422 {
01423 current_time
01424 = OpenDDS::DCPS::time_value_to_time(
01425 ACE_OS::gettimeofday());
01426 return DDS::RETCODE_OK;
01427 }
01428
01429 #if !defined (DDS_HAS_MINIMUM_BIT)
01430
01431 DDS::ReturnCode_t
01432 DomainParticipantImpl::get_discovered_participants(
01433 DDS::InstanceHandleSeq & participant_handles)
01434 {
01435 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01436 guard,
01437 this->handle_protector_,
01438 DDS::RETCODE_ERROR);
01439
01440 HandleMap::const_iterator itEnd = this->handles_.end();
01441
01442 for (HandleMap::const_iterator iter = this->handles_.begin();
01443 iter != itEnd; ++iter) {
01444 GuidConverter converter(iter->first);
01445
01446 if (converter.entityKind() == KIND_PARTICIPANT)
01447 {
01448
01449 if (iter->first == this->dp_id_
01450 || (this->ignored_participants_.find(iter->first)
01451 != this->ignored_participants_.end ())) {
01452 continue;
01453 }
01454
01455 push_back(participant_handles, iter->second);
01456 }
01457 }
01458
01459 return DDS::RETCODE_OK;
01460 }
01461
01462 DDS::ReturnCode_t
01463 DomainParticipantImpl::get_discovered_participant_data(
01464 DDS::ParticipantBuiltinTopicData & participant_data,
01465 DDS::InstanceHandle_t participant_handle)
01466 {
01467 {
01468 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01469 guard,
01470 this->handle_protector_,
01471 DDS::RETCODE_ERROR);
01472
01473 bool found = false;
01474 HandleMap::const_iterator itEnd = this->handles_.end();
01475
01476 for (HandleMap::const_iterator iter = this->handles_.begin();
01477 iter != itEnd; ++iter) {
01478 GuidConverter converter(iter->first);
01479
01480 if (participant_handle == iter->second
01481 && converter.entityKind() == KIND_PARTICIPANT) {
01482 found = true;
01483 break;
01484 }
01485 }
01486
01487 if (!found)
01488 return DDS::RETCODE_PRECONDITION_NOT_MET;
01489 }
01490
01491 DDS::SampleInfoSeq info;
01492 DDS::ParticipantBuiltinTopicDataSeq data;
01493 DDS::DataReader_var dr =
01494 this->bit_subscriber_->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
01495 DDS::ParticipantBuiltinTopicDataDataReader_var bit_part_dr =
01496 DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);
01497 DDS::ReturnCode_t ret = bit_part_dr->read_instance(data,
01498 info,
01499 1,
01500 participant_handle,
01501 DDS::ANY_SAMPLE_STATE,
01502 DDS::ANY_VIEW_STATE,
01503 DDS::ANY_INSTANCE_STATE);
01504
01505 if (ret == DDS::RETCODE_OK) {
01506 if (info[0].valid_data)
01507 participant_data = data[0];
01508
01509 else
01510 return DDS::RETCODE_NO_DATA;
01511 }
01512
01513 return ret;
01514 }
01515
01516 DDS::ReturnCode_t
01517 DomainParticipantImpl::get_discovered_topics(
01518 DDS::InstanceHandleSeq & topic_handles)
01519 {
01520 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01521 guard,
01522 this->handle_protector_,
01523 DDS::RETCODE_ERROR);
01524
01525 HandleMap::const_iterator itEnd = this->handles_.end();
01526
01527 for (HandleMap::const_iterator iter = this->handles_.begin();
01528 iter != itEnd; ++iter) {
01529 GuidConverter converter(iter->first);
01530
01531 if (converter.isTopic()) {
01532
01533
01534 if (this->ignored_topics_.find(iter->first)
01535 != this->ignored_topics_.end ()) {
01536 continue;
01537 }
01538
01539 push_back(topic_handles, iter->second);
01540 }
01541 }
01542
01543 return DDS::RETCODE_OK;
01544 }
01545
01546 DDS::ReturnCode_t
01547 DomainParticipantImpl::get_discovered_topic_data(
01548 DDS::TopicBuiltinTopicData & topic_data,
01549 DDS::InstanceHandle_t topic_handle)
01550 {
01551 {
01552 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01553 guard,
01554 this->handle_protector_,
01555 DDS::RETCODE_ERROR);
01556
01557 bool found = false;
01558 HandleMap::const_iterator itEnd = this->handles_.end();
01559
01560 for (HandleMap::const_iterator iter = this->handles_.begin();
01561 iter != itEnd; ++iter) {
01562 GuidConverter converter(iter->first);
01563
01564 if (topic_handle == iter->second && converter.isTopic()) {
01565 found = true;
01566 break;
01567 }
01568 }
01569
01570 if (!found)
01571 return DDS::RETCODE_PRECONDITION_NOT_MET;
01572 }
01573
01574 DDS::DataReader_var dr =
01575 bit_subscriber_->lookup_datareader(BUILT_IN_TOPIC_TOPIC);
01576 DDS::TopicBuiltinTopicDataDataReader_var bit_topic_dr =
01577 DDS::TopicBuiltinTopicDataDataReader::_narrow(dr);
01578
01579 DDS::SampleInfoSeq info;
01580 DDS::TopicBuiltinTopicDataSeq data;
01581 DDS::ReturnCode_t ret =
01582 bit_topic_dr->read_instance(data,
01583 info,
01584 1,
01585 topic_handle,
01586 DDS::ANY_SAMPLE_STATE,
01587 DDS::ANY_VIEW_STATE,
01588 DDS::ANY_INSTANCE_STATE);
01589
01590 if (ret == DDS::RETCODE_OK) {
01591 if (info[0].valid_data)
01592 topic_data = data[0];
01593
01594 else
01595 return DDS::RETCODE_NO_DATA;
01596 }
01597
01598 return ret;
01599 }
01600
01601 #endif
01602
01603 DDS::ReturnCode_t
01604 DomainParticipantImpl::enable()
01605 {
01606
01607
01608
01609
01610
01611
01612 if (this->is_enabled()) {
01613 return DDS::RETCODE_OK;
01614 }
01615
01616 if (monitor_) {
01617 monitor_->report();
01618 }
01619
01620 if (TheServiceParticipant->monitor_) {
01621 TheServiceParticipant->monitor_->report();
01622 }
01623
01624 #if defined(OPENDDS_SECURITY)
01625 if (!security_config_ && TheServiceParticipant->get_security()) {
01626 security_config_ = TheSecurityRegistry->default_config();
01627 if (!security_config_) {
01628 security_config_ = TheSecurityRegistry->fix_empty_default();
01629 }
01630 }
01631 #endif
01632
01633 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01634
01635 if (disco.is_nil()) {
01636 ACE_ERROR((LM_ERROR,
01637 ACE_TEXT("(%P|%t) ERROR: ")
01638 ACE_TEXT("DomainParticipantImpl::enable, ")
01639 ACE_TEXT("no repository found for domain id: %d.\n"), domain_id_));
01640 return DDS::RETCODE_ERROR;
01641 }
01642
01643 #if defined(OPENDDS_SECURITY)
01644 if (TheServiceParticipant->get_security() && !security_config_) {
01645 ACE_ERROR((LM_ERROR,
01646 ACE_TEXT("(%P|%t) ERROR: ")
01647 ACE_TEXT("DomainParticipantImpl::enable, ")
01648 ACE_TEXT("DCPSSecurity flag is set, but unable to load security plugin configuration.\n")));
01649 return DDS::RETCODE_ERROR;
01650 }
01651 #endif
01652
01653 AddDomainStatus value = {GUID_UNKNOWN, false};
01654
01655 #if defined(OPENDDS_SECURITY)
01656 if (TheServiceParticipant->get_security()) {
01657 Security::Authentication_var auth = security_config_->get_authentication();
01658
01659 DDS::Security::SecurityException se;
01660 DDS::Security::ValidationResult_t val_res =
01661 auth->validate_local_identity(id_handle_, dp_id_, domain_id_, qos_, disco->generate_participant_guid(), se);
01662
01663
01664 if (val_res != DDS::Security::VALIDATION_OK) {
01665 ACE_ERROR((LM_ERROR,
01666 ACE_TEXT("(%P|%t) ERROR: ")
01667 ACE_TEXT("DomainParticipantImpl::enable, ")
01668 ACE_TEXT("Unable to validate local identity. SecurityException[%d.%d]: %C\n"),
01669 se.code, se.minor_code, se.message.in()));
01670 return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
01671 }
01672
01673 Security::AccessControl_var access = security_config_->get_access_control();
01674
01675 perm_handle_ = access->validate_local_permissions(auth, id_handle_, domain_id_, qos_, se);
01676
01677 if (perm_handle_ == DDS::HANDLE_NIL) {
01678 ACE_ERROR((LM_ERROR,
01679 ACE_TEXT("(%P|%t) ERROR: ")
01680 ACE_TEXT("DomainParticipantImpl::enable, ")
01681 ACE_TEXT("Unable to validate local permissions. SecurityException[%d.%d]: %C\n"),
01682 se.code, se.minor_code, se.message.in()));
01683 return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
01684 }
01685
01686 bool check_create = access->check_create_participant(perm_handle_, domain_id_, qos_, se);
01687 if (!check_create) {
01688 ACE_ERROR((LM_ERROR,
01689 ACE_TEXT("(%P|%t) ERROR: ")
01690 ACE_TEXT("DomainParticipantImpl::enable, ")
01691 ACE_TEXT("Unable to create participant. SecurityException[%d.%d]: %C\n"),
01692 se.code, se.minor_code, se.message.in()));
01693 return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
01694 }
01695
01696 DDS::Security::ParticipantSecurityAttributes part_sec_attr;
01697 bool check_part_sec_attr = access->get_participant_sec_attributes(perm_handle_, part_sec_attr, se);
01698
01699 if (!check_part_sec_attr) {
01700 ACE_ERROR((LM_ERROR,
01701 ACE_TEXT("(%P|%t) ERROR: ")
01702 ACE_TEXT("DomainParticipantImpl::enable, ")
01703 ACE_TEXT("Unable to get participant security attributes. SecurityException[%d.%d]: %C\n"),
01704 se.code, se.minor_code, se.message.in()));
01705 return DDS::RETCODE_ERROR;
01706 }
01707
01708 Security::CryptoKeyFactory_var crypto = security_config_->get_crypto_key_factory();
01709
01710 part_crypto_handle_ = crypto->register_local_participant(id_handle_, perm_handle_,
01711 Util::filter_properties(qos_.property.value, "dds.sec.crypto."), part_sec_attr, se);
01712 if (part_crypto_handle_ == DDS::HANDLE_NIL) {
01713 ACE_ERROR((LM_ERROR,
01714 ACE_TEXT("(%P|%t) ERROR: ")
01715 ACE_TEXT("DomainParticipantImpl::enable, ")
01716 ACE_TEXT("Unable to register local participant. SecurityException[%d.%d]: %C\n"),
01717 se.code, se.minor_code, se.message.in()));
01718 return DDS::RETCODE_ERROR;
01719 }
01720
01721 value = disco->add_domain_participant_secure(domain_id_, qos_, dp_id_, id_handle_, perm_handle_, part_crypto_handle_);
01722
01723 if (value.id == GUID_UNKNOWN) {
01724 ACE_ERROR((LM_ERROR,
01725 ACE_TEXT("(%P|%t) ERROR: ")
01726 ACE_TEXT("DomainParticipantImpl::enable, ")
01727 ACE_TEXT("add_domain_participant_secure returned invalid id.\n")));
01728 return DDS::RETCODE_ERROR;
01729 }
01730
01731 } else {
01732 #endif
01733
01734 value = disco->add_domain_participant(domain_id_, qos_);
01735
01736 if (value.id == GUID_UNKNOWN) {
01737 ACE_ERROR((LM_ERROR,
01738 ACE_TEXT("(%P|%t) ERROR: ")
01739 ACE_TEXT("DomainParticipantImpl::enable, ")
01740 ACE_TEXT("add_domain_participant returned invalid id.\n")));
01741 return DDS::RETCODE_ERROR;
01742 }
01743
01744 #if defined(OPENDDS_SECURITY)
01745 }
01746 #endif
01747
01748 dp_id_ = value.id;
01749 federated_ = value.federated;
01750
01751 const DDS::ReturnCode_t ret = this->set_enabled();
01752
01753 if (DCPS_debug_level > 1) {
01754 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DomainParticipantImpl::enable: ")
01755 ACE_TEXT("enabled participant %C in domain %d\n"),
01756 OPENDDS_STRING(GuidConverter(dp_id_)).c_str(), domain_id_));
01757 }
01758
01759 if (ret == DDS::RETCODE_OK && !TheTransientKludge->is_enabled()) {
01760 Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
01761 this->bit_subscriber_ = disc->init_bit(this);
01762 }
01763
01764 if (ret != DDS::RETCODE_OK) {
01765 return ret;
01766 }
01767
01768 if (qos_.entity_factory.autoenable_created_entities) {
01769
01770 for (TopicMap::iterator it = topics_.begin(); it != topics_.end(); ++it) {
01771 it->second.pair_.svt_->enable();
01772 }
01773
01774 for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
01775 it->svt_->enable();
01776 }
01777
01778 for (SubscriberSet::iterator it = subscribers_.begin(); it != subscribers_.end(); ++it) {
01779 it->svt_->enable();
01780 }
01781 }
01782
01783 return DDS::RETCODE_OK;
01784 }
01785
01786 RepoId
01787 DomainParticipantImpl::get_id()
01788 {
01789 return dp_id_;
01790 }
01791
01792 OPENDDS_STRING
01793 DomainParticipantImpl::get_unique_id()
01794 {
01795 return GuidConverter(dp_id_).uniqueId();
01796 }
01797
01798
01799 DDS::InstanceHandle_t
01800 DomainParticipantImpl::get_instance_handle()
01801 {
01802 return this->id_to_handle(this->dp_id_);
01803 }
01804
01805 DDS::InstanceHandle_t
01806 DomainParticipantImpl::id_to_handle(const RepoId& id)
01807 {
01808 if (id == GUID_UNKNOWN) {
01809 return this->participant_handles_.next();
01810 }
01811
01812 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01813 guard,
01814 this->handle_protector_,
01815 HANDLE_UNKNOWN);
01816
01817 HandleMap::const_iterator location = this->handles_.find(id);
01818 DDS::InstanceHandle_t result;
01819
01820 if (location == this->handles_.end()) {
01821
01822 result = this->participant_handles_.next();
01823 this->handles_[id] = result;
01824 this->repoIds_[result] = id;
01825 } else {
01826 result = location->second;
01827 }
01828
01829 return result;
01830 }
01831
01832 RepoId
01833 DomainParticipantImpl::get_repoid(const DDS::InstanceHandle_t& handle)
01834 {
01835 RepoId result = GUID_UNKNOWN;
01836 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01837 guard,
01838 this->handle_protector_,
01839 GUID_UNKNOWN);
01840 RepoIdMap::const_iterator location = this->repoIds_.find(handle);
01841 if (location != this->repoIds_.end()) {
01842 result = location->second;
01843 }
01844 return result;
01845 }
01846
01847 #if defined(OPENDDS_SECURITY)
01848 namespace {
01849
01850 bool
01851 is_bit(const char* topic_name) {
01852 return strcmp(topic_name, BUILT_IN_PARTICIPANT_TOPIC) == 0
01853 || strcmp(topic_name, BUILT_IN_TOPIC_TOPIC) == 0
01854 || strcmp(topic_name, BUILT_IN_PUBLICATION_TOPIC) == 0
01855 || strcmp(topic_name, BUILT_IN_SUBSCRIPTION_TOPIC) == 0;
01856 }
01857
01858 }
01859 #endif
01860
01861 DDS::Topic_ptr
01862 DomainParticipantImpl::create_new_topic(
01863 const RepoId topic_id,
01864 const char * topic_name,
01865 const char * type_name,
01866 const DDS::TopicQos & qos,
01867 DDS::TopicListener_ptr a_listener,
01868 const DDS::StatusMask & mask,
01869 OpenDDS::DCPS::TypeSupport_ptr type_support)
01870 {
01871 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01872 tao_mon,
01873 this->topics_protector_,
01874 DDS::Topic::_nil());
01875
01876 #if defined(OPENDDS_SECURITY)
01877 if (TheServiceParticipant->get_security() && !is_bit(topic_name)) {
01878 Security::AccessControl_var access = security_config_->get_access_control();
01879
01880 DDS::Security::SecurityException se;
01881
01882 DDS::Security::TopicSecurityAttributes sec_attr;
01883 if (!access->get_topic_sec_attributes(perm_handle_, topic_name, sec_attr, se)) {
01884 ACE_ERROR((LM_WARNING,
01885 ACE_TEXT("(%P|%t) WARNING: ")
01886 ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
01887 ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
01888 topic_name, se.code, se.minor_code, se.message.in()));
01889 return DDS::Topic::_nil();
01890 }
01891
01892 if ((sec_attr.is_write_protected || sec_attr.is_read_protected) &&
01893 !access->check_create_topic(perm_handle_, domain_id_, topic_name, qos, se)) {
01894 ACE_ERROR((LM_WARNING,
01895 ACE_TEXT("(%P|%t) WARNING: ")
01896 ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
01897 ACE_TEXT("Permissions check failed to create new topic '%C'. SecurityException[%d.%d]: %C\n"),
01898 topic_name, se.code, se.minor_code, se.message.in()));
01899 return DDS::Topic::_nil();
01900 }
01901 }
01902 #endif
01903
01904 TopicImpl* topic_servant = 0;
01905
01906 ACE_NEW_RETURN(topic_servant,
01907 TopicImpl(topic_id,
01908 topic_name,
01909 type_name,
01910 type_support,
01911 qos,
01912 a_listener,
01913 mask,
01914 this),
01915 DDS::Topic::_nil());
01916
01917 if ((enabled_ == true)
01918 && (qos_.entity_factory.autoenable_created_entities)) {
01919 topic_servant->enable();
01920 }
01921
01922 DDS::Topic_ptr obj(topic_servant);
01923
01924
01925 RefCounted_Topic refCounted_topic(Topic_Pair(topic_servant, obj, NO_DUP));
01926
01927 if (OpenDDS::DCPS::bind(topics_, topic_name, refCounted_topic) == -1) {
01928 ACE_ERROR((LM_ERROR,
01929 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_new_topic, ")
01930 ACE_TEXT("%p \n"),
01931 ACE_TEXT("bind")));
01932 return DDS::Topic::_nil();
01933 }
01934
01935 if (this->monitor_) {
01936 this->monitor_->report();
01937 }
01938
01939
01940
01941 return DDS::Topic::_duplicate(refCounted_topic.pair_.obj_.in());
01942 }
01943
01944 bool
01945 DomainParticipantImpl::is_clean() const
01946 {
01947 bool sub_is_clean = subscribers_.empty();
01948 bool topics_is_clean = topics_.size() == 0;
01949
01950 if (!TheTransientKludge->is_enabled()) {
01951
01952
01953
01954 sub_is_clean = !sub_is_clean ? subscribers_.size() == 1 : true;
01955 topics_is_clean = !topics_is_clean ? topics_.size() == 4 : true;
01956 }
01957 return (publishers_.empty()
01958 && sub_is_clean
01959 && topics_is_clean);
01960 }
01961
01962 DDS::DomainParticipantListener_ptr
01963 DomainParticipantImpl::listener_for(DDS::StatusKind kind)
01964 {
01965 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
01966 return DDS::DomainParticipantListener::_nil ();
01967 } else {
01968 return DDS::DomainParticipantListener::_duplicate(listener_.in());
01969 }
01970 }
01971
01972 void
01973 DomainParticipantImpl::get_topic_ids(TopicIdVec& topics)
01974 {
01975 ACE_GUARD(ACE_Recursive_Thread_Mutex,
01976 guard,
01977 this->topics_protector_);
01978
01979 topics.reserve(topics_.size());
01980 for (TopicMap::iterator it(topics_.begin());
01981 it != topics_.end(); ++it) {
01982 topics.push_back(it->second.pair_.svt_->get_id());
01983 }
01984 }
01985
01986 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01987
01988 OwnershipManager*
01989 DomainParticipantImpl::ownership_manager()
01990 {
01991 #if !defined (DDS_HAS_MINIMUM_BIT)
01992
01993 DDS::DataReader_var dr =
01994 bit_subscriber_->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
01995 DDS::PublicationBuiltinTopicDataDataReader_var bit_pub_dr =
01996 DDS::PublicationBuiltinTopicDataDataReader::_narrow(dr);
01997
01998 if (!CORBA::is_nil(bit_pub_dr.in())) {
01999 DDS::DataReaderListener_var listener = bit_pub_dr->get_listener();
02000 if (CORBA::is_nil(listener.in())) {
02001 DDS::DataReaderListener_var bit_pub_listener =
02002 new BitPubListenerImpl(this);
02003 bit_pub_dr->set_listener(bit_pub_listener, DDS::DATA_AVAILABLE_STATUS);
02004
02005 bit_pub_listener->on_data_available(bit_pub_dr.in());
02006 }
02007 }
02008
02009 #endif
02010 return &this->owner_man_;
02011 }
02012
02013 void
02014 DomainParticipantImpl::update_ownership_strength (const PublicationId& pub_id,
02015 const CORBA::Long& ownership_strength)
02016 {
02017 ACE_GUARD(ACE_Recursive_Thread_Mutex,
02018 tao_mon,
02019 this->subscribers_protector_);
02020
02021 if (this->get_deleted ())
02022 return;
02023
02024 for (SubscriberSet::iterator it(this->subscribers_.begin());
02025 it != this->subscribers_.end(); ++it) {
02026 it->svt_->update_ownership_strength(pub_id, ownership_strength);
02027 }
02028 }
02029
02030 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02031
02032 DomainParticipantImpl::RepoIdSequence::RepoIdSequence(const RepoId& base) :
02033 base_(base),
02034 serial_(0),
02035 builder_(base_)
02036 {
02037 }
02038
02039 RepoId
02040 DomainParticipantImpl::RepoIdSequence::next()
02041 {
02042 builder_.entityKey(++serial_);
02043 return builder_;
02044 }
02045
02046
02047
02048
02049
02050 bool
02051 DomainParticipantImpl::validate_publisher_qos(DDS::PublisherQos & pub_qos)
02052 {
02053 if (pub_qos == PUBLISHER_QOS_DEFAULT) {
02054 this->get_default_publisher_qos(pub_qos);
02055 }
02056
02057 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(pub_qos, false);
02058
02059 if (!Qos_Helper::valid(pub_qos) || !Qos_Helper::consistent(pub_qos)) {
02060 ACE_ERROR((LM_ERROR,
02061 ACE_TEXT("(%P|%t) ERROR: ")
02062 ACE_TEXT("DomainParticipantImpl::validate_publisher_qos, ")
02063 ACE_TEXT("invalid qos.\n")));
02064 return false;
02065 }
02066
02067 return true;
02068 }
02069
02070 bool
02071 DomainParticipantImpl::validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos)
02072 {
02073 if (subscriber_qos == SUBSCRIBER_QOS_DEFAULT) {
02074 this->get_default_subscriber_qos(subscriber_qos);
02075 }
02076
02077 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, false);
02078
02079 if (!Qos_Helper::valid(subscriber_qos) || !Qos_Helper::consistent(subscriber_qos)) {
02080 ACE_ERROR((LM_ERROR,
02081 ACE_TEXT("(%P|%t) ERROR: ")
02082 ACE_TEXT("DomainParticipantImpl::validate_subscriber_qos, ")
02083 ACE_TEXT("invalid qos.\n")));
02084 return false;
02085 }
02086
02087
02088 return true;
02089 }
02090
02091 Recorder_ptr
02092 DomainParticipantImpl::create_recorder(DDS::Topic_ptr a_topic,
02093 const DDS::SubscriberQos& subscriber_qos,
02094 const DDS::DataReaderQos& datareader_qos,
02095 const RecorderListener_rch& a_listener,
02096 DDS::StatusMask mask)
02097 {
02098 if (CORBA::is_nil(a_topic)) {
02099 ACE_ERROR((LM_ERROR,
02100 ACE_TEXT("(%P|%t) ERROR: ")
02101 ACE_TEXT("SubscriberImpl::create_datareader, ")
02102 ACE_TEXT("topic desc is nil.\n")));
02103 return 0;
02104 }
02105
02106 DDS::SubscriberQos sub_qos = subscriber_qos;
02107 DDS::DataReaderQos dr_qos;
02108
02109 if (! this->validate_subscriber_qos(sub_qos) ||
02110 ! SubscriberImpl::validate_datareader_qos(datareader_qos,
02111 TheServiceParticipant->initial_DataReaderQos(),
02112 a_topic,
02113 dr_qos, false) ) {
02114 return 0;
02115 }
02116
02117 RecorderImpl* recorder(new RecorderImpl);
02118 Recorder_var result(recorder);
02119
02120 recorder->init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
02121 dr_qos, a_listener,
02122 mask, this, subscriber_qos);
02123
02124 if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
02125 recorder->enable();
02126 }
02127
02128 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
02129 recorders_.insert(result);
02130
02131 return result._retn();
02132 }
02133
02134 Replayer_ptr
02135 DomainParticipantImpl::create_replayer(DDS::Topic_ptr a_topic,
02136 const DDS::PublisherQos& publisher_qos,
02137 const DDS::DataWriterQos& datawriter_qos,
02138 const ReplayerListener_rch& a_listener,
02139 DDS::StatusMask mask)
02140 {
02141 if (CORBA::is_nil(a_topic)) {
02142 ACE_ERROR((LM_ERROR,
02143 ACE_TEXT("(%P|%t) ERROR: ")
02144 ACE_TEXT("SubscriberImpl::create_datareader, ")
02145 ACE_TEXT("topic desc is nil.\n")));
02146 return 0;
02147 }
02148
02149 DDS::PublisherQos pub_qos = publisher_qos;
02150 DDS::DataWriterQos dw_qos;
02151
02152 if (! this->validate_publisher_qos(pub_qos) ||
02153 ! PublisherImpl::validate_datawriter_qos(datawriter_qos,
02154 TheServiceParticipant->initial_DataWriterQos(),
02155 a_topic,
02156 dw_qos)) {
02157 return 0;
02158 }
02159
02160 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
02161
02162 ReplayerImpl* replayer(new ReplayerImpl);
02163 Replayer_var result(replayer);
02164
02165 replayer->init(a_topic, topic_servant, dw_qos, a_listener, mask, this, pub_qos);
02166
02167 if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
02168 const DDS::ReturnCode_t ret = replayer->enable();
02169
02170 if (ret != DDS::RETCODE_OK) {
02171 ACE_ERROR((LM_ERROR,
02172 ACE_TEXT("(%P|%t) ERROR: ")
02173 ACE_TEXT("DomainParticipantImpl::create_replayer, ")
02174 ACE_TEXT("enable failed.\n")));
02175 return 0;
02176 }
02177 }
02178
02179 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
02180 replayers_.insert(result);
02181 return result._retn();
02182 }
02183
02184 void
02185 DomainParticipantImpl::delete_recorder(Recorder_ptr recorder)
02186 {
02187 const Recorder_var recvar(Recorder::_duplicate(recorder));
02188 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
02189 recorders_.erase(recvar);
02190 }
02191
02192 void
02193 DomainParticipantImpl::delete_replayer(Replayer_ptr replayer)
02194 {
02195 const Replayer_var repvar(Replayer::_duplicate(replayer));
02196 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
02197 replayers_.erase(repvar);
02198 }
02199
02200 void
02201 DomainParticipantImpl::add_adjust_liveliness_timers(DataWriterImpl* writer)
02202 {
02203 automatic_liveliness_timer_.add_adjust(writer);
02204 participant_liveliness_timer_.add_adjust(writer);
02205 }
02206
02207 void
02208 DomainParticipantImpl::remove_adjust_liveliness_timers()
02209 {
02210 automatic_liveliness_timer_.remove_adjust();
02211 participant_liveliness_timer_.remove_adjust();
02212 }
02213
02214 DomainParticipantImpl::LivelinessTimer::LivelinessTimer(DomainParticipantImpl& impl,
02215 DDS::LivelinessQosPolicyKind kind)
02216 : impl_(impl)
02217 , kind_ (kind)
02218 , interval_ (ACE_Time_Value::max_time)
02219 , recalculate_interval_ (false)
02220 , scheduled_ (false)
02221 { }
02222
02223 DomainParticipantImpl::LivelinessTimer::~LivelinessTimer()
02224 {
02225 if (scheduled_) {
02226 TheServiceParticipant->timer()->cancel_timer(this);
02227 }
02228 }
02229
02230 void
02231 DomainParticipantImpl::LivelinessTimer::add_adjust(OpenDDS::DCPS::DataWriterImpl* writer)
02232 {
02233 ACE_GUARD(ACE_Thread_Mutex,
02234 guard,
02235 this->lock_);
02236
02237 const ACE_Time_Value now = ACE_OS::gettimeofday();
02238
02239
02240 const ACE_Time_Value remaining = interval_ - (now - last_liveliness_check_);
02241
02242
02243 const ACE_Time_Value i = writer->liveliness_check_interval(kind_);
02244 if (i < interval_) {
02245 interval_ = i;
02246 }
02247
02248
02249 if (scheduled_ && interval_ < remaining) {
02250 TheServiceParticipant->timer()->cancel_timer(this);
02251 TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02252 } else if (!scheduled_) {
02253 TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02254 scheduled_ = true;
02255 last_liveliness_check_ = now;
02256 }
02257 }
02258
02259 void
02260 DomainParticipantImpl::LivelinessTimer::remove_adjust()
02261 {
02262 ACE_GUARD(ACE_Thread_Mutex,
02263 guard,
02264 this->lock_);
02265
02266 recalculate_interval_ = true;
02267 }
02268
02269 int
02270 DomainParticipantImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value & tv, const void* )
02271 {
02272 ACE_GUARD_RETURN(ACE_Thread_Mutex,
02273 guard,
02274 this->lock_,
02275 0);
02276
02277 scheduled_ = false;
02278
02279 if (recalculate_interval_) {
02280 interval_ = impl_.liveliness_check_interval(kind_);
02281 recalculate_interval_ = false;
02282 }
02283
02284 if (interval_ != ACE_Time_Value::max_time) {
02285 dispatch(tv);
02286 last_liveliness_check_ = tv;
02287 TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02288 scheduled_ = true;
02289 }
02290
02291 return 0;
02292 }
02293
02294 DomainParticipantImpl::AutomaticLivelinessTimer::AutomaticLivelinessTimer(DomainParticipantImpl& impl)
02295 : LivelinessTimer (impl, DDS::AUTOMATIC_LIVELINESS_QOS)
02296 { }
02297
02298 void
02299 DomainParticipantImpl::AutomaticLivelinessTimer::dispatch(const ACE_Time_Value& )
02300 {
02301 impl_.signal_liveliness (kind_);
02302 }
02303
02304 DomainParticipantImpl::ParticipantLivelinessTimer::ParticipantLivelinessTimer(DomainParticipantImpl& impl)
02305 : LivelinessTimer (impl, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
02306 { }
02307
02308 void
02309 DomainParticipantImpl::ParticipantLivelinessTimer::dispatch(const ACE_Time_Value& tv)
02310 {
02311 if (impl_.participant_liveliness_activity_after (tv - interval())) {
02312 impl_.signal_liveliness (kind_);
02313 }
02314 }
02315
02316 ACE_Time_Value
02317 DomainParticipantImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
02318 {
02319 ACE_Time_Value tv = ACE_Time_Value::max_time;
02320
02321 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02322 tao_mon,
02323 this->publishers_protector_,
02324 tv);
02325
02326 for (PublisherSet::iterator it(publishers_.begin());
02327 it != publishers_.end(); ++it) {
02328 tv = std::min (tv, it->svt_->liveliness_check_interval(kind));
02329 }
02330
02331 return tv;
02332 }
02333
02334 bool
02335 DomainParticipantImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
02336 {
02337 if (last_liveliness_activity_ > tv) {
02338 return true;
02339 }
02340
02341 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02342 tao_mon,
02343 this->publishers_protector_,
02344 tv);
02345
02346 for (PublisherSet::iterator it(publishers_.begin());
02347 it != publishers_.end(); ++it) {
02348 if (it->svt_->participant_liveliness_activity_after(tv)) {
02349 return true;
02350 }
02351 }
02352
02353 return false;
02354 }
02355
02356 void
02357 DomainParticipantImpl::signal_liveliness (DDS::LivelinessQosPolicyKind kind)
02358 {
02359 TheServiceParticipant->get_discovery(domain_id_)->signal_liveliness (domain_id_, get_id(), kind);
02360 }
02361
02362 #if defined(OPENDDS_SECURITY)
02363 void
02364 DomainParticipantImpl::set_security_config(const Security::SecurityConfig_rch& cfg)
02365 {
02366 security_config_ = cfg;
02367 }
02368 #endif
02369
02370 int
02371 DomainParticipantImpl::handle_exception(ACE_HANDLE )
02372 {
02373 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
02374
02375
02376 {
02377 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02378 tao_mon,
02379 this->publishers_protector_,
02380 DDS::RETCODE_ERROR);
02381
02382 PublisherSet::iterator pubIter = publishers_.begin();
02383 DDS::Publisher_ptr pubPtr;
02384 size_t pubsize = publishers_.size();
02385
02386 while (pubsize > 0) {
02387 pubPtr = (*pubIter).obj_.in();
02388 ++pubIter;
02389
02390 DDS::ReturnCode_t result
02391 = pubPtr->delete_contained_entities();
02392
02393 if (result != DDS::RETCODE_OK) {
02394 ret = result;
02395 }
02396
02397 result = delete_publisher(pubPtr);
02398
02399 if (result != DDS::RETCODE_OK) {
02400 ret = result;
02401 }
02402
02403 --pubsize;
02404 }
02405
02406 }
02407
02408
02409 {
02410 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02411 tao_mon,
02412 this->subscribers_protector_,
02413 DDS::RETCODE_ERROR);
02414
02415 SubscriberSet::iterator subIter = subscribers_.begin();
02416 DDS::Subscriber_ptr subPtr;
02417 size_t subsize = subscribers_.size();
02418
02419 while (subsize > 0) {
02420 subPtr = (*subIter).obj_.in();
02421 ++subIter;
02422
02423 DDS::ReturnCode_t result = subPtr->delete_contained_entities();
02424
02425 if (result != DDS::RETCODE_OK) {
02426 ret = result;
02427 }
02428
02429 result = delete_subscriber(subPtr);
02430
02431 if (result != DDS::RETCODE_OK) {
02432 ret = result;
02433 }
02434
02435 --subsize;
02436 }
02437 }
02438
02439
02440 {
02441 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02442 tao_mon,
02443 this->topics_protector_,
02444 DDS::RETCODE_ERROR);
02445
02446 TopicMap::iterator topicIter = topics_.begin();
02447 DDS::Topic_ptr topicPtr;
02448 size_t topicsize = topics_.size();
02449
02450 while (topicsize > 0) {
02451 topicPtr = topicIter->second.pair_.obj_.in();
02452 ++topicIter;
02453
02454
02455 const DDS::ReturnCode_t result = this->delete_topic_i(topicPtr, true);
02456
02457 if (result != DDS::RETCODE_OK) {
02458 ret = result;
02459 }
02460 --topicsize;
02461 }
02462 }
02463
02464 {
02465 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02466 tao_mon,
02467 this->recorders_protector_,
02468 DDS::RETCODE_ERROR);
02469
02470 RecorderSet::iterator it = recorders_.begin();
02471 for (; it != recorders_.end(); ++it ){
02472 RecorderImpl* impl = dynamic_cast<RecorderImpl* >(it->in());
02473 DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02474 if (impl) result = impl->cleanup();
02475 if (result != DDS::RETCODE_OK) ret = result;
02476 }
02477 recorders_.clear();
02478 }
02479
02480 {
02481 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02482 tao_mon,
02483 this->replayers_protector_,
02484 DDS::RETCODE_ERROR);
02485
02486 ReplayerSet::iterator it = replayers_.begin();
02487 for (; it != replayers_.end(); ++it ){
02488 ReplayerImpl* impl = static_cast<ReplayerImpl* >(it->in());
02489 DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02490 if (impl) result = impl->cleanup();
02491 if (result != DDS::RETCODE_OK) ret = result;
02492
02493 }
02494
02495 replayers_.clear();
02496 }
02497
02498 shutdown_mutex_.acquire();
02499 shutdown_result_ = ret;
02500 shutdown_complete_ = true;
02501 shutdown_condition_.signal();
02502 shutdown_mutex_.release();
02503
02504 return 0;
02505 }
02506
02507 }
02508 }
02509
02510 OPENDDS_END_VERSIONED_NAMESPACE_DECL