00001
00002
00003
00004
00005
00006
00007 #include "InfoRepoDiscovery.h"
00008
00009 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteC.h"
00010 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteImpl.h"
00011 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteC.h"
00012 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteImpl.h"
00013 #include "dds/DCPS/InfoRepoDiscovery/FailoverListener.h"
00014 #include "dds/DCPS/Service_Participant.h"
00015 #include "dds/DCPS/RepoIdBuilder.h"
00016 #include "dds/DCPS/ConfigUtils.h"
00017
00018 #include "tao/ORB_Core.h"
00019 #include "ace/Reactor.h"
00020
00021 #if !defined (DDS_HAS_MINIMUM_BIT)
00022 #include "dds/DCPS/DomainParticipantImpl.h"
00023 #include "dds/DCPS/BuiltInTopicUtils.h"
00024 #include "dds/DCPS/Marked_Default_Qos.h"
00025
00026 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00027 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00028
00029 #include "dds/DCPS/transport/tcp/TcpInst.h"
00030 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00031 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00032 #endif
00033
00034 namespace {
00035
00036
00037
00038
00039
00040 template <class T_impl, class T_ptr>
00041 T_impl* remote_reference_to_servant(T_ptr p, CORBA::ORB_ptr orb)
00042 {
00043 if (CORBA::is_nil(p)) {
00044 return 0;
00045 }
00046
00047 CORBA::Object_var obj =
00048 orb->resolve_initial_references("RootPOA");
00049 PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
00050
00051 T_impl* the_servant =
00052 dynamic_cast<T_impl*>(poa->reference_to_servant(p));
00053
00054
00055
00056 PortableServer::ServantBase_var servant = the_servant;
00057
00058 return the_servant;
00059 }
00060
00061
00062
00063
00064 template <class T>
00065 typename T::_stub_ptr_type servant_to_remote_reference(T* servant, CORBA::ORB_ptr orb)
00066 {
00067 CORBA::Object_var obj =
00068 orb->resolve_initial_references("RootPOA");
00069 PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
00070
00071 PortableServer::ObjectId_var oid = poa->activate_object(servant);
00072
00073 obj = poa->id_to_reference(oid.in());
00074
00075 typename T::_stub_ptr_type the_obj = T::_stub_type::_narrow(obj.in());
00076 return the_obj;
00077 }
00078
00079 template <class T>
00080 void deactivate_remote_object(T obj, CORBA::ORB_ptr orb)
00081 {
00082 CORBA::Object_var poa_obj =
00083 orb->resolve_initial_references("RootPOA");
00084 PortableServer::POA_var poa = PortableServer::POA::_narrow(poa_obj.in());
00085 PortableServer::ObjectId_var oid =
00086 poa->reference_to_id(obj);
00087 poa->deactivate_object(oid.in());
00088 }
00089
00090 }
00091
00092 namespace OpenDDS {
00093 namespace DCPS {
00094
00095 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00096 const std::string& ior)
00097 : Discovery(key),
00098 ior_(ior),
00099 bit_transport_port_(0),
00100 use_local_bit_config_(false),
00101 failoverListener_(0),
00102 orb_from_user_(false)
00103 {
00104 }
00105
00106 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00107 const DCPSInfo_var& info)
00108 : Discovery(key),
00109 info_(info),
00110 bit_transport_port_(0),
00111 use_local_bit_config_(false),
00112 failoverListener_(0),
00113 orb_from_user_(false)
00114 {
00115 }
00116
00117 InfoRepoDiscovery::~InfoRepoDiscovery()
00118 {
00119 delete this->failoverListener_;
00120 if (!orb_from_user_ && orb_runner_) {
00121 if (0 == --orb_runner_->use_count_) {
00122 orb_runner_->shutdown();
00123 delete orb_runner_;
00124 orb_runner_ = 0;
00125 }
00126 }
00127 }
00128
00129 bool
00130 InfoRepoDiscovery::set_ORB(CORBA::ORB_ptr orb)
00131 {
00132 if (orb_.in() || !orb) {
00133 return false;
00134 }
00135
00136 orb_ = CORBA::ORB::_duplicate(orb);
00137 orb_from_user_ = true;
00138 return true;
00139 }
00140
00141 namespace
00142 {
00143 DCPSInfo_ptr get_repo(const char* ior, CORBA::ORB_ptr orb)
00144 {
00145 CORBA::Object_var o;
00146 try {
00147 o = orb->string_to_object(ior);
00148 } catch (CORBA::INV_OBJREF&) {
00149
00150
00151 std::string second_try("corbaloc:iiop:");
00152 second_try += ior;
00153 second_try += "/DCPSInfoRepo";
00154
00155 o = orb->string_to_object(second_try.c_str());
00156 }
00157
00158 return DCPSInfo::_narrow(o.in());
00159 }
00160 }
00161
00162 DCPSInfo_var
00163 InfoRepoDiscovery::get_dcps_info()
00164 {
00165 if (CORBA::is_nil(this->info_.in())) {
00166
00167 if (!orb_) {
00168 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_orb_runner_, 0);
00169 if (!orb_runner_) {
00170 orb_runner_ = new OrbRunner;
00171 ACE_ARGV* argv = TheServiceParticipant->ORB_argv();
00172 int argc = argv->argc();
00173 orb_runner_->orb_ =
00174 CORBA::ORB_init(argc, argv->argv(), DEFAULT_ORB_NAME);
00175 orb_runner_->use_count_ = 1;
00176 orb_runner_->activate();
00177
00178 CORBA::Object_var rp =
00179 orb_runner_->orb_->resolve_initial_references("RootPOA");
00180 PortableServer::POA_var poa = PortableServer::POA::_narrow(rp);
00181 PortableServer::POAManager_var poa_manager = poa->the_POAManager();
00182 poa_manager->activate();
00183 } else {
00184 ++orb_runner_->use_count_;
00185 }
00186 orb_ = orb_runner_->orb_;
00187 }
00188
00189 try {
00190 this->info_ = get_repo(this->ior_.c_str(), orb_);
00191
00192 if (CORBA::is_nil(this->info_.in())) {
00193 ACE_ERROR((LM_ERROR,
00194 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::get_dcps_info: ")
00195 ACE_TEXT("unable to narrow DCPSInfo (%C) for key %C.\n"),
00196 this->ior_.c_str(),
00197 this->key().c_str()));
00198 return DCPSInfo::_nil();
00199 }
00200
00201 } catch (const CORBA::Exception& ex) {
00202 ex._tao_print_exception("ERROR: InfoRepoDiscovery::get_dcps_info: failed to resolve ior - ");
00203 return DCPSInfo::_nil();
00204 }
00205 }
00206
00207 return this->info_;
00208 }
00209
00210 std::string
00211 InfoRepoDiscovery::get_stringified_dcps_info_ior()
00212 {
00213 return this->ior_;
00214 }
00215
00216 TransportConfig_rch
00217 InfoRepoDiscovery::bit_config()
00218 {
00219 #if !defined (DDS_HAS_MINIMUM_BIT)
00220 if (bit_config_.is_nil()) {
00221 const std::string cfg_name = TransportRegistry::DEFAULT_INST_PREFIX +
00222 std::string("_BITTransportConfig_") + key();
00223 bit_config_ = TransportRegistry::instance()->create_config(cfg_name);
00224
00225 const std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX +
00226 std::string("_BITTCPTransportInst_") + key();
00227 TransportInst_rch inst =
00228 TransportRegistry::instance()->create_inst(inst_name, "tcp");
00229 bit_config_->instances_.push_back(inst);
00230
00231 if (!use_local_bit_config_) {
00232 bit_transport_ip_ = TheServiceParticipant->bit_transport_ip();
00233 bit_transport_port_ = TheServiceParticipant->bit_transport_port();
00234 }
00235
00236
00237 TcpInst_rch tcp_inst = static_rchandle_cast<TcpInst>(inst);
00238
00239 tcp_inst->datalink_release_delay_ = 0;
00240 tcp_inst->local_address(bit_transport_port_,
00241 bit_transport_ip_.c_str());
00242 }
00243 return bit_config_;
00244 #else
00245 return 0;
00246 #endif
00247 }
00248
00249 DDS::Subscriber_ptr
00250 InfoRepoDiscovery::init_bit(DomainParticipantImpl* participant)
00251 {
00252 #if defined (DDS_HAS_MINIMUM_BIT)
00253 ACE_UNUSED_ARG(participant);
00254 return 0;
00255 #else
00256 if (!TheServiceParticipant->get_BIT()) {
00257 return 0;
00258 }
00259
00260 if (create_bit_topics(participant) != DDS::RETCODE_OK) {
00261 return 0;
00262 }
00263
00264 DDS::Subscriber_var bit_subscriber =
00265 participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
00266 DDS::SubscriberListener::_nil(),
00267 DEFAULT_STATUS_MASK);
00268 try {
00269 TransportConfig_rch config = bit_config();
00270 TransportRegistry::instance()->bind_config(config, bit_subscriber);
00271
00272 } catch (const Transport::Exception&) {
00273 ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00274 "exception during transport initialization\n"));
00275 return 0;
00276 }
00277
00278
00279 try {
00280 DDS::DataReaderQos participantReaderQos;
00281 bit_subscriber->get_default_datareader_qos(participantReaderQos);
00282 participantReaderQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00283
00284 if (participant->federated()) {
00285 participantReaderQos.liveliness.lease_duration.nanosec = 0;
00286 participantReaderQos.liveliness.lease_duration.sec =
00287 TheServiceParticipant->federation_liveliness();
00288 }
00289
00290 DDS::TopicDescription_var bit_part_topic =
00291 participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
00292
00293 DDS::DataReader_var dr =
00294 bit_subscriber->create_datareader(bit_part_topic,
00295 participantReaderQos,
00296 DDS::DataReaderListener::_nil(),
00297 DEFAULT_STATUS_MASK);
00298
00299 if (participant->federated()) {
00300 DDS::ParticipantBuiltinTopicDataDataReader* pbit_dr =
00301 DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr.in());
00302
00303
00304 failoverListener_ = new FailoverListener(key());
00305 pbit_dr->set_listener(failoverListener_, DEFAULT_STATUS_MASK);
00306 }
00307
00308 DDS::DataReaderQos dr_qos;
00309 bit_subscriber->get_default_datareader_qos(dr_qos);
00310 dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00311
00312 DDS::TopicDescription_var bit_topic_topic =
00313 participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
00314
00315 dr = bit_subscriber->create_datareader(bit_topic_topic,
00316 dr_qos,
00317 DDS::DataReaderListener::_nil(),
00318 DEFAULT_STATUS_MASK);
00319
00320 DDS::TopicDescription_var bit_pub_topic =
00321 participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
00322
00323 dr = bit_subscriber->create_datareader(bit_pub_topic,
00324 dr_qos,
00325 DDS::DataReaderListener::_nil(),
00326 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00327
00328 DDS::TopicDescription_var bit_sub_topic =
00329 participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
00330
00331 dr = bit_subscriber->create_datareader(bit_sub_topic,
00332 dr_qos,
00333 DDS::DataReaderListener::_nil(),
00334 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00335
00336 } catch (const CORBA::Exception&) {
00337 ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00338 "exception during DataReader initialization\n"));
00339 return 0;
00340 }
00341 return bit_subscriber._retn();
00342 #endif
00343 }
00344
00345 void
00346 InfoRepoDiscovery::fini_bit(DCPS::DomainParticipantImpl* )
00347 {
00348
00349 }
00350
00351 RepoId
00352 InfoRepoDiscovery::bit_key_to_repo_id(DomainParticipantImpl* ,
00353 const char* ,
00354 const DDS::BuiltinTopicKey_t& key) const
00355 {
00356 RepoId id = RepoIdBuilder::create();
00357 RepoIdBuilder builder(id);
00358 builder.federationId(key.value[0]);
00359 builder.participantId(key.value[1]);
00360 builder.entityId(key.value[2]);
00361 return id;
00362 }
00363
00364 bool
00365 InfoRepoDiscovery::active()
00366 {
00367 try {
00368
00369 get_dcps_info()->_is_a("Not_An_IDL_Type");
00370 return true;
00371 } catch (const CORBA::Exception&) {
00372 return false;
00373 }
00374 }
00375
00376
00377
00378 bool
00379 InfoRepoDiscovery::attach_participant(DDS::DomainId_t domainId,
00380 const RepoId& participantId)
00381 {
00382 try {
00383 return get_dcps_info()->attach_participant(domainId, participantId);
00384 } catch (const CORBA::Exception& ex) {
00385 ex._tao_print_exception("ERROR: InfoRepoDiscovery::attach_participant: ");
00386 return false;
00387 }
00388 }
00389
00390 DCPS::AddDomainStatus
00391 InfoRepoDiscovery::add_domain_participant(DDS::DomainId_t domainId,
00392 const DDS::DomainParticipantQos& qos)
00393 {
00394 try {
00395 const DCPSInfo_var info = get_dcps_info();
00396 if (!CORBA::is_nil(info)) {
00397 return info->add_domain_participant(domainId, qos);
00398 }
00399 } catch (const CORBA::Exception& ex) {
00400 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_domain_participant: ");
00401 }
00402 const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false };
00403 return ads;
00404 }
00405
00406 bool
00407 InfoRepoDiscovery::remove_domain_participant(DDS::DomainId_t domainId,
00408 const RepoId& participantId)
00409 {
00410 try {
00411 get_dcps_info()->remove_domain_participant(domainId, participantId);
00412 return true;
00413 } catch (const CORBA::Exception& ex) {
00414 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_domain_participant: ");
00415 return false;
00416 }
00417 }
00418
00419 bool
00420 InfoRepoDiscovery::ignore_domain_participant(DDS::DomainId_t domainId,
00421 const RepoId& myParticipantId,
00422 const RepoId& ignoreId)
00423 {
00424 try {
00425 get_dcps_info()->ignore_domain_participant(domainId, myParticipantId, ignoreId);
00426 return true;
00427 } catch (const CORBA::Exception& ex) {
00428 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_domain_participant: ");
00429 return false;
00430 }
00431 }
00432
00433 bool
00434 InfoRepoDiscovery::update_domain_participant_qos(DDS::DomainId_t domainId,
00435 const RepoId& participant,
00436 const DDS::DomainParticipantQos& qos)
00437 {
00438 try {
00439 return get_dcps_info()->update_domain_participant_qos(domainId, participant, qos);
00440 } catch (const CORBA::Exception& ex) {
00441 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_domain_participant_qos: ");
00442 return false;
00443 }
00444 }
00445
00446
00447
00448 DCPS::TopicStatus
00449 InfoRepoDiscovery::assert_topic(DCPS::RepoId_out topicId, DDS::DomainId_t domainId,
00450 const RepoId& participantId, const char* topicName,
00451 const char* dataTypeName, const DDS::TopicQos& qos,
00452 bool hasDcpsKey)
00453 {
00454 try {
00455 return get_dcps_info()->assert_topic(topicId, domainId, participantId, topicName,
00456 dataTypeName, qos, hasDcpsKey);
00457 } catch (const CORBA::Exception& ex) {
00458 ex._tao_print_exception("ERROR: InfoRepoDiscovery::assert_topic: ");
00459 return DCPS::INTERNAL_ERROR;
00460 }
00461 }
00462
00463 DCPS::TopicStatus
00464 InfoRepoDiscovery::find_topic(DDS::DomainId_t domainId, const char* topicName,
00465 CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
00466 DCPS::RepoId_out topicId)
00467 {
00468 try {
00469 return get_dcps_info()->find_topic(domainId, topicName, dataTypeName, qos, topicId);
00470 } catch (const CORBA::Exception& ex) {
00471 ex._tao_print_exception("ERROR: InfoRepoDiscovery::find_topic: ");
00472 return DCPS::INTERNAL_ERROR;
00473 }
00474 }
00475
00476 DCPS::TopicStatus
00477 InfoRepoDiscovery::remove_topic(DDS::DomainId_t domainId, const RepoId& participantId,
00478 const RepoId& topicId)
00479 {
00480 try {
00481 return get_dcps_info()->remove_topic(domainId, participantId, topicId);
00482 } catch (const CORBA::Exception& ex) {
00483 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_topic: ");
00484 return DCPS::INTERNAL_ERROR;
00485 }
00486 }
00487
00488 bool
00489 InfoRepoDiscovery::ignore_topic(DDS::DomainId_t domainId, const RepoId& myParticipantId,
00490 const RepoId& ignoreId)
00491 {
00492 try {
00493 get_dcps_info()->ignore_topic(domainId, myParticipantId, ignoreId);
00494 return true;
00495 } catch (const CORBA::Exception& ex) {
00496 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_topic: ");
00497 return false;
00498 }
00499 }
00500
00501 bool
00502 InfoRepoDiscovery::update_topic_qos(const RepoId& topicId, DDS::DomainId_t domainId,
00503 const RepoId& participantId, const DDS::TopicQos& qos)
00504 {
00505 try {
00506 return get_dcps_info()->update_topic_qos(topicId, domainId, participantId, qos);
00507 } catch (const CORBA::Exception& ex) {
00508 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_topic_qos: ");
00509 return false;
00510 }
00511 }
00512
00513
00514
00515
00516 RepoId
00517 InfoRepoDiscovery::add_publication(DDS::DomainId_t domainId,
00518 const RepoId& participantId,
00519 const RepoId& topicId,
00520 DCPS::DataWriterCallbacks* publication,
00521 const DDS::DataWriterQos& qos,
00522 const DCPS::TransportLocatorSeq& transInfo,
00523 const DDS::PublisherQos& publisherQos)
00524 {
00525 RepoId pubId;
00526
00527 try {
00528 DCPS::DataWriterRemoteImpl* writer_remote_impl = 0;
00529 ACE_NEW_RETURN(writer_remote_impl,
00530 DataWriterRemoteImpl(publication),
00531 DCPS::GUID_UNKNOWN);
00532
00533
00534 PortableServer::ServantBase_var writer_remote(writer_remote_impl);
00535
00536
00537 OpenDDS::DCPS::DataWriterRemote_var dr_remote_obj =
00538 servant_to_remote_reference(writer_remote_impl, orb_);
00539
00540 pubId = get_dcps_info()->add_publication(domainId, participantId, topicId,
00541 dr_remote_obj, qos, transInfo, publisherQos);
00542
00543 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00544
00545 dataWriterMap_[pubId] = dr_remote_obj;
00546
00547 } catch (const CORBA::Exception& ex) {
00548 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_publication: ");
00549 pubId = DCPS::GUID_UNKNOWN;
00550 }
00551
00552 return pubId;
00553 }
00554
00555 bool
00556 InfoRepoDiscovery::remove_publication(DDS::DomainId_t domainId,
00557 const RepoId& participantId,
00558 const RepoId& publicationId)
00559 {
00560 {
00561 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00562 removeDataWriterRemote(publicationId);
00563 }
00564 bool removed = false;
00565 try {
00566 get_dcps_info()->remove_publication(domainId, participantId, publicationId);
00567 removed = true;
00568 } catch (const CORBA::Exception& ex) {
00569 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_publication: ");
00570 }
00571
00572 return removed;
00573 }
00574
00575 bool
00576 InfoRepoDiscovery::ignore_publication(DDS::DomainId_t domainId,
00577 const RepoId& participantId,
00578 const RepoId& ignoreId)
00579 {
00580 try {
00581 get_dcps_info()->ignore_publication(domainId, participantId, ignoreId);
00582 return true;
00583 } catch (const CORBA::Exception& ex) {
00584 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_publication: ");
00585 return false;
00586 }
00587 }
00588
00589 bool
00590 InfoRepoDiscovery::update_publication_qos(DDS::DomainId_t domainId,
00591 const RepoId& participantId,
00592 const RepoId& dwId,
00593 const DDS::DataWriterQos& qos,
00594 const DDS::PublisherQos& publisherQos)
00595 {
00596 try {
00597 return get_dcps_info()->update_publication_qos(domainId, participantId, dwId,
00598 qos, publisherQos);
00599 } catch (const CORBA::Exception& ex) {
00600 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_publication_qos: ");
00601 return false;
00602 }
00603 }
00604
00605
00606
00607
00608 RepoId
00609 InfoRepoDiscovery::add_subscription(DDS::DomainId_t domainId,
00610 const RepoId& participantId,
00611 const RepoId& topicId,
00612 DCPS::DataReaderCallbacks* subscription,
00613 const DDS::DataReaderQos& qos,
00614 const DCPS::TransportLocatorSeq& transInfo,
00615 const DDS::SubscriberQos& subscriberQos,
00616 const char* filterClassName,
00617 const char* filterExpr,
00618 const DDS::StringSeq& params)
00619 {
00620 RepoId subId;
00621
00622 try {
00623 DCPS::DataReaderRemoteImpl* reader_remote_impl = 0;
00624 ACE_NEW_RETURN(reader_remote_impl,
00625 DataReaderRemoteImpl(subscription),
00626 DCPS::GUID_UNKNOWN);
00627
00628
00629 PortableServer::ServantBase_var reader_remote(reader_remote_impl);
00630
00631
00632 OpenDDS::DCPS::DataReaderRemote_var dr_remote_obj =
00633 servant_to_remote_reference(reader_remote_impl, orb_);
00634
00635 subId = get_dcps_info()->add_subscription(domainId, participantId, topicId,
00636 dr_remote_obj, qos, transInfo, subscriberQos,
00637 filterClassName, filterExpr, params);
00638
00639 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00640
00641 dataReaderMap_[subId] = dr_remote_obj;
00642
00643 } catch (const CORBA::Exception& ex) {
00644 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_subscription: ");
00645 subId = DCPS::GUID_UNKNOWN;
00646 }
00647 return subId;
00648 }
00649
00650 bool
00651 InfoRepoDiscovery::remove_subscription(DDS::DomainId_t domainId,
00652 const RepoId& participantId,
00653 const RepoId& subscriptionId)
00654 {
00655 {
00656 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00657 removeDataReaderRemote(subscriptionId);
00658 }
00659 bool removed = false;
00660 try {
00661 get_dcps_info()->remove_subscription(domainId, participantId, subscriptionId);
00662 removed = true;
00663 } catch (const CORBA::Exception& ex) {
00664 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_subscription: ");
00665 }
00666
00667 return removed;
00668 }
00669
00670 bool
00671 InfoRepoDiscovery::ignore_subscription(DDS::DomainId_t domainId,
00672 const RepoId& participantId,
00673 const RepoId& ignoreId)
00674 {
00675 try {
00676 get_dcps_info()->ignore_subscription(domainId, participantId, ignoreId);
00677 return true;
00678 } catch (const CORBA::Exception& ex) {
00679 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_subscription: ");
00680 return false;
00681 }
00682 }
00683
00684 bool
00685 InfoRepoDiscovery::update_subscription_qos(DDS::DomainId_t domainId,
00686 const RepoId& participantId,
00687 const RepoId& drId,
00688 const DDS::DataReaderQos& qos,
00689 const DDS::SubscriberQos& subQos)
00690 {
00691 try {
00692 return get_dcps_info()->update_subscription_qos(domainId, participantId,
00693 drId, qos, subQos);
00694 } catch (const CORBA::Exception& ex) {
00695 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_qos: ");
00696 return false;
00697 }
00698 }
00699
00700 bool
00701 InfoRepoDiscovery::update_subscription_params(DDS::DomainId_t domainId,
00702 const RepoId& participantId,
00703 const RepoId& subId,
00704 const DDS::StringSeq& params)
00705
00706 {
00707 try {
00708 return get_dcps_info()->update_subscription_params(domainId, participantId,
00709 subId, params);
00710 } catch (const CORBA::Exception& ex) {
00711 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_params: ");
00712 return false;
00713 }
00714 }
00715
00716
00717
00718
00719 void
00720 InfoRepoDiscovery::association_complete(DDS::DomainId_t domainId,
00721 const RepoId& participantId,
00722 const RepoId& localId, const RepoId& remoteId)
00723 {
00724 try {
00725 get_dcps_info()->association_complete(domainId, participantId, localId, remoteId);
00726 } catch (const CORBA::Exception& ex) {
00727 ex._tao_print_exception("ERROR: InfoRepoDiscovery::association_complete: ");
00728 }
00729 }
00730
00731 void
00732 InfoRepoDiscovery::removeDataReaderRemote(const RepoId& subscriptionId)
00733 {
00734 DataReaderMap::iterator drr = dataReaderMap_.find(subscriptionId);
00735 if (drr == dataReaderMap_.end()) {
00736 ACE_ERROR((LM_ERROR,
00737 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataReaderRemote: ")
00738 ACE_TEXT(" could not find DataReader for subscriptionId.\n")));
00739 return;
00740 }
00741
00742 DataReaderRemoteImpl* impl =
00743 remote_reference_to_servant<DataReaderRemoteImpl>(drr->second.in(), orb_);
00744 impl->detach_parent();
00745 deactivate_remote_object(drr->second.in(), orb_);
00746
00747 dataReaderMap_.erase(drr);
00748 }
00749
00750 void
00751 InfoRepoDiscovery::removeDataWriterRemote(const RepoId& publicationId)
00752 {
00753 DataWriterMap::iterator dwr = dataWriterMap_.find(publicationId);
00754 if (dwr == dataWriterMap_.end()) {
00755 ACE_ERROR((LM_ERROR,
00756 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataWriterRemote: ")
00757 ACE_TEXT(" could not find DataWriter for publicationId.\n")));
00758 return;
00759 }
00760
00761 DataWriterRemoteImpl* impl =
00762 remote_reference_to_servant<DataWriterRemoteImpl>(dwr->second.in(), orb_);
00763 impl->detach_parent();
00764 deactivate_remote_object(dwr->second.in(), orb_);
00765
00766 dataWriterMap_.erase(dwr);
00767 }
00768
00769 namespace {
00770 const ACE_TCHAR REPO_SECTION_NAME[] = ACE_TEXT("repository");
00771 }
00772
00773 int
00774 InfoRepoDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00775 {
00776 const ACE_Configuration_Section_Key& root = cf.root_section();
00777 ACE_Configuration_Section_Key repo_sect;
00778
00779 if (cf.open_section(root, REPO_SECTION_NAME, 0, repo_sect) != 0) {
00780 if (DCPS_debug_level > 0) {
00781
00782
00783 ACE_DEBUG((LM_NOTICE,
00784 ACE_TEXT("(%P|%t) NOTICE: InfoRepoDiscovery::Config::discovery_config ")
00785 ACE_TEXT("failed to open [%s] section.\n"),
00786 REPO_SECTION_NAME));
00787 }
00788
00789 return 0;
00790
00791 } else {
00792
00793 ValueMap vm;
00794 if (pullValues(cf, repo_sect, vm) > 0) {
00795
00796 ACE_ERROR_RETURN((LM_ERROR,
00797 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00798 ACE_TEXT("repo sections must have a subsection name\n")),
00799 -1);
00800 }
00801
00802 KeyList keys;
00803 if (processSections( cf, repo_sect, keys ) != 0) {
00804 ACE_ERROR_RETURN((LM_ERROR,
00805 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00806 ACE_TEXT("too many nesting layers in the [repo] section.\n")),
00807 -1);
00808 }
00809
00810
00811 for (KeyList::const_iterator it=keys.begin(); it != keys.end(); ++it) {
00812 std::string repo_name = (*it).first;
00813
00814 ValueMap values;
00815 pullValues( cf, (*it).second, values );
00816 Discovery::RepoKey repoKey = Discovery::DEFAULT_REPO;
00817 bool repoKeySpecified = false, bitIpSpecified = false,
00818 bitPortSpecified = false;
00819 std::string repoIor;
00820 int bitPort = 0;
00821 std::string bitIp;
00822 for (ValueMap::const_iterator it=values.begin(); it != values.end(); ++it) {
00823 std::string name = (*it).first;
00824 if (name == "RepositoryKey") {
00825 repoKey = (*it).second;
00826 repoKeySpecified = true;
00827 if (DCPS_debug_level > 0) {
00828 ACE_DEBUG((LM_DEBUG,
00829 ACE_TEXT("(%P|%t) [repository/%C]: RepositoryKey == %C\n"),
00830 repo_name.c_str(), repoKey.c_str()));
00831 }
00832
00833 } else if (name == "RepositoryIor") {
00834 repoIor = (*it).second;
00835
00836 if (DCPS_debug_level > 0) {
00837 ACE_DEBUG((LM_DEBUG,
00838 ACE_TEXT("(%P|%t) [repository/%C]: RepositoryIor == %C\n"),
00839 repo_name.c_str(), repoIor.c_str()));
00840 }
00841 } else if (name == "DCPSBitTransportIPAddress") {
00842 bitIp = (*it).second;
00843 bitIpSpecified = true;
00844 if (DCPS_debug_level > 0) {
00845 ACE_DEBUG((LM_DEBUG,
00846 ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportIPAddress == %C\n"),
00847 repo_name.c_str(), bitIp.c_str()));
00848 }
00849 } else if (name == "DCPSBitTransportPort") {
00850 std::string value = (*it).second;
00851 bitPort = ACE_OS::atoi(value.c_str());
00852 bitPortSpecified = true;
00853 if (convertToInteger(value, bitPort)) {
00854 } else {
00855 ACE_ERROR_RETURN((LM_ERROR,
00856 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00857 ACE_TEXT("Illegal integer value for DCPSBitTransportPort (%C) in [repository/%C] section.\n"),
00858 value.c_str(), repo_name.c_str()),
00859 -1);
00860 }
00861 if (DCPS_debug_level > 0) {
00862 ACE_DEBUG((LM_DEBUG,
00863 ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportPort == %d\n"),
00864 repo_name.c_str(), bitPort));
00865 }
00866 } else {
00867 ACE_ERROR_RETURN((LM_ERROR,
00868 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00869 ACE_TEXT("Unexpected entry (%C) in [repository/%C] section.\n"),
00870 name.c_str(), repo_name.c_str()),
00871 -1);
00872 }
00873 }
00874
00875 if (values.find("RepositoryIor") == values.end()) {
00876 ACE_ERROR_RETURN((LM_ERROR,
00877 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00878 ACE_TEXT("Repository section [repository/%C] section is missing RepositoryIor value.\n"),
00879 repo_name.c_str()),
00880 -1);
00881 }
00882
00883 if (!repoKeySpecified) {
00884
00885
00886 repoKey = repo_name;
00887 }
00888 InfoRepoDiscovery_rch discovery =
00889 new InfoRepoDiscovery(repoKey, repoIor.c_str());
00890 if (bitPortSpecified) discovery->bit_transport_port(bitPort);
00891 if (bitIpSpecified) discovery->bit_transport_ip(bitIp);
00892 TheServiceParticipant->add_discovery(
00893 DCPS::static_rchandle_cast<Discovery>(discovery));
00894 }
00895 }
00896
00897 return 0;
00898 }
00899
00900 void
00901 InfoRepoDiscovery::OrbRunner::shutdown()
00902 {
00903 orb_->shutdown();
00904 wait();
00905 }
00906
00907 InfoRepoDiscovery::OrbRunner* InfoRepoDiscovery::orb_runner_;
00908 ACE_Thread_Mutex InfoRepoDiscovery::mtx_orb_runner_;
00909
00910 int
00911 InfoRepoDiscovery::OrbRunner::svc()
00912 {
00913
00914 bool done = false;
00915
00916
00917
00918
00919 sigset_t set;
00920 ACE_OS::sigfillset(&set);
00921 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00922
00923 while (!done) {
00924 try {
00925 if (orb_->orb_core()->has_shutdown() == false) {
00926 orb_->run();
00927 }
00928
00929 done = true;
00930
00931 } catch (const CORBA::SystemException& sysex) {
00932 sysex._tao_print_exception(
00933 "ERROR: InfoRepoDiscovery::OrbRunner");
00934
00935 } catch (const CORBA::UserException& userex) {
00936 userex._tao_print_exception(
00937 "ERROR: InfoRepoDiscovery::OrbRunner");
00938
00939 } catch (const CORBA::Exception& ex) {
00940 ex._tao_print_exception(
00941 "ERROR: InfoRepoDiscovery::OrbRunner");
00942 }
00943
00944 if (orb_->orb_core()->has_shutdown()) {
00945 done = true;
00946
00947 } else {
00948 orb_->orb_core()->reactor()->reset_reactor_event_loop();
00949 }
00950 }
00951
00952 return 0;
00953 }
00954
00955
00956 InfoRepoDiscovery::StaticInitializer::StaticInitializer()
00957 {
00958 TheServiceParticipant->register_discovery_type("repository", new Config);
00959 }
00960
00961 int
00962 IRDiscoveryLoader::init(int, ACE_TCHAR*[])
00963 {
00964
00965
00966 return 0;
00967 }
00968
00969 ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader);
00970 ACE_STATIC_SVC_DEFINE(
00971 IRDiscoveryLoader,
00972 ACE_TEXT("OpenDDS_InfoRepoDiscovery"),
00973 ACE_SVC_OBJ_T,
00974 &ACE_SVC_NAME(IRDiscoveryLoader),
00975 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00976 0)
00977
00978 }
00979 }