00001
00002
00003
00004
00005
00006
00007 #include "InfoRepoDiscovery.h"
00008
00009 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteC.h"
00010 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteImpl.h"
00011 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteC.h"
00012 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteImpl.h"
00013 #include "dds/DCPS/InfoRepoDiscovery/FailoverListener.h"
00014 #include "dds/DCPS/Service_Participant.h"
00015 #include "dds/DCPS/RepoIdBuilder.h"
00016 #include "dds/DCPS/ConfigUtils.h"
00017
00018 #include "tao/ORB_Core.h"
00019 #include "tao/BiDir_GIOP/BiDirGIOP.h"
00020 #include "ace/Reactor.h"
00021
00022 #if !defined (DDS_HAS_MINIMUM_BIT)
00023 #include "dds/DCPS/DomainParticipantImpl.h"
00024 #include "dds/DCPS/BuiltInTopicUtils.h"
00025 #include "dds/DCPS/Marked_Default_Qos.h"
00026
00027 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029
00030 #include "dds/DCPS/transport/tcp/TcpInst.h"
00031 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00032 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00033 #endif
00034
00035 namespace {
00036 const char ROOT_POA[] = "RootPOA";
00037 const char BIDIR_POA[] = "BiDirPOA";
00038
00039 struct DestroyPolicy {
00040 explicit DestroyPolicy(const CORBA::Policy_ptr& p)
00041 : p_(CORBA::Policy::_duplicate(p)) {}
00042
00043 ~DestroyPolicy() { p_->destroy(); }
00044
00045 CORBA::Policy_var p_;
00046 };
00047
00048 PortableServer::POA_ptr get_POA(CORBA::ORB_ptr orb)
00049 {
00050 CORBA::Object_var obj =
00051 orb->resolve_initial_references(ROOT_POA);
00052 PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj.in());
00053
00054 if (TheServiceParticipant->use_bidir_giop()) {
00055 while (true) {
00056 try {
00057 return root_poa->find_POA(BIDIR_POA, false );
00058 } catch (const PortableServer::POA::AdapterNonExistent&) {
00059
00060 }
00061 CORBA::PolicyList policies(1);
00062 policies.length(1);
00063 CORBA::Any policy;
00064 policy <<= BiDirPolicy::BOTH;
00065 policies[0] =
00066 orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policy);
00067 DestroyPolicy destroy(policies[0]);
00068 PortableServer::POAManager_var manager = root_poa->the_POAManager();
00069 try {
00070 return root_poa->create_POA(BIDIR_POA, manager, policies);
00071 } catch (const PortableServer::POA::AdapterAlreadyExists&) {
00072
00073 }
00074 }
00075 }
00076
00077 return root_poa._retn();
00078 }
00079
00080
00081
00082
00083
00084 template <class T_impl, class T_ptr>
00085 T_impl* remote_reference_to_servant(T_ptr p, CORBA::ORB_ptr orb)
00086 {
00087 if (CORBA::is_nil(p)) {
00088 return 0;
00089 }
00090
00091 PortableServer::POA_var poa = get_POA(orb);
00092
00093 T_impl* the_servant =
00094 dynamic_cast<T_impl*>(poa->reference_to_servant(p));
00095
00096
00097
00098 PortableServer::ServantBase_var servant = the_servant;
00099
00100 return the_servant;
00101 }
00102
00103
00104
00105
00106 template <class T>
00107 typename T::_stub_ptr_type servant_to_remote_reference(T* servant, CORBA::ORB_ptr orb)
00108 {
00109 PortableServer::POA_var poa = get_POA(orb);
00110 PortableServer::ObjectId_var oid = poa->activate_object(servant);
00111 CORBA::Object_var obj = poa->id_to_reference(oid.in());
00112
00113 typename T::_stub_ptr_type the_obj = T::_stub_type::_narrow(obj.in());
00114 return the_obj;
00115 }
00116
00117 template <class T>
00118 void deactivate_remote_object(T obj, CORBA::ORB_ptr orb)
00119 {
00120 PortableServer::POA_var poa = get_POA(orb);
00121 PortableServer::ObjectId_var oid =
00122 poa->reference_to_id(obj);
00123 poa->deactivate_object(oid.in());
00124 }
00125
00126 }
00127
00128 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00129
00130 namespace OpenDDS {
00131 namespace DCPS {
00132
00133 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00134 const std::string& ior)
00135 : Discovery(key),
00136 ior_(ior),
00137 bit_transport_port_(0),
00138 use_local_bit_config_(false),
00139 orb_from_user_(false)
00140 {
00141 }
00142
00143 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00144 const DCPSInfo_var& info)
00145 : Discovery(key),
00146 info_(info),
00147 bit_transport_port_(0),
00148 use_local_bit_config_(false),
00149 orb_from_user_(false)
00150 {
00151 }
00152
00153 InfoRepoDiscovery::~InfoRepoDiscovery()
00154 {
00155 if (!orb_from_user_ && orb_runner_) {
00156 if (0 == --orb_runner_->use_count_) {
00157 try {
00158 orb_runner_->shutdown();
00159 }
00160 catch (const CORBA::Exception& ex) {
00161 ACE_ERROR((LM_ERROR,
00162 ACE_TEXT("ERROR: InfoRepoDiscovery::~InfoRepoDiscovery - ")
00163 ACE_TEXT("Exception caught during ORB shutdown: %C.\n"),
00164 ex._info().c_str()));
00165 }
00166
00167 delete orb_runner_;
00168 orb_runner_ = 0;
00169 }
00170 }
00171 }
00172
00173 bool
00174 InfoRepoDiscovery::set_ORB(CORBA::ORB_ptr orb)
00175 {
00176 if (!CORBA::is_nil (orb_.in()) || CORBA::is_nil (orb)) {
00177 return false;
00178 }
00179
00180 orb_ = CORBA::ORB::_duplicate(orb);
00181 orb_from_user_ = true;
00182 return true;
00183 }
00184
00185 namespace
00186 {
00187 DCPSInfo_ptr get_repo(const char* ior, CORBA::ORB_ptr orb)
00188 {
00189 CORBA::Object_var o;
00190 try {
00191 o = orb->string_to_object(ior);
00192 } catch (CORBA::INV_OBJREF&) {
00193
00194
00195 std::string second_try("corbaloc:iiop:");
00196 second_try += ior;
00197 second_try += "/DCPSInfoRepo";
00198
00199 o = orb->string_to_object(second_try.c_str());
00200 }
00201
00202 return DCPSInfo::_narrow(o.in());
00203 }
00204 }
00205
00206 DCPSInfo_var
00207 InfoRepoDiscovery::get_dcps_info()
00208 {
00209 if (CORBA::is_nil(this->info_.in())) {
00210
00211 if (!orb_) {
00212 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_orb_runner_, 0);
00213 if (!orb_runner_) {
00214 orb_runner_ = new OrbRunner;
00215 ACE_ARGV* argv = TheServiceParticipant->ORB_argv();
00216 int argc = argv->argc();
00217 orb_runner_->orb_ =
00218 CORBA::ORB_init(argc, argv->argv(), DEFAULT_ORB_NAME);
00219 orb_runner_->use_count_ = 1;
00220 orb_runner_->activate();
00221
00222 CORBA::Object_var rp =
00223 orb_runner_->orb_->resolve_initial_references(ROOT_POA);
00224 PortableServer::POA_var poa = PortableServer::POA::_narrow(rp);
00225 PortableServer::POAManager_var poa_manager = poa->the_POAManager();
00226 poa_manager->activate();
00227 } else {
00228 ++orb_runner_->use_count_;
00229 }
00230 orb_ = orb_runner_->orb_;
00231 }
00232
00233 try {
00234 this->info_ = get_repo(this->ior_.c_str(), orb_);
00235
00236 if (CORBA::is_nil(this->info_.in())) {
00237 ACE_ERROR((LM_ERROR,
00238 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::get_dcps_info: ")
00239 ACE_TEXT("unable to narrow DCPSInfo (%C) for key %C.\n"),
00240 this->ior_.c_str(),
00241 this->key().c_str()));
00242 return DCPSInfo::_nil();
00243 }
00244
00245 } catch (const CORBA::Exception& ex) {
00246 ex._tao_print_exception("ERROR: InfoRepoDiscovery::get_dcps_info: failed to resolve ior - ");
00247 return DCPSInfo::_nil();
00248 }
00249 }
00250
00251 return this->info_;
00252 }
00253
00254 std::string
00255 InfoRepoDiscovery::get_stringified_dcps_info_ior()
00256 {
00257 return this->ior_;
00258 }
00259
00260 TransportConfig_rch
00261 InfoRepoDiscovery::bit_config()
00262 {
00263 #if !defined (DDS_HAS_MINIMUM_BIT)
00264 if (bit_config_.is_nil()) {
00265 const std::string cfg_name = TransportRegistry::DEFAULT_INST_PREFIX +
00266 std::string("_BITTransportConfig_") + key();
00267 bit_config_ = TransportRegistry::instance()->create_config(cfg_name);
00268
00269 const std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX +
00270 std::string("_BITTCPTransportInst_") + key();
00271 TransportInst_rch inst =
00272 TransportRegistry::instance()->create_inst(inst_name, "tcp");
00273 bit_config_->instances_.push_back(inst);
00274
00275 if (!use_local_bit_config_) {
00276 bit_transport_ip_ = TheServiceParticipant->bit_transport_ip();
00277 bit_transport_port_ = TheServiceParticipant->bit_transport_port();
00278 }
00279
00280
00281 TcpInst_rch tcp_inst = static_rchandle_cast<TcpInst>(inst);
00282
00283 tcp_inst->datalink_release_delay_ = 0;
00284 if (!bit_transport_ip_.empty()) {
00285 tcp_inst->local_address(bit_transport_port_,
00286 bit_transport_ip_.c_str());
00287 } else {
00288 tcp_inst->local_address_set_port(bit_transport_port_);
00289 }
00290 }
00291 return bit_config_;
00292 #else
00293 return TransportConfig_rch();
00294 #endif
00295 }
00296
00297 DDS::Subscriber_ptr
00298 InfoRepoDiscovery::init_bit(DomainParticipantImpl* participant)
00299 {
00300 #if defined (DDS_HAS_MINIMUM_BIT)
00301 ACE_UNUSED_ARG(participant);
00302 return 0;
00303 #else
00304 if (!TheServiceParticipant->get_BIT()) {
00305 return 0;
00306 }
00307
00308 if (create_bit_topics(participant) != DDS::RETCODE_OK) {
00309 return 0;
00310 }
00311
00312 DDS::Subscriber_var bit_subscriber =
00313 participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
00314 DDS::SubscriberListener::_nil(),
00315 DEFAULT_STATUS_MASK);
00316 try {
00317 TransportConfig_rch config = bit_config();
00318 TransportRegistry::instance()->bind_config(config, bit_subscriber);
00319
00320 } catch (const Transport::Exception&) {
00321 ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00322 "exception during transport initialization\n"));
00323 return 0;
00324 }
00325
00326
00327 try {
00328 DDS::DataReaderQos participantReaderQos;
00329 bit_subscriber->get_default_datareader_qos(participantReaderQos);
00330 participantReaderQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00331
00332 if (participant->federated()) {
00333 participantReaderQos.liveliness.lease_duration.nanosec = 0;
00334 participantReaderQos.liveliness.lease_duration.sec =
00335 TheServiceParticipant->federation_liveliness();
00336 }
00337
00338 DDS::TopicDescription_var bit_part_topic =
00339 participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
00340
00341 DDS::DataReader_var dr =
00342 bit_subscriber->create_datareader(bit_part_topic,
00343 participantReaderQos,
00344 DDS::DataReaderListener::_nil(),
00345 DEFAULT_STATUS_MASK);
00346
00347 if (participant->federated()) {
00348 DDS::ParticipantBuiltinTopicDataDataReader_var pbit_dr =
00349 DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr.in());
00350
00351 DataReaderListener_var failover = new FailoverListener(key());
00352 pbit_dr->set_listener(failover, DEFAULT_STATUS_MASK);
00353 }
00354
00355 DDS::DataReaderQos dr_qos;
00356 bit_subscriber->get_default_datareader_qos(dr_qos);
00357 dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00358
00359 DDS::TopicDescription_var bit_topic_topic =
00360 participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
00361
00362 dr = bit_subscriber->create_datareader(bit_topic_topic,
00363 dr_qos,
00364 DDS::DataReaderListener::_nil(),
00365 DEFAULT_STATUS_MASK);
00366
00367 DDS::TopicDescription_var bit_pub_topic =
00368 participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
00369
00370 dr = bit_subscriber->create_datareader(bit_pub_topic,
00371 dr_qos,
00372 DDS::DataReaderListener::_nil(),
00373 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00374
00375 DDS::TopicDescription_var bit_sub_topic =
00376 participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
00377
00378 dr = bit_subscriber->create_datareader(bit_sub_topic,
00379 dr_qos,
00380 DDS::DataReaderListener::_nil(),
00381 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00382
00383 const DDS::ReturnCode_t ret = bit_subscriber->enable();
00384 if (ret != DDS::RETCODE_OK) {
00385 if (DCPS_debug_level) {
00386 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) InfoRepoDiscovery::init_bit")
00387 ACE_TEXT(" - Error %d enabling subscriber\n"), ret));
00388 }
00389 return 0;
00390 }
00391
00392 } catch (const CORBA::Exception&) {
00393 ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00394 "exception during DataReader initialization\n"));
00395 return 0;
00396 }
00397 return bit_subscriber._retn();
00398 #endif
00399 }
00400
00401 void
00402 InfoRepoDiscovery::fini_bit(DCPS::DomainParticipantImpl* )
00403 {
00404
00405 }
00406
00407 RepoId
00408 InfoRepoDiscovery::bit_key_to_repo_id(DomainParticipantImpl* ,
00409 const char* ,
00410 const DDS::BuiltinTopicKey_t& key) const
00411 {
00412 RepoId id = RepoIdBuilder::create();
00413 RepoIdBuilder builder(id);
00414 builder.federationId(key.value[0]);
00415 builder.participantId(key.value[1]);
00416 builder.entityId(key.value[2]);
00417 return id;
00418 }
00419
00420 bool
00421 InfoRepoDiscovery::active()
00422 {
00423 try {
00424
00425 get_dcps_info()->_is_a("Not_An_IDL_Type");
00426 return true;
00427 } catch (const CORBA::Exception&) {
00428 return false;
00429 }
00430 }
00431
00432
00433
00434 bool
00435 InfoRepoDiscovery::attach_participant(DDS::DomainId_t domainId,
00436 const RepoId& participantId)
00437 {
00438 try {
00439 return get_dcps_info()->attach_participant(domainId, participantId);
00440 } catch (const CORBA::Exception& ex) {
00441 ex._tao_print_exception("ERROR: InfoRepoDiscovery::attach_participant: ");
00442 return false;
00443 }
00444 }
00445
00446 OpenDDS::DCPS::RepoId
00447 InfoRepoDiscovery::generate_participant_guid()
00448 {
00449 return GUID_UNKNOWN;
00450 }
00451
00452 DCPS::AddDomainStatus
00453 InfoRepoDiscovery::add_domain_participant(DDS::DomainId_t domainId,
00454 const DDS::DomainParticipantQos& qos)
00455 {
00456 try {
00457 const DCPSInfo_var info = get_dcps_info();
00458 if (!CORBA::is_nil(info)) {
00459 return info->add_domain_participant(domainId, qos);
00460 }
00461 } catch (const CORBA::Exception& ex) {
00462 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_domain_participant: ");
00463 }
00464 const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false };
00465 return ads;
00466 }
00467
00468 #if defined(OPENDDS_SECURITY)
00469 DCPS::AddDomainStatus
00470 InfoRepoDiscovery::add_domain_participant_secure(
00471 DDS::DomainId_t ,
00472 const DDS::DomainParticipantQos& ,
00473 const OpenDDS::DCPS::RepoId& ,
00474 DDS::Security::IdentityHandle ,
00475 DDS::Security::PermissionsHandle ,
00476 DDS::Security::ParticipantCryptoHandle )
00477 {
00478 const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false };
00479 return ads;
00480 }
00481 #endif
00482
00483 bool
00484 InfoRepoDiscovery::remove_domain_participant(DDS::DomainId_t domainId,
00485 const RepoId& participantId)
00486 {
00487 try {
00488 get_dcps_info()->remove_domain_participant(domainId, participantId);
00489 return true;
00490 } catch (const CORBA::Exception& ex) {
00491 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_domain_participant: ");
00492 return false;
00493 }
00494 }
00495
00496 bool
00497 InfoRepoDiscovery::ignore_domain_participant(DDS::DomainId_t domainId,
00498 const RepoId& myParticipantId,
00499 const RepoId& ignoreId)
00500 {
00501 try {
00502 get_dcps_info()->ignore_domain_participant(domainId, myParticipantId, ignoreId);
00503 return true;
00504 } catch (const CORBA::Exception& ex) {
00505 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_domain_participant: ");
00506 return false;
00507 }
00508 }
00509
00510 bool
00511 InfoRepoDiscovery::update_domain_participant_qos(DDS::DomainId_t domainId,
00512 const RepoId& participant,
00513 const DDS::DomainParticipantQos& qos)
00514 {
00515 try {
00516 return get_dcps_info()->update_domain_participant_qos(domainId, participant, qos);
00517 } catch (const CORBA::Exception& ex) {
00518 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_domain_participant_qos: ");
00519 return false;
00520 }
00521 }
00522
00523
00524
00525 DCPS::TopicStatus
00526 InfoRepoDiscovery::assert_topic(DCPS::RepoId_out topicId, DDS::DomainId_t domainId,
00527 const RepoId& participantId, const char* topicName,
00528 const char* dataTypeName, const DDS::TopicQos& qos,
00529 bool hasDcpsKey)
00530 {
00531 try {
00532 return get_dcps_info()->assert_topic(topicId, domainId, participantId, topicName,
00533 dataTypeName, qos, hasDcpsKey);
00534 } catch (const CORBA::Exception& ex) {
00535 ex._tao_print_exception("ERROR: InfoRepoDiscovery::assert_topic: ");
00536 return DCPS::INTERNAL_ERROR;
00537 }
00538 }
00539
00540 DCPS::TopicStatus
00541 InfoRepoDiscovery::find_topic(DDS::DomainId_t domainId, const char* topicName,
00542 CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
00543 DCPS::RepoId_out topicId)
00544 {
00545 try {
00546 return get_dcps_info()->find_topic(domainId, topicName, dataTypeName, qos, topicId);
00547 } catch (const CORBA::Exception& ex) {
00548 ex._tao_print_exception("ERROR: InfoRepoDiscovery::find_topic: ");
00549 return DCPS::INTERNAL_ERROR;
00550 }
00551 }
00552
00553 DCPS::TopicStatus
00554 InfoRepoDiscovery::remove_topic(DDS::DomainId_t domainId, const RepoId& participantId,
00555 const RepoId& topicId)
00556 {
00557 try {
00558 return get_dcps_info()->remove_topic(domainId, participantId, topicId);
00559 } catch (const CORBA::Exception& ex) {
00560 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_topic: ");
00561 return DCPS::INTERNAL_ERROR;
00562 }
00563 }
00564
00565 bool
00566 InfoRepoDiscovery::ignore_topic(DDS::DomainId_t domainId, const RepoId& myParticipantId,
00567 const RepoId& ignoreId)
00568 {
00569 try {
00570 get_dcps_info()->ignore_topic(domainId, myParticipantId, ignoreId);
00571 return true;
00572 } catch (const CORBA::Exception& ex) {
00573 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_topic: ");
00574 return false;
00575 }
00576 }
00577
00578 bool
00579 InfoRepoDiscovery::update_topic_qos(const RepoId& topicId, DDS::DomainId_t domainId,
00580 const RepoId& participantId, const DDS::TopicQos& qos)
00581 {
00582 try {
00583 return get_dcps_info()->update_topic_qos(topicId, domainId, participantId, qos);
00584 } catch (const CORBA::Exception& ex) {
00585 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_topic_qos: ");
00586 return false;
00587 }
00588 }
00589
00590
00591
00592
00593 RepoId
00594 InfoRepoDiscovery::add_publication(DDS::DomainId_t domainId,
00595 const RepoId& participantId,
00596 const RepoId& topicId,
00597 DCPS::DataWriterCallbacks* publication,
00598 const DDS::DataWriterQos& qos,
00599 const DCPS::TransportLocatorSeq& transInfo,
00600 const DDS::PublisherQos& publisherQos)
00601 {
00602 RepoId pubId;
00603
00604 try {
00605 DCPS::DataWriterRemoteImpl* writer_remote_impl = 0;
00606 ACE_NEW_RETURN(writer_remote_impl,
00607 DataWriterRemoteImpl(*publication),
00608 DCPS::GUID_UNKNOWN);
00609
00610
00611 PortableServer::ServantBase_var writer_remote(writer_remote_impl);
00612
00613
00614 OpenDDS::DCPS::DataWriterRemote_var dr_remote_obj =
00615 servant_to_remote_reference(writer_remote_impl, orb_);
00616
00617 pubId = get_dcps_info()->add_publication(domainId, participantId, topicId,
00618 dr_remote_obj, qos, transInfo, publisherQos);
00619
00620 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00621
00622 dataWriterMap_[pubId] = dr_remote_obj;
00623
00624 } catch (const CORBA::Exception& ex) {
00625 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_publication: ");
00626 pubId = DCPS::GUID_UNKNOWN;
00627 }
00628
00629 return pubId;
00630 }
00631
00632 bool
00633 InfoRepoDiscovery::remove_publication(DDS::DomainId_t domainId,
00634 const RepoId& participantId,
00635 const RepoId& publicationId)
00636 {
00637 {
00638 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00639 removeDataWriterRemote(publicationId);
00640 }
00641 bool removed = false;
00642 try {
00643 get_dcps_info()->remove_publication(domainId, participantId, publicationId);
00644 removed = true;
00645 } catch (const CORBA::Exception& ex) {
00646 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_publication: ");
00647 }
00648
00649 return removed;
00650 }
00651
00652 bool
00653 InfoRepoDiscovery::ignore_publication(DDS::DomainId_t domainId,
00654 const RepoId& participantId,
00655 const RepoId& ignoreId)
00656 {
00657 try {
00658 get_dcps_info()->ignore_publication(domainId, participantId, ignoreId);
00659 return true;
00660 } catch (const CORBA::Exception& ex) {
00661 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_publication: ");
00662 return false;
00663 }
00664 }
00665
00666 bool
00667 InfoRepoDiscovery::update_publication_qos(DDS::DomainId_t domainId,
00668 const RepoId& participantId,
00669 const RepoId& dwId,
00670 const DDS::DataWriterQos& qos,
00671 const DDS::PublisherQos& publisherQos)
00672 {
00673 try {
00674 return get_dcps_info()->update_publication_qos(domainId, participantId, dwId,
00675 qos, publisherQos);
00676 } catch (const CORBA::Exception& ex) {
00677 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_publication_qos: ");
00678 return false;
00679 }
00680 }
00681
00682
00683
00684
00685 RepoId
00686 InfoRepoDiscovery::add_subscription(DDS::DomainId_t domainId,
00687 const RepoId& participantId,
00688 const RepoId& topicId,
00689 DCPS::DataReaderCallbacks* subscription,
00690 const DDS::DataReaderQos& qos,
00691 const DCPS::TransportLocatorSeq& transInfo,
00692 const DDS::SubscriberQos& subscriberQos,
00693 const char* filterClassName,
00694 const char* filterExpr,
00695 const DDS::StringSeq& params)
00696 {
00697 RepoId subId;
00698
00699 try {
00700 DCPS::DataReaderRemoteImpl* reader_remote_impl = 0;
00701 ACE_NEW_RETURN(reader_remote_impl,
00702 DataReaderRemoteImpl(*subscription),
00703 DCPS::GUID_UNKNOWN);
00704
00705
00706 PortableServer::ServantBase_var reader_remote(reader_remote_impl);
00707
00708
00709 OpenDDS::DCPS::DataReaderRemote_var dr_remote_obj =
00710 servant_to_remote_reference(reader_remote_impl, orb_);
00711
00712 subId = get_dcps_info()->add_subscription(domainId, participantId, topicId,
00713 dr_remote_obj, qos, transInfo, subscriberQos,
00714 filterClassName, filterExpr, params);
00715
00716 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00717
00718 dataReaderMap_[subId] = dr_remote_obj;
00719
00720 } catch (const CORBA::Exception& ex) {
00721 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_subscription: ");
00722 subId = DCPS::GUID_UNKNOWN;
00723 }
00724 return subId;
00725 }
00726
00727 bool
00728 InfoRepoDiscovery::remove_subscription(DDS::DomainId_t domainId,
00729 const RepoId& participantId,
00730 const RepoId& subscriptionId)
00731 {
00732 {
00733 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00734 removeDataReaderRemote(subscriptionId);
00735 }
00736 bool removed = false;
00737 try {
00738 get_dcps_info()->remove_subscription(domainId, participantId, subscriptionId);
00739 removed = true;
00740 } catch (const CORBA::Exception& ex) {
00741 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_subscription: ");
00742 }
00743
00744 return removed;
00745 }
00746
00747 bool
00748 InfoRepoDiscovery::ignore_subscription(DDS::DomainId_t domainId,
00749 const RepoId& participantId,
00750 const RepoId& ignoreId)
00751 {
00752 try {
00753 get_dcps_info()->ignore_subscription(domainId, participantId, ignoreId);
00754 return true;
00755 } catch (const CORBA::Exception& ex) {
00756 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_subscription: ");
00757 return false;
00758 }
00759 }
00760
00761 bool
00762 InfoRepoDiscovery::update_subscription_qos(DDS::DomainId_t domainId,
00763 const RepoId& participantId,
00764 const RepoId& drId,
00765 const DDS::DataReaderQos& qos,
00766 const DDS::SubscriberQos& subQos)
00767 {
00768 try {
00769 return get_dcps_info()->update_subscription_qos(domainId, participantId,
00770 drId, qos, subQos);
00771 } catch (const CORBA::Exception& ex) {
00772 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_qos: ");
00773 return false;
00774 }
00775 }
00776
00777 bool
00778 InfoRepoDiscovery::update_subscription_params(DDS::DomainId_t domainId,
00779 const RepoId& participantId,
00780 const RepoId& subId,
00781 const DDS::StringSeq& params)
00782
00783 {
00784 try {
00785 return get_dcps_info()->update_subscription_params(domainId, participantId,
00786 subId, params);
00787 } catch (const CORBA::Exception& ex) {
00788 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_params: ");
00789 return false;
00790 }
00791 }
00792
00793
00794
00795
00796 void
00797 InfoRepoDiscovery::association_complete(DDS::DomainId_t domainId,
00798 const RepoId& participantId,
00799 const RepoId& localId, const RepoId& remoteId)
00800 {
00801 try {
00802 get_dcps_info()->association_complete(domainId, participantId, localId, remoteId);
00803 } catch (const CORBA::Exception& ex) {
00804 ex._tao_print_exception("ERROR: InfoRepoDiscovery::association_complete: ");
00805 }
00806 }
00807
00808 void
00809 InfoRepoDiscovery::removeDataReaderRemote(const RepoId& subscriptionId)
00810 {
00811 DataReaderMap::iterator drr = dataReaderMap_.find(subscriptionId);
00812 if (drr == dataReaderMap_.end()) {
00813 ACE_ERROR((LM_ERROR,
00814 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataReaderRemote: ")
00815 ACE_TEXT(" could not find DataReader for subscriptionId.\n")));
00816 return;
00817 }
00818
00819 try {
00820 DataReaderRemoteImpl* impl =
00821 remote_reference_to_servant<DataReaderRemoteImpl>(drr->second.in(), orb_);
00822 impl->detach_parent();
00823 deactivate_remote_object(drr->second.in(), orb_);
00824 }
00825 catch (::CORBA::BAD_INV_ORDER&){
00826
00827
00828 }
00829
00830 dataReaderMap_.erase(drr);
00831 }
00832
00833 void
00834 InfoRepoDiscovery::removeDataWriterRemote(const RepoId& publicationId)
00835 {
00836 DataWriterMap::iterator dwr = dataWriterMap_.find(publicationId);
00837 if (dwr == dataWriterMap_.end()) {
00838 ACE_ERROR((LM_ERROR,
00839 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataWriterRemote: ")
00840 ACE_TEXT(" could not find DataWriter for publicationId.\n")));
00841 return;
00842 }
00843
00844 try {
00845 DataWriterRemoteImpl* impl =
00846 remote_reference_to_servant<DataWriterRemoteImpl>(dwr->second.in(), orb_);
00847 impl->detach_parent();
00848 deactivate_remote_object(dwr->second.in(), orb_);
00849 }
00850 catch (::CORBA::BAD_INV_ORDER&){
00851
00852
00853 }
00854
00855 dataWriterMap_.erase(dwr);
00856 }
00857
00858 namespace {
00859 const ACE_TCHAR REPO_SECTION_NAME[] = ACE_TEXT("repository");
00860 }
00861
00862 int
00863 InfoRepoDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00864 {
00865 const ACE_Configuration_Section_Key& root = cf.root_section();
00866 ACE_Configuration_Section_Key repo_sect;
00867
00868 if (cf.open_section(root, REPO_SECTION_NAME, 0, repo_sect) != 0) {
00869 if (DCPS_debug_level > 0) {
00870
00871
00872 ACE_DEBUG((LM_NOTICE,
00873 ACE_TEXT("(%P|%t) NOTICE: InfoRepoDiscovery::Config::discovery_config ")
00874 ACE_TEXT("failed to open [%s] section.\n"),
00875 REPO_SECTION_NAME));
00876 }
00877
00878 return 0;
00879
00880 } else {
00881
00882 ValueMap vm;
00883 if (pullValues(cf, repo_sect, vm) > 0) {
00884
00885 ACE_ERROR_RETURN((LM_ERROR,
00886 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00887 ACE_TEXT("repo sections must have a subsection name\n")),
00888 -1);
00889 }
00890
00891 KeyList keys;
00892 if (processSections( cf, repo_sect, keys ) != 0) {
00893 ACE_ERROR_RETURN((LM_ERROR,
00894 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00895 ACE_TEXT("too many nesting layers in the [repo] section.\n")),
00896 -1);
00897 }
00898
00899
00900 for (KeyList::const_iterator it=keys.begin(); it != keys.end(); ++it) {
00901 std::string repo_name = (*it).first;
00902
00903 ValueMap values;
00904 pullValues( cf, (*it).second, values );
00905 Discovery::RepoKey repoKey = Discovery::DEFAULT_REPO;
00906 bool repoKeySpecified = false, bitIpSpecified = false,
00907 bitPortSpecified = false;
00908 std::string repoIor;
00909 int bitPort = 0;
00910 std::string bitIp;
00911 for (ValueMap::const_iterator it=values.begin(); it != values.end(); ++it) {
00912 std::string name = (*it).first;
00913 if (name == "RepositoryKey") {
00914 repoKey = (*it).second;
00915 repoKeySpecified = true;
00916 if (DCPS_debug_level > 0) {
00917 ACE_DEBUG((LM_DEBUG,
00918 ACE_TEXT("(%P|%t) [repository/%C]: RepositoryKey == %C\n"),
00919 repo_name.c_str(), repoKey.c_str()));
00920 }
00921
00922 } else if (name == "RepositoryIor") {
00923 repoIor = (*it).second;
00924
00925 if (DCPS_debug_level > 0) {
00926 ACE_DEBUG((LM_DEBUG,
00927 ACE_TEXT("(%P|%t) [repository/%C]: RepositoryIor == %C\n"),
00928 repo_name.c_str(), repoIor.c_str()));
00929 }
00930 } else if (name == "DCPSBitTransportIPAddress") {
00931 bitIp = (*it).second;
00932 bitIpSpecified = true;
00933 if (DCPS_debug_level > 0) {
00934 ACE_DEBUG((LM_DEBUG,
00935 ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportIPAddress == %C\n"),
00936 repo_name.c_str(), bitIp.c_str()));
00937 }
00938 } else if (name == "DCPSBitTransportPort") {
00939 std::string value = (*it).second;
00940 bitPort = ACE_OS::atoi(value.c_str());
00941 bitPortSpecified = true;
00942 if (convertToInteger(value, bitPort)) {
00943 } else {
00944 ACE_ERROR_RETURN((LM_ERROR,
00945 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00946 ACE_TEXT("Illegal integer value for DCPSBitTransportPort (%C) in [repository/%C] section.\n"),
00947 value.c_str(), repo_name.c_str()),
00948 -1);
00949 }
00950 if (DCPS_debug_level > 0) {
00951 ACE_DEBUG((LM_DEBUG,
00952 ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportPort == %d\n"),
00953 repo_name.c_str(), bitPort));
00954 }
00955 } else {
00956 ACE_ERROR_RETURN((LM_ERROR,
00957 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00958 ACE_TEXT("Unexpected entry (%C) in [repository/%C] section.\n"),
00959 name.c_str(), repo_name.c_str()),
00960 -1);
00961 }
00962 }
00963
00964 if (values.find("RepositoryIor") == values.end()) {
00965 ACE_ERROR_RETURN((LM_ERROR,
00966 ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00967 ACE_TEXT("Repository section [repository/%C] section is missing RepositoryIor value.\n"),
00968 repo_name.c_str()),
00969 -1);
00970 }
00971
00972 if (!repoKeySpecified) {
00973
00974
00975 repoKey = repo_name;
00976 }
00977 InfoRepoDiscovery_rch discovery(
00978 make_rch<InfoRepoDiscovery>(repoKey, repoIor.c_str()));
00979 if (bitPortSpecified) discovery->bit_transport_port(bitPort);
00980 if (bitIpSpecified) discovery->bit_transport_ip(bitIp);
00981 TheServiceParticipant->add_discovery(
00982 DCPS::static_rchandle_cast<Discovery>(discovery));
00983 }
00984 }
00985
00986 return 0;
00987 }
00988
00989 void
00990 InfoRepoDiscovery::OrbRunner::shutdown()
00991 {
00992 orb_->shutdown();
00993 wait();
00994 orb_->destroy();
00995 }
00996
00997 InfoRepoDiscovery::OrbRunner* InfoRepoDiscovery::orb_runner_;
00998 ACE_Thread_Mutex InfoRepoDiscovery::mtx_orb_runner_;
00999
01000 int
01001 InfoRepoDiscovery::OrbRunner::svc()
01002 {
01003
01004 bool done = false;
01005
01006
01007
01008
01009 sigset_t set;
01010 ACE_OS::sigfillset(&set);
01011 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
01012
01013 while (!done) {
01014 try {
01015 if (orb_->orb_core()->has_shutdown() == false) {
01016 orb_->run();
01017 }
01018
01019 done = true;
01020
01021 } catch (const CORBA::SystemException& sysex) {
01022 sysex._tao_print_exception(
01023 "ERROR: InfoRepoDiscovery::OrbRunner");
01024
01025 } catch (const CORBA::UserException& userex) {
01026 userex._tao_print_exception(
01027 "ERROR: InfoRepoDiscovery::OrbRunner");
01028
01029 } catch (const CORBA::Exception& ex) {
01030 ex._tao_print_exception(
01031 "ERROR: InfoRepoDiscovery::OrbRunner");
01032 }
01033
01034 if (orb_->orb_core()->has_shutdown()) {
01035 done = true;
01036
01037 } else {
01038 orb_->orb_core()->reactor()->reset_reactor_event_loop();
01039 }
01040 }
01041
01042 return 0;
01043 }
01044
01045
01046 InfoRepoDiscovery::StaticInitializer::StaticInitializer()
01047 {
01048 TheServiceParticipant->register_discovery_type("repository", new Config);
01049 }
01050
01051 int
01052 IRDiscoveryLoader::init(int, ACE_TCHAR*[])
01053 {
01054
01055
01056 return 0;
01057 }
01058
01059 ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader);
01060 ACE_STATIC_SVC_DEFINE(
01061 IRDiscoveryLoader,
01062 ACE_TEXT("OpenDDS_InfoRepoDiscovery"),
01063 ACE_SVC_OBJ_T,
01064 &ACE_SVC_NAME(IRDiscoveryLoader),
01065 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
01066 0)
01067
01068 }
01069 }
01070
01071 OPENDDS_END_VERSIONED_NAMESPACE_DECL