00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DomainParticipantImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "Service_Participant.h"
00012 #include "Qos_Helper.h"
00013 #include "GuidConverter.h"
00014 #include "PublisherImpl.h"
00015 #include "SubscriberImpl.h"
00016 #include "DataWriterImpl.h"
00017 #include "Marked_Default_Qos.h"
00018 #include "Registered_Data_Types.h"
00019 #include "Transient_Kludge.h"
00020 #include "DomainParticipantFactoryImpl.h"
00021 #include "Util.h"
00022 #include "MonitorFactory.h"
00023 #include "dds/DdsDcpsGuidC.h"
00024 #include "BitPubListenerImpl.h"
00025 #include "ContentFilteredTopicImpl.h"
00026 #include "MultiTopicImpl.h"
00027 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029
00030 #include "RecorderImpl.h"
00031 #include "ReplayerImpl.h"
00032
00033 #if !defined (DDS_HAS_MINIMUM_BIT)
00034 #include "BuiltInTopicUtils.h"
00035 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00036 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00037
00038 #include "tao/debug.h"
00039 #include "ace/Reactor.h"
00040
00041 namespace Util {
00042
00043 template <typename Key>
00044 int find(
00045 OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
00046 const Key& key,
00047 OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*& value)
00048 {
00049 OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
00050 c.find(key);
00051
00052 if (iter == c.end()) {
00053 return -1;
00054 }
00055
00056 value = &iter->second;
00057 return 0;
00058 }
00059
00060 }
00061
00062 namespace OpenDDS {
00063 namespace DCPS {
00064
00065
00066
00067
00068
00069
00070 DomainParticipantImpl::DomainParticipantImpl(DomainParticipantFactoryImpl * factory,
00071 const DDS::DomainId_t& domain_id,
00072 const RepoId& dp_id,
00073 const DDS::DomainParticipantQos & qos,
00074 DDS::DomainParticipantListener_ptr a_listener,
00075 const DDS::StatusMask & mask,
00076 bool federated)
00077 : factory_(factory),
00078 default_topic_qos_(TheServiceParticipant->initial_TopicQos()),
00079 default_publisher_qos_(TheServiceParticipant->initial_PublisherQos()),
00080 default_subscriber_qos_(TheServiceParticipant->initial_SubscriberQos()),
00081 qos_(qos),
00082 domain_id_(domain_id),
00083 dp_id_(dp_id),
00084 federated_(federated),
00085 shutdown_condition_(shutdown_mutex_),
00086 shutdown_complete_(false),
00087 monitor_(0),
00088 pub_id_gen_(dp_id_),
00089 automatic_liveliness_timer_ (*this),
00090 participant_liveliness_timer_ (*this)
00091 {
00092 (void) this->set_listener(a_listener, mask);
00093 monitor_ = TheServiceParticipant->monitor_factory_->create_dp_monitor(this);
00094 }
00095
00096
00097 DomainParticipantImpl::~DomainParticipantImpl()
00098 {
00099 }
00100
00101 DDS::Publisher_ptr
00102 DomainParticipantImpl::create_publisher(
00103 const DDS::PublisherQos & qos,
00104 DDS::PublisherListener_ptr a_listener,
00105 DDS::StatusMask mask)
00106 {
00107 ACE_UNUSED_ARG(mask);
00108
00109 DDS::PublisherQos pub_qos = qos;
00110
00111 if (! this->validate_publisher_qos(pub_qos))
00112 return DDS::Publisher::_nil();
00113
00114 PublisherImpl* pub = 0;
00115 ACE_NEW_RETURN(pub,
00116 PublisherImpl(participant_handles_.next(),
00117 pub_id_gen_.next(),
00118 pub_qos,
00119 a_listener,
00120 mask,
00121 this),
00122 DDS::Publisher::_nil());
00123
00124 if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities == 1)) {
00125 pub->enable();
00126 }
00127
00128 DDS::Publisher_ptr pub_obj(pub);
00129
00130
00131 Publisher_Pair pair(pub, pub_obj, NO_DUP);
00132
00133 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00134 tao_mon,
00135 this->publishers_protector_,
00136 DDS::Publisher::_nil());
00137
00138 if (OpenDDS::DCPS::insert(publishers_, pair) == -1) {
00139 ACE_ERROR((LM_ERROR,
00140 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
00141 ACE_TEXT("%p\n"),
00142 ACE_TEXT("insert")));
00143 return DDS::Publisher::_nil();
00144 }
00145
00146 return DDS::Publisher::_duplicate(pub_obj);
00147 }
00148
00149 DDS::ReturnCode_t
00150 DomainParticipantImpl::delete_publisher(
00151 DDS::Publisher_ptr p)
00152 {
00153
00154
00155
00156 PublisherImpl* the_servant = dynamic_cast<PublisherImpl*>(p);
00157
00158 if (the_servant->is_clean() == 0) {
00159 ACE_ERROR((LM_ERROR,
00160 ACE_TEXT("(%P|%t) ERROR: ")
00161 ACE_TEXT("DomainParticipantImpl::delete_publisher, ")
00162 ACE_TEXT("The publisher is not empty.\n")));
00163 return DDS::RETCODE_PRECONDITION_NOT_MET;
00164 }
00165
00166 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00167 tao_mon,
00168 this->publishers_protector_,
00169 DDS::RETCODE_ERROR);
00170
00171 Publisher_Pair pair(the_servant, p, DUP);
00172
00173 if (OpenDDS::DCPS::remove(publishers_, pair) == -1) {
00174 ACE_ERROR((LM_ERROR,
00175 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_publisher, ")
00176 ACE_TEXT("%p\n"),
00177 ACE_TEXT("remove")));
00178 return DDS::RETCODE_ERROR;
00179
00180 } else {
00181 return DDS::RETCODE_OK;
00182 }
00183 }
00184
00185 DDS::Subscriber_ptr
00186 DomainParticipantImpl::create_subscriber(
00187 const DDS::SubscriberQos & qos,
00188 DDS::SubscriberListener_ptr a_listener,
00189 DDS::StatusMask mask)
00190 {
00191 DDS::SubscriberQos sub_qos = qos;
00192
00193 if (! this->validate_subscriber_qos(sub_qos)) {
00194 return DDS::Subscriber::_nil();
00195 }
00196
00197 SubscriberImpl* sub = 0 ;
00198 ACE_NEW_RETURN(sub,
00199 SubscriberImpl(participant_handles_.next(),
00200 sub_qos,
00201 a_listener,
00202 mask,
00203 this),
00204 DDS::Subscriber::_nil());
00205
00206 if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities == 1)) {
00207 sub->enable();
00208 }
00209
00210 DDS::Subscriber_ptr sub_obj(sub);
00211
00212 Subscriber_Pair pair(sub, sub_obj, NO_DUP);
00213
00214 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00215 tao_mon,
00216 this->subscribers_protector_,
00217 DDS::Subscriber::_nil());
00218
00219 if (OpenDDS::DCPS::insert(subscribers_, pair) == -1) {
00220 ACE_ERROR((LM_ERROR,
00221 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
00222 ACE_TEXT("%p\n"),
00223 ACE_TEXT("insert")));
00224 return DDS::Subscriber::_nil();
00225 }
00226
00227 return DDS::Subscriber::_duplicate(sub_obj);
00228 }
00229
00230 DDS::ReturnCode_t
00231 DomainParticipantImpl::delete_subscriber(
00232 DDS::Subscriber_ptr s)
00233 {
00234
00235
00236
00237 SubscriberImpl* the_servant = dynamic_cast<SubscriberImpl*>(s);
00238
00239 if (the_servant->is_clean() == 0) {
00240 ACE_ERROR((LM_ERROR,
00241 ACE_TEXT("(%P|%t) ERROR: ")
00242 ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00243 ACE_TEXT("The subscriber is not empty.\n")));
00244 return DDS::RETCODE_PRECONDITION_NOT_MET;
00245 }
00246
00247 DDS::ReturnCode_t ret
00248 = the_servant->delete_contained_entities();
00249
00250 if (ret != DDS::RETCODE_OK) {
00251 ACE_ERROR((LM_ERROR,
00252 ACE_TEXT("(%P|%t) ERROR: ")
00253 ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00254 ACE_TEXT("Failed to delete contained entities.\n")));
00255 return DDS::RETCODE_ERROR;
00256 }
00257
00258 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00259 tao_mon,
00260 this->subscribers_protector_,
00261 DDS::RETCODE_ERROR);
00262
00263 Subscriber_Pair pair(the_servant, s, DUP);
00264
00265 if (OpenDDS::DCPS::remove(subscribers_, pair) == -1) {
00266 ACE_ERROR((LM_ERROR,
00267 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_subscriber, ")
00268 ACE_TEXT("%p\n"),
00269 ACE_TEXT("remove")));
00270 return DDS::RETCODE_ERROR;
00271
00272 } else {
00273 return DDS::RETCODE_OK;
00274 }
00275 }
00276
00277 DDS::Subscriber_ptr
00278 DomainParticipantImpl::get_builtin_subscriber()
00279 {
00280 return DDS::Subscriber::_duplicate(bit_subscriber_.in());
00281 }
00282
00283 DDS::Topic_ptr
00284 DomainParticipantImpl::create_topic(
00285 const char * topic_name,
00286 const char * type_name,
00287 const DDS::TopicQos & qos,
00288 DDS::TopicListener_ptr a_listener,
00289 DDS::StatusMask mask)
00290 {
00291 return create_topic_i(topic_name,
00292 type_name,
00293 qos,
00294 a_listener,
00295 mask,
00296 0);
00297 }
00298
00299 DDS::Topic_ptr
00300 DomainParticipantImpl::create_typeless_topic(
00301 const char * topic_name,
00302 const char * type_name,
00303 bool type_has_keys,
00304 const DDS::TopicQos & qos,
00305 DDS::TopicListener_ptr a_listener,
00306 DDS::StatusMask mask)
00307 {
00308 int topic_mask = (type_has_keys ? TOPIC_TYPE_HAS_KEYS : 0 ) | TOPIC_TYPELESS;
00309
00310 return create_topic_i(topic_name,
00311 type_name,
00312 qos,
00313 a_listener,
00314 mask,
00315 topic_mask);
00316 }
00317
00318
00319 DDS::Topic_ptr
00320 DomainParticipantImpl::create_topic_i(
00321 const char * topic_name,
00322 const char * type_name,
00323 const DDS::TopicQos & qos,
00324 DDS::TopicListener_ptr a_listener,
00325 DDS::StatusMask mask,
00326 int topic_mask)
00327 {
00328 DDS::TopicQos topic_qos;
00329
00330 if (qos == TOPIC_QOS_DEFAULT) {
00331 this->get_default_topic_qos(topic_qos);
00332
00333 } else {
00334 topic_qos = qos;
00335 }
00336
00337 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00338 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00339 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00340 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00341
00342 if (!Qos_Helper::valid(topic_qos)) {
00343 ACE_ERROR((LM_ERROR,
00344 ACE_TEXT("(%P|%t) ERROR: ")
00345 ACE_TEXT("DomainParticipantImpl::create_topic, ")
00346 ACE_TEXT("invalid qos.\n")));
00347 return DDS::Topic::_nil();
00348 }
00349
00350 if (!Qos_Helper::consistent(topic_qos)) {
00351 ACE_ERROR((LM_ERROR,
00352 ACE_TEXT("(%P|%t) ERROR: ")
00353 ACE_TEXT("DomainParticipantImpl::create_topic, ")
00354 ACE_TEXT("inconsistent qos.\n")));
00355 return DDS::Topic::_nil();
00356 }
00357
00358 TopicMap::mapped_type* entry = 0;
00359 bool found = false;
00360 {
00361 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00362 tao_mon,
00363 this->topics_protector_,
00364 DDS::Topic::_nil());
00365
00366 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00367 if (topic_descrs_.count(topic_name)) {
00368 if (DCPS_debug_level > 3) {
00369 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00370 ACE_TEXT("DomainParticipantImpl::create_topic, ")
00371 ACE_TEXT("can't create a Topic due to name \"%C\" already in use ")
00372 ACE_TEXT("by a TopicDescription.\n"), topic_name));
00373 }
00374 return 0;
00375 }
00376 #endif
00377
00378 if (Util::find(topics_, topic_name, entry) == 0) {
00379 found = true;
00380 }
00381 }
00382
00383 if (found) {
00384 CORBA::String_var found_type
00385 = entry->pair_.svt_->get_type_name();
00386
00387 if (ACE_OS::strcmp(type_name, found_type) == 0) {
00388 DDS::TopicQos found_qos;
00389 entry->pair_.svt_->get_qos(found_qos);
00390
00391 if (topic_qos == found_qos) {
00392 {
00393 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00394 tao_mon,
00395 this->topics_protector_,
00396 DDS::Topic::_nil());
00397 entry->client_refs_ ++;
00398 }
00399 return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00400
00401 } else {
00402 if (DCPS_debug_level >= 1) {
00403 ACE_DEBUG((LM_DEBUG,
00404 ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00405 ACE_TEXT("qos not match: topic_name=%C type_name=%C\n"),
00406 topic_name, type_name));
00407 }
00408
00409 return DDS::Topic::_nil();
00410 }
00411
00412 } else {
00413 if (DCPS_debug_level >= 1) {
00414 ACE_DEBUG((LM_DEBUG,
00415 ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00416 ACE_TEXT(" not match: topic_name=%C type_name=%C\n"),
00417 topic_name, type_name));
00418 }
00419
00420 return DDS::Topic::_nil();
00421 }
00422
00423 } else {
00424
00425 OpenDDS::DCPS::TypeSupport_var type_support;
00426 bool has_keys = (topic_mask & TOPIC_TYPE_HAS_KEYS);
00427
00428 if (0 == topic_mask) {
00429
00430 type_support = Registered_Data_Types->lookup(this, type_name);
00431 has_keys = type_support->has_dcps_key();
00432 }
00433 RepoId topic_id;
00434
00435 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00436 TopicStatus status = disco->assert_topic(topic_id,
00437 domain_id_,
00438 dp_id_,
00439 topic_name,
00440 type_name,
00441 topic_qos,
00442 has_keys);
00443
00444 if (status == CREATED || status == FOUND) {
00445 DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00446 topic_name,
00447 type_name,
00448 topic_qos,
00449 a_listener,
00450 mask,
00451 type_support);
00452 if (this->monitor_) {
00453 this->monitor_->report();
00454 }
00455 return new_topic;
00456
00457 } else {
00458 ACE_ERROR((LM_ERROR,
00459 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic, ")
00460 ACE_TEXT("assert_topic failed.\n")));
00461 return DDS::Topic::_nil();
00462 }
00463 }
00464 }
00465
00466 DDS::ReturnCode_t
00467 DomainParticipantImpl::delete_topic(
00468 DDS::Topic_ptr a_topic)
00469 {
00470 return delete_topic_i(a_topic, false);
00471 }
00472
00473 DDS::ReturnCode_t
00474 DomainParticipantImpl::delete_topic_i(
00475 DDS::Topic_ptr a_topic,
00476 bool remove_objref)
00477 {
00478
00479 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00480
00481 try {
00482
00483
00484
00485 TopicImpl* the_topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00486
00487 CORBA::String_var topic_name = the_topic_servant->get_name();
00488
00489 DDS::DomainParticipant_var dp = the_topic_servant->get_participant();
00490
00491 DomainParticipantImpl* the_dp_servant =
00492 dynamic_cast<DomainParticipantImpl*>(dp.in());
00493
00494 if (the_dp_servant != this ||
00495 (!remove_objref && the_topic_servant->entity_refs())) {
00496
00497
00498 return DDS::RETCODE_PRECONDITION_NOT_MET;
00499 }
00500
00501 {
00502
00503 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00504 tao_mon,
00505 this->topics_protector_,
00506 DDS::RETCODE_ERROR);
00507
00508 TopicMap::mapped_type* entry = 0;
00509
00510 if (Util::find(topics_, topic_name.in(), entry) == -1) {
00511 ACE_ERROR_RETURN((LM_ERROR,
00512 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00513 ACE_TEXT("%p\n"),
00514 ACE_TEXT("find")),
00515 DDS::RETCODE_ERROR);
00516 }
00517
00518 entry->client_refs_ --;
00519
00520 if (remove_objref == true ||
00521 0 == entry->client_refs_) {
00522
00523
00524 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00525 TopicStatus status
00526 = disco->remove_topic(the_dp_servant->get_domain_id(),
00527 the_dp_servant->get_id(),
00528 the_topic_servant->get_id());
00529
00530 if (status != REMOVED) {
00531 ACE_ERROR_RETURN((LM_ERROR,
00532 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00533 ACE_TEXT("remove_topic failed\n")),
00534 DDS::RETCODE_ERROR);
00535 }
00536
00537
00538
00539 if (topics_.erase(topic_name.in()) == 0) {
00540 ACE_ERROR_RETURN((LM_ERROR,
00541 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00542 ACE_TEXT("%p \n"),
00543 ACE_TEXT("unbind")),
00544 DDS::RETCODE_ERROR);
00545
00546 } else
00547 return DDS::RETCODE_OK;
00548
00549 }
00550 }
00551
00552 } catch (...) {
00553 ACE_ERROR((LM_ERROR,
00554 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00555 ACE_TEXT(" Caught Unknown Exception \n")));
00556 ret = DDS::RETCODE_ERROR;
00557 }
00558
00559 return ret;
00560 }
00561
00562
00563
00564 DDS::Topic_ptr
00565 DomainParticipantImpl::find_topic(
00566 const char * topic_name,
00567 const DDS::Duration_t & timeout)
00568 {
00569 ACE_Time_Value timeout_tv
00570 = ACE_OS::gettimeofday() + ACE_Time_Value(timeout.sec, timeout.nanosec/1000);
00571
00572 int first_time = 1;
00573
00574 while (first_time || ACE_OS::gettimeofday() < timeout_tv) {
00575 if (first_time) {
00576 first_time = 0;
00577 }
00578
00579 TopicMap::mapped_type* entry = 0;
00580 {
00581 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00582 tao_mon,
00583 this->topics_protector_,
00584 DDS::Topic::_nil());
00585
00586 if (Util::find(topics_, topic_name, entry) == 0) {
00587 entry->client_refs_ ++;
00588 return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00589 }
00590 }
00591
00592 RepoId topic_id;
00593 CORBA::String_var type_name;
00594 DDS::TopicQos_var qos;
00595
00596 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00597 TopicStatus status = disco->find_topic(domain_id_,
00598 topic_name,
00599 type_name.out(),
00600 qos.out(),
00601 topic_id);
00602
00603
00604 if (status == FOUND) {
00605 OpenDDS::DCPS::TypeSupport_var type_support =
00606 Registered_Data_Types->lookup(this, type_name.in());
00607 if (CORBA::is_nil(type_support)) {
00608 if (DCPS_debug_level) {
00609 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00610 ACE_TEXT("DomainParticipantImpl::find_topic, ")
00611 ACE_TEXT("can't create a Topic: type_name \"%C\"")
00612 ACE_TEXT("is not registered.\n"), type_name.in()));
00613 }
00614
00615 return DDS::Topic::_nil();
00616 }
00617
00618 DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00619 topic_name,
00620 type_name,
00621 qos,
00622 DDS::TopicListener::_nil(),
00623 OpenDDS::DCPS::DEFAULT_STATUS_MASK,
00624 type_support);
00625 return new_topic;
00626
00627 } else if (status == INTERNAL_ERROR) {
00628 ACE_ERROR((LM_ERROR,
00629 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
00630 ACE_TEXT("topic not found, discovery returned INTERNAL_ERROR!\n")));
00631 return DDS::Topic::_nil();
00632 } else {
00633 ACE_Time_Value now = ACE_OS::gettimeofday();
00634
00635 if (now < timeout_tv) {
00636 ACE_Time_Value remaining = timeout_tv - now;
00637
00638 if (remaining.sec() >= 1) {
00639 ACE_OS::sleep(1);
00640
00641 } else {
00642 ACE_OS::sleep(remaining);
00643 }
00644 }
00645 }
00646 }
00647
00648 if (DCPS_debug_level >= 1) {
00649
00650 ACE_DEBUG((LM_DEBUG,
00651 ACE_TEXT("(%P|%t) DomainParticipantImpl::find_topic, ")
00652 ACE_TEXT("timed out. \n")));
00653 }
00654
00655 return DDS::Topic::_nil();
00656 }
00657
00658 DDS::TopicDescription_ptr
00659 DomainParticipantImpl::lookup_topicdescription(const char* name)
00660 {
00661 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00662 tao_mon,
00663 this->topics_protector_,
00664 DDS::Topic::_nil());
00665
00666 TopicMap::mapped_type* entry = 0;
00667
00668 if (Util::find(topics_, name, entry) == -1) {
00669 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00670 TopicDescriptionMap::iterator iter = topic_descrs_.find(name);
00671 if (iter != topic_descrs_.end()) {
00672 return DDS::TopicDescription::_duplicate(iter->second);
00673 }
00674 #endif
00675 return DDS::TopicDescription::_nil();
00676
00677 } else {
00678 return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
00679 }
00680 }
00681
00682 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00683
00684 DDS::ContentFilteredTopic_ptr
00685 DomainParticipantImpl::create_contentfilteredtopic(
00686 const char* name,
00687 DDS::Topic_ptr related_topic,
00688 const char* filter_expression,
00689 const DDS::StringSeq& expression_parameters)
00690 {
00691 if (CORBA::is_nil(related_topic)) {
00692 if (DCPS_debug_level > 3) {
00693 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00694 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00695 ACE_TEXT("can't create a content-filtered topic due to null related ")
00696 ACE_TEXT("topic.\n")));
00697 }
00698 return 0;
00699 }
00700
00701 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00702
00703 if (topics_.count(name)) {
00704 if (DCPS_debug_level > 3) {
00705 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00706 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00707 ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00708 ACE_TEXT("already in use by a Topic.\n"), name));
00709 }
00710 return 0;
00711 }
00712
00713 if (topic_descrs_.count(name)) {
00714 if (DCPS_debug_level > 3) {
00715 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00716 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00717 ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00718 ACE_TEXT("already in use by a TopicDescription.\n"), name));
00719 }
00720 return 0;
00721 }
00722
00723 DDS::ContentFilteredTopic_var cft;
00724 try {
00725 cft = new ContentFilteredTopicImpl(name,
00726 related_topic, filter_expression, expression_parameters, this);
00727 } catch (const std::exception& e) {
00728 if (DCPS_debug_level) {
00729 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00730 ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00731 ACE_TEXT("can't create a content-filtered topic due to runtime error: ")
00732 ACE_TEXT("%C.\n"), e.what()));
00733 }
00734 return 0;
00735 }
00736 DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
00737 topic_descrs_[name] = td;
00738 return cft._retn();
00739 }
00740
00741 DDS::ReturnCode_t DomainParticipantImpl::delete_contentfilteredtopic(
00742 DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
00743 {
00744 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00745 DDS::RETCODE_OUT_OF_RESOURCES);
00746 DDS::ContentFilteredTopic_var cft =
00747 DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
00748 CORBA::String_var name = cft->get_name();
00749 TopicDescriptionMap::iterator iter = topic_descrs_.find(name.in());
00750 if (iter == topic_descrs_.end()) {
00751 if (DCPS_debug_level > 3) {
00752 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00753 ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00754 ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00755 ACE_TEXT("because it is not in the set.\n"), name.in ()));
00756 }
00757 return DDS::RETCODE_PRECONDITION_NOT_MET;
00758 }
00759 if (dynamic_cast<TopicDescriptionImpl*>(iter->second.in())->has_reader()) {
00760 if (DCPS_debug_level > 3) {
00761 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00762 ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00763 ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00764 ACE_TEXT("because it still is used by a reader.\n"), name.in ()));
00765 }
00766 return DDS::RETCODE_PRECONDITION_NOT_MET;
00767 }
00768 topic_descrs_.erase(iter);
00769 return DDS::RETCODE_OK;
00770 }
00771
00772 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00773
00774 #ifndef OPENDDS_NO_MULTI_TOPIC
00775
00776 DDS::MultiTopic_ptr DomainParticipantImpl::create_multitopic(
00777 const char* name, const char* type_name,
00778 const char* subscription_expression,
00779 const DDS::StringSeq& expression_parameters)
00780 {
00781 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00782
00783 if (topics_.count(name)) {
00784 if (DCPS_debug_level > 3) {
00785 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00786 ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00787 ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00788 ACE_TEXT("already in use by a Topic.\n"), name));
00789 }
00790 return 0;
00791 }
00792
00793 if (topic_descrs_.count(name)) {
00794 if (DCPS_debug_level > 3) {
00795 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00796 ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00797 ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00798 ACE_TEXT("already in use by a TopicDescription.\n"), name));
00799 }
00800 return 0;
00801 }
00802
00803 DDS::MultiTopic_var mt;
00804 try {
00805 mt = new MultiTopicImpl(name, type_name, subscription_expression,
00806 expression_parameters, this);
00807 } catch (const std::exception& e) {
00808 if (DCPS_debug_level) {
00809 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00810 ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00811 ACE_TEXT("can't create a multi topic due to runtime error: ")
00812 ACE_TEXT("%C.\n"), e.what()));
00813 }
00814 return 0;
00815 }
00816 DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
00817 topic_descrs_[name] = td;
00818 return mt._retn();
00819 }
00820
00821 DDS::ReturnCode_t DomainParticipantImpl::delete_multitopic(
00822 DDS::MultiTopic_ptr a_multitopic)
00823 {
00824 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00825 DDS::RETCODE_OUT_OF_RESOURCES);
00826 DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
00827 CORBA::String_var mt_name = mt->get_name();
00828 TopicDescriptionMap::iterator iter = topic_descrs_.find(mt_name.in());
00829 if (iter == topic_descrs_.end()) {
00830 return DDS::RETCODE_PRECONDITION_NOT_MET;
00831 }
00832 if (dynamic_cast<TopicDescriptionImpl*>(iter->second.in())->has_reader()) {
00833 return DDS::RETCODE_PRECONDITION_NOT_MET;
00834 }
00835 topic_descrs_.erase(iter);
00836 return DDS::RETCODE_OK;
00837 }
00838
00839 #endif // OPENDDS_NO_MULTI_TOPIC
00840
00841 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00842
00843 RcHandle<FilterEvaluator>
00844 DomainParticipantImpl::get_filter_eval(const char* filter)
00845 {
00846 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, filter_cache_lock_,
00847 RcHandle<FilterEvaluator>());
00848 typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
00849 Map::iterator iter = filter_cache_.find(filter);
00850 if (iter == filter_cache_.end()) {
00851 return filter_cache_[filter] = new FilterEvaluator(filter, false);
00852 }
00853 return iter->second;
00854 }
00855
00856 void
00857 DomainParticipantImpl::deref_filter_eval(const char* filter)
00858 {
00859 ACE_GUARD(ACE_Thread_Mutex, guard, filter_cache_lock_);
00860 typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
00861 Map::iterator iter = filter_cache_.find(filter);
00862 if (iter != filter_cache_.end()) {
00863 if (iter->second->ref_count() == 1) {
00864 filter_cache_.erase(iter);
00865 }
00866 }
00867 }
00868
00869 #endif
00870
00871 DDS::ReturnCode_t
00872 DomainParticipantImpl::delete_contained_entities()
00873 {
00874
00875 set_deleted(true);
00876
00877
00878
00879
00880 Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
00881 disc->fini_bit(this);
00882
00883 TheServiceParticipant->reactor()->notify(this);
00884
00885 shutdown_mutex_.acquire();
00886 while (!shutdown_complete_) {
00887 shutdown_condition_.wait();
00888 }
00889 shutdown_complete_ = false;
00890 shutdown_mutex_.release();
00891
00892 bit_subscriber_ = DDS::Subscriber::_nil();
00893
00894 OpenDDS::DCPS::Registered_Data_Types->unregister_participant(this);
00895
00896 participant_objref_ = DDS::DomainParticipant::_nil();
00897
00898
00899 set_deleted(false);
00900 return shutdown_result_;
00901 }
00902
00903 CORBA::Boolean
00904 DomainParticipantImpl::contains_entity(DDS::InstanceHandle_t a_handle)
00905 {
00906
00907
00908 {
00909 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00910 guard,
00911 this->topics_protector_,
00912 false);
00913
00914 for (TopicMap::iterator it(topics_.begin());
00915 it != topics_.end(); ++it) {
00916 if (a_handle == it->second.pair_.svt_->get_instance_handle())
00917 return true;
00918 }
00919 }
00920
00921 {
00922 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00923 guard,
00924 this->subscribers_protector_,
00925 false);
00926
00927 for (SubscriberSet::iterator it(subscribers_.begin());
00928 it != subscribers_.end(); ++it) {
00929 if (a_handle == it->svt_->get_instance_handle())
00930 return true;
00931 }
00932 }
00933
00934 {
00935 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00936 guard,
00937 this->publishers_protector_,
00938 false);
00939
00940 for (PublisherSet::iterator it(publishers_.begin());
00941 it != publishers_.end(); ++it) {
00942 if (a_handle == it->svt_->get_instance_handle())
00943 return true;
00944 }
00945 }
00946
00947
00948
00949 for (SubscriberSet::iterator it(subscribers_.begin());
00950 it != subscribers_.end(); ++it) {
00951 if (it->svt_->contains_reader(a_handle))
00952 return true;
00953 }
00954
00955 for (PublisherSet::iterator it(publishers_.begin());
00956 it != publishers_.end(); ++it) {
00957 if (it->svt_->contains_writer(a_handle))
00958 return true;
00959 }
00960
00961 return false;
00962 }
00963
00964 DDS::ReturnCode_t
00965 DomainParticipantImpl::set_qos(
00966 const DDS::DomainParticipantQos & qos)
00967 {
00968 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00969 if (qos_ == qos)
00970 return DDS::RETCODE_OK;
00971
00972
00973 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00974 return DDS::RETCODE_IMMUTABLE_POLICY;
00975
00976 } else {
00977 qos_ = qos;
00978
00979 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00980 const bool status =
00981 disco->update_domain_participant_qos(domain_id_,
00982 dp_id_,
00983 qos_);
00984
00985 if (!status) {
00986 ACE_ERROR_RETURN((LM_ERROR,
00987 ACE_TEXT("(%P|%t) DomainParticipantImpl::set_qos, ")
00988 ACE_TEXT("failed on compatiblity check. \n")),
00989 DDS::RETCODE_ERROR);
00990 }
00991 }
00992
00993 return DDS::RETCODE_OK;
00994
00995 } else {
00996 return DDS::RETCODE_INCONSISTENT_POLICY;
00997 }
00998 }
00999
01000 DDS::ReturnCode_t
01001 DomainParticipantImpl::get_qos(
01002 DDS::DomainParticipantQos & qos)
01003 {
01004 qos = qos_;
01005 return DDS::RETCODE_OK;
01006 }
01007
01008 DDS::ReturnCode_t
01009 DomainParticipantImpl::set_listener(
01010 DDS::DomainParticipantListener_ptr a_listener,
01011 DDS::StatusMask mask)
01012 {
01013 listener_mask_ = mask;
01014
01015 listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
01016 return DDS::RETCODE_OK;
01017 }
01018
01019 DDS::DomainParticipantListener_ptr
01020 DomainParticipantImpl::get_listener()
01021 {
01022 return DDS::DomainParticipantListener::_duplicate(listener_.in());
01023 }
01024
01025 DDS::ReturnCode_t
01026 DomainParticipantImpl::ignore_participant(
01027 DDS::InstanceHandle_t handle)
01028 {
01029 #if !defined (DDS_HAS_MINIMUM_BIT)
01030
01031 if (enabled_ == false) {
01032 ACE_ERROR_RETURN((LM_ERROR,
01033 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01034 ACE_TEXT(" Entity is not enabled. \n")),
01035 DDS::RETCODE_NOT_ENABLED);
01036 }
01037
01038 RepoId ignoreId = get_repoid(handle);
01039 HandleMap::const_iterator location = this->ignored_participants_.find(ignoreId);
01040
01041 if (location == this->ignored_participants_.end()) {
01042 this->ignored_participants_[ ignoreId] = handle;
01043 }
01044 else {
01045 return DDS::RETCODE_OK;
01046 }
01047
01048 if (DCPS_debug_level >= 4) {
01049 GuidConverter converter(dp_id_);
01050 ACE_DEBUG((LM_DEBUG,
01051 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01052 ACE_TEXT("%C ignoring handle %x.\n"),
01053 OPENDDS_STRING(converter).c_str(),
01054 handle));
01055 }
01056
01057 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01058 if (!disco->ignore_domain_participant(domain_id_,
01059 dp_id_,
01060 ignoreId)) {
01061 ACE_ERROR_RETURN((LM_ERROR,
01062 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01063 ACE_TEXT(" Could not ignore domain participant.\n")),
01064 DDS::RETCODE_NOT_ENABLED);
01065 return DDS::RETCODE_ERROR;
01066 }
01067
01068
01069 if (DCPS_debug_level >= 4) {
01070 GuidConverter converter(dp_id_);
01071 ACE_DEBUG((LM_DEBUG,
01072 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01073 ACE_TEXT("%C repo call returned.\n"),
01074 OPENDDS_STRING(converter).c_str()));
01075 }
01076
01077 return DDS::RETCODE_OK;
01078 #else
01079 ACE_UNUSED_ARG(handle);
01080 return DDS::RETCODE_UNSUPPORTED;
01081 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01082 }
01083
01084 DDS::ReturnCode_t
01085 DomainParticipantImpl::ignore_topic(
01086 DDS::InstanceHandle_t handle)
01087 {
01088 #if !defined (DDS_HAS_MINIMUM_BIT)
01089
01090 if (enabled_ == false) {
01091 ACE_ERROR_RETURN((LM_ERROR,
01092 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01093 ACE_TEXT(" Entity is not enabled. \n")),
01094 DDS::RETCODE_NOT_ENABLED);
01095 }
01096
01097 RepoId ignoreId = get_repoid(handle);
01098 HandleMap::const_iterator location = this->ignored_topics_.find(ignoreId);
01099
01100 if (location == this->ignored_topics_.end()) {
01101 this->ignored_topics_[ ignoreId] = handle;
01102 }
01103 else {
01104 return DDS::RETCODE_OK;
01105 }
01106
01107 if (DCPS_debug_level >= 4) {
01108 GuidConverter converter(dp_id_);
01109 ACE_DEBUG((LM_DEBUG,
01110 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_topic: ")
01111 ACE_TEXT("%C ignoring handle %x.\n"),
01112 OPENDDS_STRING(converter).c_str(),
01113 handle));
01114 }
01115
01116 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01117 if (!disco->ignore_topic(domain_id_,
01118 dp_id_,
01119 ignoreId)) {
01120 ACE_ERROR((LM_ERROR,
01121 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01122 ACE_TEXT(" Could not ignore topic.\n")));
01123 }
01124
01125 return DDS::RETCODE_OK;
01126 #else
01127 ACE_UNUSED_ARG(handle);
01128 return DDS::RETCODE_UNSUPPORTED;
01129 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01130 }
01131
01132 DDS::ReturnCode_t
01133 DomainParticipantImpl::ignore_publication(
01134 DDS::InstanceHandle_t handle)
01135 {
01136 #if !defined (DDS_HAS_MINIMUM_BIT)
01137
01138 if (enabled_ == false) {
01139 ACE_ERROR_RETURN((LM_ERROR,
01140 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01141 ACE_TEXT(" Entity is not enabled. \n")),
01142 DDS::RETCODE_NOT_ENABLED);
01143 }
01144
01145 if (DCPS_debug_level >= 4) {
01146 GuidConverter converter(dp_id_);
01147 ACE_DEBUG((LM_DEBUG,
01148 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_publication: ")
01149 ACE_TEXT("%C ignoring handle %x.\n"),
01150 OPENDDS_STRING(converter).c_str(),
01151 handle));
01152 }
01153
01154 RepoId ignoreId = get_repoid(handle);
01155 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01156 if (!disco->ignore_publication(domain_id_,
01157 dp_id_,
01158 ignoreId)) {
01159 ACE_ERROR_RETURN((LM_ERROR,
01160 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01161 ACE_TEXT(" could not ignore publication in discovery. \n")),
01162 DDS::RETCODE_ERROR);
01163 }
01164
01165 return DDS::RETCODE_OK;
01166 #else
01167 ACE_UNUSED_ARG(handle);
01168 return DDS::RETCODE_UNSUPPORTED;
01169 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01170 }
01171
01172 DDS::ReturnCode_t
01173 DomainParticipantImpl::ignore_subscription(
01174 DDS::InstanceHandle_t handle)
01175 {
01176 #if !defined (DDS_HAS_MINIMUM_BIT)
01177
01178 if (enabled_ == false) {
01179 ACE_ERROR_RETURN((LM_ERROR,
01180 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01181 ACE_TEXT(" Entity is not enabled. \n")),
01182 DDS::RETCODE_NOT_ENABLED);
01183 }
01184
01185 if (DCPS_debug_level >= 4) {
01186 GuidConverter converter(dp_id_);
01187 ACE_DEBUG((LM_DEBUG,
01188 ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_subscription: ")
01189 ACE_TEXT("%C ignoring handle %d.\n"),
01190 OPENDDS_STRING(converter).c_str(),
01191 handle));
01192 }
01193
01194
01195 RepoId ignoreId = get_repoid(handle);
01196 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01197 if (!disco->ignore_subscription(domain_id_,
01198 dp_id_,
01199 ignoreId)) {
01200 ACE_ERROR_RETURN((LM_ERROR,
01201 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01202 ACE_TEXT(" could not ignore subscription in discovery. \n")),
01203 DDS::RETCODE_ERROR);
01204 }
01205
01206 return DDS::RETCODE_OK;
01207 #else
01208 ACE_UNUSED_ARG(handle);
01209 return DDS::RETCODE_UNSUPPORTED;
01210 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01211 }
01212
01213 DDS::DomainId_t
01214 DomainParticipantImpl::get_domain_id()
01215 {
01216 return domain_id_;
01217 }
01218
01219 DDS::ReturnCode_t
01220 DomainParticipantImpl::assert_liveliness()
01221 {
01222
01223
01224
01225
01226
01227
01228
01229
01230 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01231 tao_mon,
01232 this->publishers_protector_,
01233 DDS::RETCODE_ERROR);
01234
01235 for (PublisherSet::iterator it(publishers_.begin());
01236 it != publishers_.end(); ++it) {
01237 it->svt_->assert_liveliness_by_participant();
01238 }
01239
01240 last_liveliness_activity_ = ACE_OS::gettimeofday();
01241
01242 return DDS::RETCODE_OK;
01243 }
01244
01245 DDS::ReturnCode_t
01246 DomainParticipantImpl::set_default_publisher_qos(
01247 const DDS::PublisherQos & qos)
01248 {
01249 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01250 default_publisher_qos_ = qos;
01251 return DDS::RETCODE_OK;
01252
01253 } else {
01254 return DDS::RETCODE_INCONSISTENT_POLICY;
01255 }
01256 }
01257
01258 DDS::ReturnCode_t
01259 DomainParticipantImpl::get_default_publisher_qos(
01260 DDS::PublisherQos & qos)
01261 {
01262 qos = default_publisher_qos_;
01263 return DDS::RETCODE_OK;
01264 }
01265
01266 DDS::ReturnCode_t
01267 DomainParticipantImpl::set_default_subscriber_qos(
01268 const DDS::SubscriberQos & qos)
01269 {
01270 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01271 default_subscriber_qos_ = qos;
01272 return DDS::RETCODE_OK;
01273
01274 } else {
01275 return DDS::RETCODE_INCONSISTENT_POLICY;
01276 }
01277 }
01278
01279 DDS::ReturnCode_t
01280 DomainParticipantImpl::get_default_subscriber_qos(
01281 DDS::SubscriberQos & qos)
01282 {
01283 qos = default_subscriber_qos_;
01284 return DDS::RETCODE_OK;
01285 }
01286
01287 DDS::ReturnCode_t
01288 DomainParticipantImpl::set_default_topic_qos(
01289 const DDS::TopicQos & qos)
01290 {
01291 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01292 default_topic_qos_ = qos;
01293 return DDS::RETCODE_OK;
01294
01295 } else {
01296 return DDS::RETCODE_INCONSISTENT_POLICY;
01297 }
01298 }
01299
01300 DDS::ReturnCode_t
01301 DomainParticipantImpl::get_default_topic_qos(
01302 DDS::TopicQos & qos)
01303 {
01304 qos = default_topic_qos_;
01305 return DDS::RETCODE_OK;
01306 }
01307
01308 DDS::ReturnCode_t
01309 DomainParticipantImpl::get_current_time(
01310 DDS::Time_t & current_time)
01311 {
01312 current_time
01313 = OpenDDS::DCPS::time_value_to_time(
01314 ACE_OS::gettimeofday());
01315 return DDS::RETCODE_OK;
01316 }
01317
01318 #if !defined (DDS_HAS_MINIMUM_BIT)
01319
01320 DDS::ReturnCode_t
01321 DomainParticipantImpl::get_discovered_participants(
01322 DDS::InstanceHandleSeq & participant_handles)
01323 {
01324 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01325 guard,
01326 this->handle_protector_,
01327 DDS::RETCODE_ERROR);
01328
01329 HandleMap::const_iterator itEnd = this->handles_.end();
01330
01331 for (HandleMap::const_iterator iter = this->handles_.begin();
01332 iter != itEnd; ++iter) {
01333 GuidConverter converter(iter->first);
01334
01335 if (converter.entityKind() == KIND_PARTICIPANT)
01336 {
01337
01338 if (iter->first == this->dp_id_
01339 || (this->ignored_participants_.find(iter->first)
01340 != this->ignored_participants_.end ())) {
01341 continue;
01342 }
01343
01344 push_back(participant_handles, iter->second);
01345 }
01346 }
01347
01348 return DDS::RETCODE_OK;
01349 }
01350
01351 DDS::ReturnCode_t
01352 DomainParticipantImpl::get_discovered_participant_data(
01353 DDS::ParticipantBuiltinTopicData & participant_data,
01354 DDS::InstanceHandle_t participant_handle)
01355 {
01356 {
01357 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01358 guard,
01359 this->handle_protector_,
01360 DDS::RETCODE_ERROR);
01361
01362 bool found = false;
01363 HandleMap::const_iterator itEnd = this->handles_.end();
01364
01365 for (HandleMap::const_iterator iter = this->handles_.begin();
01366 iter != itEnd; ++iter) {
01367 GuidConverter converter(iter->first);
01368
01369 if (participant_handle == iter->second
01370 && converter.entityKind() == KIND_PARTICIPANT) {
01371 found = true;
01372 break;
01373 }
01374 }
01375
01376 if (!found)
01377 return DDS::RETCODE_PRECONDITION_NOT_MET;
01378 }
01379
01380 DDS::SampleInfoSeq info;
01381 DDS::ParticipantBuiltinTopicDataSeq data;
01382 DDS::DataReader_var dr =
01383 this->bit_subscriber_->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
01384 DDS::ParticipantBuiltinTopicDataDataReader_var bit_part_dr =
01385 DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);
01386 DDS::ReturnCode_t ret = bit_part_dr->read_instance(data,
01387 info,
01388 1,
01389 participant_handle,
01390 DDS::ANY_SAMPLE_STATE,
01391 DDS::ANY_VIEW_STATE,
01392 DDS::ANY_INSTANCE_STATE);
01393
01394 if (ret == DDS::RETCODE_OK) {
01395 if (info[0].valid_data)
01396 participant_data = data[0];
01397
01398 else
01399 return DDS::RETCODE_NO_DATA;
01400 }
01401
01402 return ret;
01403 }
01404
01405 DDS::ReturnCode_t
01406 DomainParticipantImpl::get_discovered_topics(
01407 DDS::InstanceHandleSeq & topic_handles)
01408 {
01409 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01410 guard,
01411 this->handle_protector_,
01412 DDS::RETCODE_ERROR);
01413
01414 HandleMap::const_iterator itEnd = this->handles_.end();
01415
01416 for (HandleMap::const_iterator iter = this->handles_.begin();
01417 iter != itEnd; ++iter) {
01418 GuidConverter converter(iter->first);
01419
01420 if (converter.entityKind() == KIND_TOPIC) {
01421
01422
01423 if (this->ignored_topics_.find(iter->first)
01424 != this->ignored_topics_.end ()) {
01425 continue;
01426 }
01427
01428 push_back(topic_handles, iter->second);
01429 }
01430 }
01431
01432 return DDS::RETCODE_OK;
01433 }
01434
01435 DDS::ReturnCode_t
01436 DomainParticipantImpl::get_discovered_topic_data(
01437 DDS::TopicBuiltinTopicData & topic_data,
01438 DDS::InstanceHandle_t topic_handle)
01439 {
01440 {
01441 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01442 guard,
01443 this->handle_protector_,
01444 DDS::RETCODE_ERROR);
01445
01446 bool found = false;
01447 HandleMap::const_iterator itEnd = this->handles_.end();
01448
01449 for (HandleMap::const_iterator iter = this->handles_.begin();
01450 iter != itEnd; ++iter) {
01451 GuidConverter converter(iter->first);
01452
01453 if (topic_handle == iter->second
01454 && converter.entityKind() == KIND_TOPIC) {
01455 found = true;
01456 break;
01457 }
01458 }
01459
01460 if (!found)
01461 return DDS::RETCODE_PRECONDITION_NOT_MET;
01462 }
01463
01464 DDS::DataReader_var dr =
01465 bit_subscriber_->lookup_datareader(BUILT_IN_TOPIC_TOPIC);
01466 DDS::TopicBuiltinTopicDataDataReader_var bit_topic_dr =
01467 DDS::TopicBuiltinTopicDataDataReader::_narrow(dr);
01468
01469 DDS::SampleInfoSeq info;
01470 DDS::TopicBuiltinTopicDataSeq data;
01471 DDS::ReturnCode_t ret =
01472 bit_topic_dr->read_instance(data,
01473 info,
01474 1,
01475 topic_handle,
01476 DDS::ANY_SAMPLE_STATE,
01477 DDS::ANY_VIEW_STATE,
01478 DDS::ANY_INSTANCE_STATE);
01479
01480 if (ret == DDS::RETCODE_OK) {
01481 if (info[0].valid_data)
01482 topic_data = data[0];
01483
01484 else
01485 return DDS::RETCODE_NO_DATA;
01486 }
01487
01488 return ret;
01489 }
01490
01491 #endif
01492
01493 DDS::ReturnCode_t
01494 DomainParticipantImpl::enable()
01495 {
01496
01497
01498
01499
01500
01501
01502 if (this->is_enabled()) {
01503 return DDS::RETCODE_OK;
01504 }
01505
01506 DDS::DomainParticipantFactoryQos qos;
01507
01508 if (this->factory_->get_qos(qos) != DDS::RETCODE_OK) {
01509 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t)DomainParticipantImpl::enable failed to")
01510 ACE_TEXT(" get factory qos\n")));
01511 return DDS::RETCODE_ERROR;
01512 }
01513
01514 if (qos.entity_factory.autoenable_created_entities == 0) {
01515 return DDS::RETCODE_PRECONDITION_NOT_MET;
01516 }
01517
01518 DDS::ReturnCode_t ret = this->set_enabled();
01519
01520 if (monitor_) {
01521 monitor_->report();
01522 }
01523 if (TheServiceParticipant->monitor_) {
01524 TheServiceParticipant->monitor_->report();
01525 }
01526
01527 if (ret == DDS::RETCODE_OK && !TheTransientKludge->is_enabled()) {
01528 Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
01529 this->bit_subscriber_ = disc->init_bit(this);
01530 }
01531
01532 return ret;
01533 }
01534
01535 RepoId
01536 DomainParticipantImpl::get_id()
01537 {
01538 return dp_id_;
01539 }
01540
01541 OPENDDS_STRING
01542 DomainParticipantImpl::get_unique_id()
01543 {
01544 return GuidConverter(dp_id_).uniqueId();
01545 }
01546
01547
01548 DDS::InstanceHandle_t
01549 DomainParticipantImpl::get_instance_handle()
01550 {
01551 return this->id_to_handle(this->dp_id_);
01552 }
01553
01554 DDS::InstanceHandle_t
01555 DomainParticipantImpl::id_to_handle(const RepoId& id)
01556 {
01557 if (id == GUID_UNKNOWN) {
01558 return this->participant_handles_.next();
01559 }
01560
01561 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01562 guard,
01563 this->handle_protector_,
01564 HANDLE_UNKNOWN);
01565
01566 HandleMap::const_iterator location = this->handles_.find(id);
01567 DDS::InstanceHandle_t result;
01568
01569 if (location == this->handles_.end()) {
01570
01571 result = this->participant_handles_.next();
01572 this->handles_[id] = result;
01573 this->repoIds_[result] = id;
01574 } else {
01575 result = location->second;
01576 }
01577
01578 return result;
01579 }
01580
01581 RepoId
01582 DomainParticipantImpl::get_repoid(const DDS::InstanceHandle_t& handle)
01583 {
01584 RepoId result = GUID_UNKNOWN;
01585 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01586 guard,
01587 this->handle_protector_,
01588 GUID_UNKNOWN);
01589 RepoIdMap::const_iterator location = this->repoIds_.find(handle);
01590 if (location != this->repoIds_.end()) {
01591 result = location->second;
01592 }
01593 return result;
01594 }
01595
01596 DDS::Topic_ptr
01597 DomainParticipantImpl::create_new_topic(
01598 const RepoId topic_id,
01599 const char * topic_name,
01600 const char * type_name,
01601 const DDS::TopicQos & qos,
01602 DDS::TopicListener_ptr a_listener,
01603 const DDS::StatusMask & mask,
01604 OpenDDS::DCPS::TypeSupport_ptr type_support)
01605 {
01606 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01607 tao_mon,
01608 this->topics_protector_,
01609 DDS::Topic::_nil());
01610
01611
01612
01613
01614
01615
01616
01617
01618
01619
01620 TopicImpl* topic_servant = 0;
01621
01622 ACE_NEW_RETURN(topic_servant,
01623 TopicImpl(topic_id,
01624 topic_name,
01625 type_name,
01626 type_support,
01627 qos,
01628 a_listener,
01629 mask,
01630 this),
01631 DDS::Topic::_nil());
01632
01633 if ((enabled_ == true)
01634 && (qos_.entity_factory.autoenable_created_entities == 1)) {
01635 topic_servant->enable();
01636 }
01637
01638 DDS::Topic_ptr obj(topic_servant);
01639
01640
01641 RefCounted_Topic refCounted_topic(Topic_Pair(topic_servant, obj, NO_DUP));
01642
01643 if (OpenDDS::DCPS::bind(topics_, topic_name, refCounted_topic) == -1) {
01644 ACE_ERROR((LM_ERROR,
01645 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic, ")
01646 ACE_TEXT("%p \n"),
01647 ACE_TEXT("bind")));
01648 return DDS::Topic::_nil();
01649 }
01650
01651 if (this->monitor_) {
01652 this->monitor_->report();
01653 }
01654
01655
01656
01657 return DDS::Topic::_duplicate(refCounted_topic.pair_.obj_.in());
01658 }
01659
01660 int
01661 DomainParticipantImpl::is_clean() const
01662 {
01663 int sub_is_clean = subscribers_.empty();
01664 int topics_is_clean = topics_.size() == 0;
01665
01666 if (!TheTransientKludge->is_enabled()) {
01667
01668
01669
01670 sub_is_clean = sub_is_clean == 0 ? subscribers_.size() == 1 : 1;
01671 topics_is_clean = topics_is_clean == 0 ? topics_.size() == 4 : 1;
01672 }
01673 return (publishers_.empty()
01674 && sub_is_clean == 1
01675 && topics_is_clean == 1);
01676 }
01677
01678 void
01679 DomainParticipantImpl::set_object_reference(const DDS::DomainParticipant_ptr& dp)
01680 {
01681 if (!CORBA::is_nil(participant_objref_.in())) {
01682 ACE_ERROR((LM_ERROR,
01683 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::set_object_reference, ")
01684 ACE_TEXT("This participant is already activated. \n")));
01685 return;
01686 }
01687
01688 participant_objref_ = DDS::DomainParticipant::_duplicate(dp);
01689 }
01690
01691 DDS::DomainParticipantListener_ptr
01692 DomainParticipantImpl::listener_for(DDS::StatusKind kind)
01693 {
01694 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
01695 return DDS::DomainParticipantListener::_nil ();
01696 } else {
01697 return DDS::DomainParticipantListener::_duplicate(listener_.in());
01698 }
01699 }
01700
01701 void
01702 DomainParticipantImpl::get_topic_ids(TopicIdVec& topics)
01703 {
01704 ACE_GUARD(ACE_Recursive_Thread_Mutex,
01705 guard,
01706 this->topics_protector_);
01707
01708 topics.reserve(topics_.size());
01709 for (TopicMap::iterator it(topics_.begin());
01710 it != topics_.end(); ++it) {
01711 topics.push_back(it->second.pair_.svt_->get_id());
01712 }
01713 }
01714
01715 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01716
01717 OwnershipManager*
01718 DomainParticipantImpl::ownership_manager()
01719 {
01720 #if !defined (DDS_HAS_MINIMUM_BIT)
01721
01722 DDS::DataReader_var dr =
01723 bit_subscriber_->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
01724 DDS::PublicationBuiltinTopicDataDataReader_var bit_pub_dr =
01725 DDS::PublicationBuiltinTopicDataDataReader::_narrow(dr);
01726
01727 if (!CORBA::is_nil(bit_pub_dr.in())) {
01728 DDS::DataReaderListener_var listener = bit_pub_dr->get_listener();
01729 if (CORBA::is_nil(listener.in())) {
01730 DDS::DataReaderListener_var bit_pub_listener =
01731 new BitPubListenerImpl(this);
01732 bit_pub_dr->set_listener(bit_pub_listener, DDS::DATA_AVAILABLE_STATUS);
01733
01734 bit_pub_listener->on_data_available(bit_pub_dr.in());
01735 }
01736 }
01737
01738 #endif
01739 return &this->owner_man_;
01740 }
01741
01742 void
01743 DomainParticipantImpl::update_ownership_strength (const PublicationId& pub_id,
01744 const CORBA::Long& ownership_strength)
01745 {
01746 ACE_GUARD(ACE_Recursive_Thread_Mutex,
01747 tao_mon,
01748 this->subscribers_protector_);
01749
01750 if (this->get_deleted ())
01751 return;
01752
01753 for (SubscriberSet::iterator it(this->subscribers_.begin());
01754 it != this->subscribers_.end(); ++it) {
01755 it->svt_->update_ownership_strength(pub_id, ownership_strength);
01756 }
01757 }
01758
01759 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01760
01761 DomainParticipantImpl::RepoIdSequence::RepoIdSequence(RepoId& base) :
01762 base_(base),
01763 serial_(0),
01764 builder_(base_)
01765 {
01766 }
01767
01768 RepoId
01769 DomainParticipantImpl::RepoIdSequence::next()
01770 {
01771 builder_.entityKey(++serial_);
01772 return builder_;
01773 }
01774
01775
01776
01777
01778
01779 bool
01780 DomainParticipantImpl::validate_publisher_qos(DDS::PublisherQos & pub_qos)
01781 {
01782 if (pub_qos == PUBLISHER_QOS_DEFAULT) {
01783 this->get_default_publisher_qos(pub_qos);
01784 }
01785
01786 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(pub_qos, false);
01787
01788 if (!Qos_Helper::valid(pub_qos) || !Qos_Helper::consistent(pub_qos)) {
01789 ACE_ERROR((LM_ERROR,
01790 ACE_TEXT("(%P|%t) ERROR: ")
01791 ACE_TEXT("DomainParticipantImpl::validate_publisher_qos, ")
01792 ACE_TEXT("invalid qos.\n")));
01793 return false;
01794 }
01795
01796 return true;
01797 }
01798
01799 bool
01800 DomainParticipantImpl::validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos)
01801 {
01802 if (subscriber_qos == SUBSCRIBER_QOS_DEFAULT) {
01803 this->get_default_subscriber_qos(subscriber_qos);
01804 }
01805
01806 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, false);
01807
01808 if (!Qos_Helper::valid(subscriber_qos) || !Qos_Helper::consistent(subscriber_qos)) {
01809 ACE_ERROR((LM_ERROR,
01810 ACE_TEXT("(%P|%t) ERROR: ")
01811 ACE_TEXT("DomainParticipantImpl::validate_subscriber_qos, ")
01812 ACE_TEXT("invalid qos.\n")));
01813 return false;
01814 }
01815
01816
01817 return true;
01818 }
01819
01820 Recorder_ptr
01821 DomainParticipantImpl::create_recorder(DDS::Topic_ptr a_topic,
01822 const DDS::SubscriberQos& subscriber_qos,
01823 const DDS::DataReaderQos& datareader_qos,
01824 const RecorderListener_rch& a_listener,
01825 DDS::StatusMask mask)
01826 {
01827 if (CORBA::is_nil(a_topic)) {
01828 ACE_ERROR((LM_ERROR,
01829 ACE_TEXT("(%P|%t) ERROR: ")
01830 ACE_TEXT("SubscriberImpl::create_datareader, ")
01831 ACE_TEXT("topic desc is nil.\n")));
01832 return 0;
01833 }
01834
01835 DDS::SubscriberQos sub_qos = subscriber_qos;
01836 DDS::DataReaderQos dr_qos;
01837
01838 if (! this->validate_subscriber_qos(sub_qos) ||
01839 ! SubscriberImpl::validate_datareader_qos(datareader_qos,
01840 TheServiceParticipant->initial_DataReaderQos(),
01841 a_topic,
01842 dr_qos, false) ) {
01843 return 0;
01844 }
01845
01846 RecorderImpl* recorder(new RecorderImpl);
01847 Recorder_var result(recorder);
01848
01849 recorder->init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
01850 dr_qos, a_listener,
01851 mask, this, subscriber_qos);
01852
01853 if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities == 1)) {
01854 recorder->enable();
01855 }
01856
01857 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
01858 recorders_.insert(result);
01859
01860 return result._retn();
01861 }
01862
01863 Replayer_ptr
01864 DomainParticipantImpl::create_replayer(DDS::Topic_ptr a_topic,
01865 const DDS::PublisherQos& publisher_qos,
01866 const DDS::DataWriterQos& datawriter_qos,
01867 const ReplayerListener_rch& a_listener,
01868 DDS::StatusMask mask)
01869 {
01870 if (CORBA::is_nil(a_topic)) {
01871 ACE_ERROR((LM_ERROR,
01872 ACE_TEXT("(%P|%t) ERROR: ")
01873 ACE_TEXT("SubscriberImpl::create_datareader, ")
01874 ACE_TEXT("topic desc is nil.\n")));
01875 return 0;
01876 }
01877
01878 DDS::PublisherQos pub_qos = publisher_qos;
01879 DDS::DataWriterQos dw_qos;
01880
01881 if (! this->validate_publisher_qos(pub_qos) ||
01882 ! PublisherImpl::validate_datawriter_qos(datawriter_qos,
01883 TheServiceParticipant->initial_DataWriterQos(),
01884 a_topic,
01885 dw_qos)) {
01886 return 0;
01887 }
01888
01889 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
01890
01891 ReplayerImpl* replayer(new ReplayerImpl);
01892 Replayer_var result(replayer);
01893
01894 replayer->init(a_topic, topic_servant, dw_qos, a_listener, mask, this, pub_qos);
01895
01896 if (this->enabled_ == true
01897 && qos_.entity_factory.autoenable_created_entities == 1) {
01898
01899 DDS::ReturnCode_t ret = replayer->enable();
01900
01901 if (ret != DDS::RETCODE_OK) {
01902 ACE_ERROR((LM_ERROR,
01903 ACE_TEXT("(%P|%t) ERROR: ")
01904 ACE_TEXT("DomainParticipantImpl::create_replayer, ")
01905 ACE_TEXT("enable failed.\n")));
01906 return 0;
01907 }
01908 }
01909
01910 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
01911 replayers_.insert(result);
01912 return result._retn();
01913 }
01914
01915 void
01916 DomainParticipantImpl::delete_recorder(Recorder_ptr recorder)
01917 {
01918 const Recorder_var recvar(Recorder::_duplicate(recorder));
01919 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
01920 recorders_.erase(recvar);
01921 }
01922
01923 void
01924 DomainParticipantImpl::delete_replayer(Replayer_ptr replayer)
01925 {
01926 const Replayer_var repvar(Replayer::_duplicate(replayer));
01927 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
01928 replayers_.erase(repvar);
01929 }
01930
01931 void
01932 DomainParticipantImpl::add_adjust_liveliness_timers(DataWriterImpl* writer)
01933 {
01934 automatic_liveliness_timer_.add_adjust(writer);
01935 participant_liveliness_timer_.add_adjust(writer);
01936 }
01937
01938 void
01939 DomainParticipantImpl::remove_adjust_liveliness_timers()
01940 {
01941 automatic_liveliness_timer_.remove_adjust();
01942 participant_liveliness_timer_.remove_adjust();
01943 }
01944
01945 DomainParticipantImpl::LivelinessTimer::LivelinessTimer(DomainParticipantImpl& impl,
01946 DDS::LivelinessQosPolicyKind kind)
01947 : impl_(impl)
01948 , kind_ (kind)
01949 , interval_ (ACE_Time_Value::max_time)
01950 , recalculate_interval_ (false)
01951 , scheduled_ (false)
01952 { }
01953
01954 DomainParticipantImpl::LivelinessTimer::~LivelinessTimer()
01955 {
01956 if (scheduled_) {
01957 TheServiceParticipant->timer()->cancel_timer(this);
01958 }
01959 }
01960
01961 void
01962 DomainParticipantImpl::LivelinessTimer::add_adjust(OpenDDS::DCPS::DataWriterImpl* writer)
01963 {
01964 ACE_GUARD(ACE_Thread_Mutex,
01965 guard,
01966 this->lock_);
01967
01968 const ACE_Time_Value now = ACE_OS::gettimeofday();
01969
01970
01971 const ACE_Time_Value remaining = interval_ - (now - last_liveliness_check_);
01972
01973
01974 const ACE_Time_Value i = writer->liveliness_check_interval(kind_);
01975 if (i < interval_) {
01976 interval_ = i;
01977 }
01978
01979
01980 if (scheduled_ && interval_ < remaining) {
01981 TheServiceParticipant->timer()->cancel_timer(this);
01982 TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
01983 } else if (!scheduled_) {
01984 TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
01985 scheduled_ = true;
01986 last_liveliness_check_ = now;
01987 }
01988 }
01989
01990 void
01991 DomainParticipantImpl::LivelinessTimer::remove_adjust()
01992 {
01993 ACE_GUARD(ACE_Thread_Mutex,
01994 guard,
01995 this->lock_);
01996
01997 recalculate_interval_ = true;
01998 }
01999
02000 int
02001 DomainParticipantImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value & tv, const void* )
02002 {
02003 ACE_GUARD_RETURN(ACE_Thread_Mutex,
02004 guard,
02005 this->lock_,
02006 0);
02007
02008 scheduled_ = false;
02009
02010 if (recalculate_interval_) {
02011 interval_ = impl_.liveliness_check_interval(kind_);
02012 recalculate_interval_ = false;
02013 }
02014
02015 if (interval_ != ACE_Time_Value::max_time) {
02016 dispatch(tv);
02017 last_liveliness_check_ = tv;
02018 TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02019 scheduled_ = true;
02020 }
02021
02022 return 0;
02023 }
02024
02025 DomainParticipantImpl::AutomaticLivelinessTimer::AutomaticLivelinessTimer(DomainParticipantImpl& impl)
02026 : LivelinessTimer (impl, DDS::AUTOMATIC_LIVELINESS_QOS)
02027 { }
02028
02029 void
02030 DomainParticipantImpl::AutomaticLivelinessTimer::dispatch(const ACE_Time_Value& )
02031 {
02032 impl_.signal_liveliness (kind_);
02033 }
02034
02035 DomainParticipantImpl::ParticipantLivelinessTimer::ParticipantLivelinessTimer(DomainParticipantImpl& impl)
02036 : LivelinessTimer (impl, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
02037 { }
02038
02039 void
02040 DomainParticipantImpl::ParticipantLivelinessTimer::dispatch(const ACE_Time_Value& tv)
02041 {
02042 if (impl_.participant_liveliness_activity_after (tv - interval())) {
02043 impl_.signal_liveliness (kind_);
02044 }
02045 }
02046
02047 ACE_Time_Value
02048 DomainParticipantImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
02049 {
02050 ACE_Time_Value tv = ACE_Time_Value::max_time;
02051
02052 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02053 tao_mon,
02054 this->publishers_protector_,
02055 tv);
02056
02057 for (PublisherSet::iterator it(publishers_.begin());
02058 it != publishers_.end(); ++it) {
02059 tv = std::min (tv, it->svt_->liveliness_check_interval(kind));
02060 }
02061
02062 return tv;
02063 }
02064
02065 bool
02066 DomainParticipantImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
02067 {
02068 if (last_liveliness_activity_ > tv) {
02069 return true;
02070 }
02071
02072 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02073 tao_mon,
02074 this->publishers_protector_,
02075 tv);
02076
02077 for (PublisherSet::iterator it(publishers_.begin());
02078 it != publishers_.end(); ++it) {
02079 if (it->svt_->participant_liveliness_activity_after(tv)) {
02080 return true;
02081 }
02082 }
02083
02084 return false;
02085 }
02086
02087 void
02088 DomainParticipantImpl::signal_liveliness (DDS::LivelinessQosPolicyKind kind)
02089 {
02090 TheServiceParticipant->get_discovery(domain_id_)->signal_liveliness (domain_id_, get_id(), kind);
02091 }
02092
02093 int
02094 DomainParticipantImpl::handle_exception(ACE_HANDLE )
02095 {
02096
02097 {
02098 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02099 tao_mon,
02100 this->publishers_protector_,
02101 DDS::RETCODE_ERROR);
02102
02103 PublisherSet::iterator pubIter = publishers_.begin();
02104 DDS::Publisher_ptr pubPtr;
02105 size_t pubsize = publishers_.size();
02106
02107 while (pubsize > 0) {
02108 pubPtr = (*pubIter).obj_.in();
02109 ++pubIter;
02110
02111 DDS::ReturnCode_t result
02112 = pubPtr->delete_contained_entities();
02113
02114 if (result != DDS::RETCODE_OK) {
02115 return result;
02116 }
02117
02118 result = delete_publisher(pubPtr);
02119
02120 if (result != DDS::RETCODE_OK) {
02121 return result;
02122 }
02123
02124 pubsize--;
02125 }
02126
02127 }
02128
02129
02130 {
02131 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02132 tao_mon,
02133 this->subscribers_protector_,
02134 DDS::RETCODE_ERROR);
02135
02136 SubscriberSet::iterator subIter = subscribers_.begin();
02137 DDS::Subscriber_ptr subPtr;
02138 size_t subsize = subscribers_.size();
02139
02140 while (subsize > 0) {
02141 subPtr = (*subIter).obj_.in();
02142 ++subIter;
02143
02144 DDS::ReturnCode_t result = subPtr->delete_contained_entities();
02145
02146 if (result != DDS::RETCODE_OK) {
02147 return result;
02148 }
02149
02150 result = delete_subscriber(subPtr);
02151
02152 if (result != DDS::RETCODE_OK) {
02153 return result;
02154 }
02155
02156 subsize--;
02157 }
02158 }
02159
02160 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
02161
02162 {
02163 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02164 tao_mon,
02165 this->topics_protector_,
02166 DDS::RETCODE_ERROR);
02167
02168 TopicMap::iterator topicIter = topics_.begin();
02169 DDS::Topic_ptr topicPtr;
02170 size_t topicsize = topics_.size();
02171
02172 while (topicsize > 0) {
02173 topicPtr = topicIter->second.pair_.obj_.in();
02174 ++topicIter;
02175
02176
02177 DDS::ReturnCode_t result = this->delete_topic_i(topicPtr, true);
02178
02179 if (result != DDS::RETCODE_OK) {
02180 return result;
02181 }
02182 topicsize--;
02183 }
02184 }
02185
02186 {
02187 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02188 tao_mon,
02189 this->recorders_protector_,
02190 DDS::RETCODE_ERROR);
02191
02192 RecorderSet::iterator it = recorders_.begin();
02193 for (; it != recorders_.end(); ++it ){
02194 RecorderImpl* impl = static_cast<RecorderImpl* >(it->in());
02195 DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02196 if (impl) result = impl->cleanup();
02197 if (result != DDS::RETCODE_OK) ret = result;
02198 }
02199 recorders_.clear();
02200 }
02201
02202 {
02203 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02204 tao_mon,
02205 this->replayers_protector_,
02206 DDS::RETCODE_ERROR);
02207
02208 ReplayerSet::iterator it = replayers_.begin();
02209 for (; it != replayers_.end(); ++it ){
02210 ReplayerImpl* impl = static_cast<ReplayerImpl* >(it->in());
02211 DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02212 if (impl) result = impl->cleanup();
02213 if (result != DDS::RETCODE_OK) ret = result;
02214
02215 }
02216
02217 replayers_.clear();
02218 }
02219
02220 shutdown_mutex_.acquire();
02221 shutdown_result_ = ret;
02222 shutdown_complete_ = true;
02223 shutdown_condition_.signal();
02224 shutdown_mutex_.release();
02225
02226 return 0;
02227 }
02228
02229 }
02230 }