InfoRepoDiscovery.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 #include "InfoRepoDiscovery.h"
00008 
00009 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteC.h"
00010 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteImpl.h"
00011 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteC.h"
00012 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteImpl.h"
00013 #include "dds/DCPS/InfoRepoDiscovery/FailoverListener.h"
00014 #include "dds/DCPS/Service_Participant.h"
00015 #include "dds/DCPS/RepoIdBuilder.h"
00016 #include "dds/DCPS/ConfigUtils.h"
00017 
00018 #include "tao/ORB_Core.h"
00019 #include "tao/BiDir_GIOP/BiDirGIOP.h"
00020 #include "ace/Reactor.h"
00021 
00022 #if !defined (DDS_HAS_MINIMUM_BIT)
00023 #include "dds/DCPS/DomainParticipantImpl.h"
00024 #include "dds/DCPS/BuiltInTopicUtils.h"
00025 #include "dds/DCPS/Marked_Default_Qos.h"
00026 
00027 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029 
00030 #include "dds/DCPS/transport/tcp/TcpInst.h"
00031 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00032 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00033 #endif
00034 
00035 namespace {
00036 const char ROOT_POA[] = "RootPOA";
00037 const char BIDIR_POA[] = "BiDirPOA";
00038 
00039 struct DestroyPolicy {
00040   explicit DestroyPolicy(const CORBA::Policy_ptr& p)
00041     : p_(CORBA::Policy::_duplicate(p)) {}
00042 
00043   ~DestroyPolicy() { p_->destroy(); }
00044 
00045   CORBA::Policy_var p_;
00046 };
00047 
00048 PortableServer::POA_ptr get_POA(CORBA::ORB_ptr orb)
00049 {
00050   CORBA::Object_var obj =
00051     orb->resolve_initial_references(ROOT_POA);
00052   PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj.in());
00053 
00054   if (TheServiceParticipant->use_bidir_giop()) {
00055     while (true) {
00056       try {
00057         return root_poa->find_POA(BIDIR_POA, false /*activate*/);
00058       } catch (const PortableServer::POA::AdapterNonExistent&) {
00059         // go ahead and create it...
00060       }
00061       CORBA::PolicyList policies(1);
00062       policies.length(1);
00063       CORBA::Any policy;
00064       policy <<= BiDirPolicy::BOTH;
00065       policies[0] =
00066         orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policy);
00067       DestroyPolicy destroy(policies[0]);
00068       PortableServer::POAManager_var manager = root_poa->the_POAManager();
00069       try {
00070         return root_poa->create_POA(BIDIR_POA, manager, policies);
00071       } catch (const PortableServer::POA::AdapterAlreadyExists&) {
00072         // another thread created it, try to find it again
00073       }
00074     }
00075   }
00076 
00077   return root_poa._retn();
00078 }
00079 
00080 /// Get a servant pointer given an object reference.
00081 /// @throws PortableServer::POA::ObjectNotActive
00082 ///         PortableServer::POA::WrongAdapter
00083 ///         PortableServer::POA::WrongPolicy
00084 template <class T_impl, class T_ptr>
00085 T_impl* remote_reference_to_servant(T_ptr p, CORBA::ORB_ptr orb)
00086 {
00087   if (CORBA::is_nil(p)) {
00088     return 0;
00089   }
00090 
00091   PortableServer::POA_var poa = get_POA(orb);
00092 
00093   T_impl* the_servant =
00094     dynamic_cast<T_impl*>(poa->reference_to_servant(p));
00095 
00096   // Use the ServantBase_var so that the servant's reference
00097   // count will not be changed by this operation.
00098   PortableServer::ServantBase_var servant = the_servant;
00099 
00100   return the_servant;
00101 }
00102 
00103 /// Given a servant, return the remote object reference from the local POA.
00104 /// @throws PortableServer::POA::ServantNotActive,
00105 ///         PortableServer::POA::WrongPolicy
00106 template <class T>
00107 typename T::_stub_ptr_type servant_to_remote_reference(T* servant, CORBA::ORB_ptr orb)
00108 {
00109   PortableServer::POA_var poa = get_POA(orb);
00110   PortableServer::ObjectId_var oid = poa->activate_object(servant);
00111   CORBA::Object_var obj = poa->id_to_reference(oid.in());
00112 
00113   typename T::_stub_ptr_type the_obj = T::_stub_type::_narrow(obj.in());
00114   return the_obj;
00115 }
00116 
00117 template <class T>
00118 void deactivate_remote_object(T obj, CORBA::ORB_ptr orb)
00119 {
00120   PortableServer::POA_var poa = get_POA(orb);
00121   PortableServer::ObjectId_var oid =
00122     poa->reference_to_id(obj);
00123   poa->deactivate_object(oid.in());
00124 }
00125 
00126 }
00127 
00128 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00129 
00130 namespace OpenDDS {
00131 namespace DCPS {
00132 
00133 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00134                                      const std::string& ior)
00135   : Discovery(key),
00136     ior_(ior),
00137     bit_transport_port_(0),
00138     use_local_bit_config_(false),
00139     orb_from_user_(false)
00140 {
00141 }
00142 
00143 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00144                                      const DCPSInfo_var& info)
00145   : Discovery(key),
00146     info_(info),
00147     bit_transport_port_(0),
00148     use_local_bit_config_(false),
00149     orb_from_user_(false)
00150 {
00151 }
00152 
00153 InfoRepoDiscovery::~InfoRepoDiscovery()
00154 {
00155   if (!orb_from_user_ && orb_runner_) {
00156     if (0 == --orb_runner_->use_count_) {
00157       try {
00158         orb_runner_->shutdown();
00159       }
00160       catch (const CORBA::Exception& ex) {
00161         ACE_ERROR((LM_ERROR,
00162           ACE_TEXT("ERROR: InfoRepoDiscovery::~InfoRepoDiscovery - ")
00163           ACE_TEXT("Exception caught during ORB shutdown: %C.\n"),
00164           ex._info().c_str()));
00165       }
00166 
00167       delete orb_runner_;
00168       orb_runner_ = 0;
00169     }
00170   }
00171 }
00172 
00173 bool
00174 InfoRepoDiscovery::set_ORB(CORBA::ORB_ptr orb)
00175 {
00176   if (!CORBA::is_nil (orb_.in()) || CORBA::is_nil (orb)) {
00177     return false;
00178   }
00179 
00180   orb_ = CORBA::ORB::_duplicate(orb);
00181   orb_from_user_ = true;
00182   return true;
00183 }
00184 
00185 namespace
00186 {
00187   DCPSInfo_ptr get_repo(const char* ior, CORBA::ORB_ptr orb)
00188   {
00189     CORBA::Object_var o;
00190     try {
00191       o = orb->string_to_object(ior);
00192     } catch (CORBA::INV_OBJREF&) {
00193       // host:port format causes an exception; try again
00194       // with corbaloc format
00195       std::string second_try("corbaloc:iiop:");
00196       second_try += ior;
00197       second_try += "/DCPSInfoRepo";
00198 
00199       o = orb->string_to_object(second_try.c_str());
00200     }
00201 
00202     return DCPSInfo::_narrow(o.in());
00203   }
00204 }
00205 
00206 DCPSInfo_var
00207 InfoRepoDiscovery::get_dcps_info()
00208 {
00209   if (CORBA::is_nil(this->info_.in())) {
00210 
00211     if (!orb_) {
00212       ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_orb_runner_, 0);
00213       if (!orb_runner_) {
00214         orb_runner_ = new OrbRunner;
00215         ACE_ARGV* argv = TheServiceParticipant->ORB_argv();
00216         int argc = argv->argc();
00217         orb_runner_->orb_ =
00218           CORBA::ORB_init(argc, argv->argv(), DEFAULT_ORB_NAME);
00219         orb_runner_->use_count_ = 1;
00220         orb_runner_->activate();
00221 
00222         CORBA::Object_var rp =
00223           orb_runner_->orb_->resolve_initial_references(ROOT_POA);
00224         PortableServer::POA_var poa = PortableServer::POA::_narrow(rp);
00225         PortableServer::POAManager_var poa_manager = poa->the_POAManager();
00226         poa_manager->activate();
00227       } else {
00228         ++orb_runner_->use_count_;
00229       }
00230       orb_ = orb_runner_->orb_;
00231     }
00232 
00233     try {
00234       this->info_ = get_repo(this->ior_.c_str(), orb_);
00235 
00236       if (CORBA::is_nil(this->info_.in())) {
00237         ACE_ERROR((LM_ERROR,
00238                    ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::get_dcps_info: ")
00239                    ACE_TEXT("unable to narrow DCPSInfo (%C) for key %C.\n"),
00240                    this->ior_.c_str(),
00241                    this->key().c_str()));
00242         return DCPSInfo::_nil();
00243       }
00244 
00245     } catch (const CORBA::Exception& ex) {
00246       ex._tao_print_exception("ERROR: InfoRepoDiscovery::get_dcps_info: failed to resolve ior - ");
00247       return DCPSInfo::_nil();
00248     }
00249   }
00250 
00251   return this->info_;
00252 }
00253 
00254 std::string
00255 InfoRepoDiscovery::get_stringified_dcps_info_ior()
00256 {
00257   return this->ior_;
00258 }
00259 
00260 TransportConfig_rch
00261 InfoRepoDiscovery::bit_config()
00262 {
00263 #if !defined (DDS_HAS_MINIMUM_BIT)
00264   if (bit_config_.is_nil()) {
00265     const std::string cfg_name = TransportRegistry::DEFAULT_INST_PREFIX +
00266                                  std::string("_BITTransportConfig_") + key();
00267     bit_config_ = TransportRegistry::instance()->create_config(cfg_name);
00268 
00269     const std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX +
00270                                   std::string("_BITTCPTransportInst_") + key();
00271     TransportInst_rch inst =
00272       TransportRegistry::instance()->create_inst(inst_name, "tcp");
00273     bit_config_->instances_.push_back(inst);
00274 
00275     if (!use_local_bit_config_) {
00276       bit_transport_ip_ = TheServiceParticipant->bit_transport_ip();
00277       bit_transport_port_ = TheServiceParticipant->bit_transport_port();
00278     }
00279 
00280     // Use a static cast to avoid dependency on the Tcp library
00281     TcpInst_rch tcp_inst = static_rchandle_cast<TcpInst>(inst);
00282 
00283     tcp_inst->datalink_release_delay_ = 0;
00284     if (!bit_transport_ip_.empty()) {
00285       tcp_inst->local_address(bit_transport_port_,
00286                               bit_transport_ip_.c_str());
00287     } else {
00288       tcp_inst->local_address_set_port(bit_transport_port_);
00289     }
00290   }
00291   return bit_config_;
00292 #else
00293   return TransportConfig_rch();
00294 #endif
00295 }
00296 
00297 DDS::Subscriber_ptr
00298 InfoRepoDiscovery::init_bit(DomainParticipantImpl* participant)
00299 {
00300 #if defined (DDS_HAS_MINIMUM_BIT)
00301   ACE_UNUSED_ARG(participant);
00302   return 0;
00303 #else
00304   if (!TheServiceParticipant->get_BIT()) {
00305     return 0;
00306   }
00307 
00308   if (create_bit_topics(participant) != DDS::RETCODE_OK) {
00309     return 0;
00310   }
00311 
00312   DDS::Subscriber_var bit_subscriber =
00313     participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
00314                                    DDS::SubscriberListener::_nil(),
00315                                    DEFAULT_STATUS_MASK);
00316   try {
00317     TransportConfig_rch config = bit_config();
00318     TransportRegistry::instance()->bind_config(config, bit_subscriber);
00319 
00320   } catch (const Transport::Exception&) {
00321     ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00322                          "exception during transport initialization\n"));
00323     return 0;
00324   }
00325 
00326   // DataReaders
00327   try {
00328     DDS::DataReaderQos participantReaderQos;
00329     bit_subscriber->get_default_datareader_qos(participantReaderQos);
00330     participantReaderQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00331 
00332     if (participant->federated()) {
00333       participantReaderQos.liveliness.lease_duration.nanosec = 0;
00334       participantReaderQos.liveliness.lease_duration.sec =
00335         TheServiceParticipant->federation_liveliness();
00336     }
00337 
00338     DDS::TopicDescription_var bit_part_topic =
00339       participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
00340 
00341     DDS::DataReader_var dr =
00342       bit_subscriber->create_datareader(bit_part_topic,
00343                                         participantReaderQos,
00344                                         DDS::DataReaderListener::_nil(),
00345                                         DEFAULT_STATUS_MASK);
00346 
00347     if (participant->federated()) {
00348       DDS::ParticipantBuiltinTopicDataDataReader_var pbit_dr =
00349         DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr.in());
00350 
00351       DataReaderListener_var failover = new FailoverListener(key());
00352       pbit_dr->set_listener(failover, DEFAULT_STATUS_MASK);
00353     }
00354 
00355     DDS::DataReaderQos dr_qos;
00356     bit_subscriber->get_default_datareader_qos(dr_qos);
00357     dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00358 
00359     DDS::TopicDescription_var bit_topic_topic =
00360       participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
00361 
00362     dr = bit_subscriber->create_datareader(bit_topic_topic,
00363                                            dr_qos,
00364                                            DDS::DataReaderListener::_nil(),
00365                                            DEFAULT_STATUS_MASK);
00366 
00367     DDS::TopicDescription_var bit_pub_topic =
00368       participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
00369 
00370     dr = bit_subscriber->create_datareader(bit_pub_topic,
00371                                            dr_qos,
00372                                            DDS::DataReaderListener::_nil(),
00373                                            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00374 
00375     DDS::TopicDescription_var bit_sub_topic =
00376       participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
00377 
00378     dr = bit_subscriber->create_datareader(bit_sub_topic,
00379                                            dr_qos,
00380                                            DDS::DataReaderListener::_nil(),
00381                                            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00382 
00383     const DDS::ReturnCode_t ret = bit_subscriber->enable();
00384     if (ret != DDS::RETCODE_OK) {
00385       if (DCPS_debug_level) {
00386         ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) InfoRepoDiscovery::init_bit")
00387                    ACE_TEXT(" - Error %d enabling subscriber\n"), ret));
00388       }
00389       return 0;
00390     }
00391 
00392   } catch (const CORBA::Exception&) {
00393     ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00394                          "exception during DataReader initialization\n"));
00395     return 0;
00396   }
00397   return bit_subscriber._retn();
00398 #endif
00399 }
00400 
00401 void
00402 InfoRepoDiscovery::fini_bit(DCPS::DomainParticipantImpl* /* participant */)
00403 {
00404   // nothing to do for DCPSInfoRepo
00405 }
00406 
00407 RepoId
00408 InfoRepoDiscovery::bit_key_to_repo_id(DomainParticipantImpl* /*participant*/,
00409                                       const char* /*bit_topic_name*/,
00410                                       const DDS::BuiltinTopicKey_t& key) const
00411 {
00412   RepoId id = RepoIdBuilder::create();
00413   RepoIdBuilder builder(id);
00414   builder.federationId(key.value[0]);
00415   builder.participantId(key.value[1]);
00416   builder.entityId(key.value[2]);
00417   return id;
00418 }
00419 
00420 bool
00421 InfoRepoDiscovery::active()
00422 {
00423   try {
00424     // invoke a CORBA call, if we are active then there will be no exception
00425     get_dcps_info()->_is_a("Not_An_IDL_Type");
00426     return true;
00427   } catch (const CORBA::Exception&) {
00428     return false;
00429   }
00430 }
00431 
00432 // Participant operations:
00433 
00434 bool
00435 InfoRepoDiscovery::attach_participant(DDS::DomainId_t domainId,
00436                                       const RepoId& participantId)
00437 {
00438   try {
00439     return get_dcps_info()->attach_participant(domainId, participantId);
00440   } catch (const CORBA::Exception& ex) {
00441     ex._tao_print_exception("ERROR: InfoRepoDiscovery::attach_participant: ");
00442     return false;
00443   }
00444 }
00445 
00446 OpenDDS::DCPS::RepoId
00447 InfoRepoDiscovery::generate_participant_guid()
00448 {
00449   return GUID_UNKNOWN;
00450 }
00451 
00452 DCPS::AddDomainStatus
00453 InfoRepoDiscovery::add_domain_participant(DDS::DomainId_t domainId,
00454                                           const DDS::DomainParticipantQos& qos)
00455 {
00456   try {
00457     const DCPSInfo_var info = get_dcps_info();
00458     if (!CORBA::is_nil(info)) {
00459       return info->add_domain_participant(domainId, qos);
00460     }
00461   } catch (const CORBA::Exception& ex) {
00462     ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_domain_participant: ");
00463   }
00464   const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
00465   return ads;
00466 }
00467 
00468 #if defined(OPENDDS_SECURITY)
00469 DCPS::AddDomainStatus
00470 InfoRepoDiscovery::add_domain_participant_secure(
00471   DDS::DomainId_t /*domain*/,
00472   const DDS::DomainParticipantQos& /*qos*/,
00473   const OpenDDS::DCPS::RepoId& /*guid*/,
00474   DDS::Security::IdentityHandle /*id*/,
00475   DDS::Security::PermissionsHandle /*perm*/,
00476   DDS::Security::ParticipantCryptoHandle /*part_crypto*/)
00477 {
00478   const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
00479   return ads;
00480 }
00481 #endif
00482 
00483 bool
00484 InfoRepoDiscovery::remove_domain_participant(DDS::DomainId_t domainId,
00485                                              const RepoId& participantId)
00486 {
00487   try {
00488     get_dcps_info()->remove_domain_participant(domainId, participantId);
00489     return true;
00490   } catch (const CORBA::Exception& ex) {
00491     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_domain_participant: ");
00492     return false;
00493   }
00494 }
00495 
00496 bool
00497 InfoRepoDiscovery::ignore_domain_participant(DDS::DomainId_t domainId,
00498                                              const RepoId& myParticipantId,
00499                                              const RepoId& ignoreId)
00500 {
00501   try {
00502     get_dcps_info()->ignore_domain_participant(domainId, myParticipantId, ignoreId);
00503     return true;
00504   } catch (const CORBA::Exception& ex) {
00505     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_domain_participant: ");
00506     return false;
00507   }
00508 }
00509 
00510 bool
00511 InfoRepoDiscovery::update_domain_participant_qos(DDS::DomainId_t domainId,
00512                                                  const RepoId& participant,
00513                                                  const DDS::DomainParticipantQos& qos)
00514 {
00515   try {
00516     return get_dcps_info()->update_domain_participant_qos(domainId, participant, qos);
00517   } catch (const CORBA::Exception& ex) {
00518     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_domain_participant_qos: ");
00519     return false;
00520   }
00521 }
00522 
00523 // Topic operations:
00524 
00525 DCPS::TopicStatus
00526 InfoRepoDiscovery::assert_topic(DCPS::RepoId_out topicId, DDS::DomainId_t domainId,
00527                                 const RepoId& participantId, const char* topicName,
00528                                 const char* dataTypeName, const DDS::TopicQos& qos,
00529                                 bool hasDcpsKey)
00530 {
00531   try {
00532     return get_dcps_info()->assert_topic(topicId, domainId, participantId, topicName,
00533       dataTypeName, qos, hasDcpsKey);
00534   } catch (const CORBA::Exception& ex) {
00535     ex._tao_print_exception("ERROR: InfoRepoDiscovery::assert_topic: ");
00536     return DCPS::INTERNAL_ERROR;
00537   }
00538 }
00539 
00540 DCPS::TopicStatus
00541 InfoRepoDiscovery::find_topic(DDS::DomainId_t domainId, const char* topicName,
00542                               CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
00543                               DCPS::RepoId_out topicId)
00544 {
00545   try {
00546     return get_dcps_info()->find_topic(domainId, topicName, dataTypeName, qos, topicId);
00547   } catch (const CORBA::Exception& ex) {
00548     ex._tao_print_exception("ERROR: InfoRepoDiscovery::find_topic: ");
00549     return DCPS::INTERNAL_ERROR;
00550   }
00551 }
00552 
00553 DCPS::TopicStatus
00554 InfoRepoDiscovery::remove_topic(DDS::DomainId_t domainId, const RepoId& participantId,
00555                                 const RepoId& topicId)
00556 {
00557   try {
00558     return get_dcps_info()->remove_topic(domainId, participantId, topicId);
00559   } catch (const CORBA::Exception& ex) {
00560     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_topic: ");
00561     return DCPS::INTERNAL_ERROR;
00562   }
00563 }
00564 
00565 bool
00566 InfoRepoDiscovery::ignore_topic(DDS::DomainId_t domainId, const RepoId& myParticipantId,
00567                                 const RepoId& ignoreId)
00568 {
00569   try {
00570     get_dcps_info()->ignore_topic(domainId, myParticipantId, ignoreId);
00571     return true;
00572   } catch (const CORBA::Exception& ex) {
00573     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_topic: ");
00574     return false;
00575   }
00576 }
00577 
00578 bool
00579 InfoRepoDiscovery::update_topic_qos(const RepoId& topicId, DDS::DomainId_t domainId,
00580                                     const RepoId& participantId, const DDS::TopicQos& qos)
00581 {
00582   try {
00583     return get_dcps_info()->update_topic_qos(topicId, domainId, participantId, qos);
00584   } catch (const CORBA::Exception& ex) {
00585     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_topic_qos: ");
00586     return false;
00587   }
00588 }
00589 
00590 
00591 // Publication operations:
00592 
00593 RepoId
00594 InfoRepoDiscovery::add_publication(DDS::DomainId_t domainId,
00595                                    const RepoId& participantId,
00596                                    const RepoId& topicId,
00597                                    DCPS::DataWriterCallbacks* publication,
00598                                    const DDS::DataWriterQos& qos,
00599                                    const DCPS::TransportLocatorSeq& transInfo,
00600                                    const DDS::PublisherQos& publisherQos)
00601 {
00602   RepoId pubId;
00603 
00604   try {
00605     DCPS::DataWriterRemoteImpl* writer_remote_impl = 0;
00606     ACE_NEW_RETURN(writer_remote_impl,
00607                    DataWriterRemoteImpl(*publication),
00608                    DCPS::GUID_UNKNOWN);
00609 
00610     //this is taking ownership of the DataWriterRemoteImpl (server side) allocated above
00611     PortableServer::ServantBase_var writer_remote(writer_remote_impl);
00612 
00613     //this is the client reference to the DataWriterRemoteImpl
00614     OpenDDS::DCPS::DataWriterRemote_var dr_remote_obj =
00615       servant_to_remote_reference(writer_remote_impl, orb_);
00616 
00617     pubId = get_dcps_info()->add_publication(domainId, participantId, topicId,
00618       dr_remote_obj, qos, transInfo, publisherQos);
00619 
00620     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00621     // take ownership of the client allocated above
00622     dataWriterMap_[pubId] = dr_remote_obj;
00623 
00624   } catch (const CORBA::Exception& ex) {
00625     ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_publication: ");
00626     pubId = DCPS::GUID_UNKNOWN;
00627   }
00628 
00629   return pubId;
00630 }
00631 
00632 bool
00633 InfoRepoDiscovery::remove_publication(DDS::DomainId_t domainId,
00634                                       const RepoId& participantId,
00635                                       const RepoId& publicationId)
00636 {
00637   {
00638     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00639     removeDataWriterRemote(publicationId);
00640   }
00641   bool removed = false;
00642   try {
00643     get_dcps_info()->remove_publication(domainId, participantId, publicationId);
00644     removed = true;
00645   } catch (const CORBA::Exception& ex) {
00646     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_publication: ");
00647   }
00648 
00649   return removed;
00650 }
00651 
00652 bool
00653 InfoRepoDiscovery::ignore_publication(DDS::DomainId_t domainId,
00654                                       const RepoId& participantId,
00655                                       const RepoId& ignoreId)
00656 {
00657   try {
00658     get_dcps_info()->ignore_publication(domainId, participantId, ignoreId);
00659     return true;
00660   } catch (const CORBA::Exception& ex) {
00661     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_publication: ");
00662     return false;
00663   }
00664 }
00665 
00666 bool
00667 InfoRepoDiscovery::update_publication_qos(DDS::DomainId_t domainId,
00668                                           const RepoId& participantId,
00669                                           const RepoId& dwId,
00670                                           const DDS::DataWriterQos& qos,
00671                                           const DDS::PublisherQos& publisherQos)
00672 {
00673   try {
00674     return get_dcps_info()->update_publication_qos(domainId, participantId, dwId,
00675       qos, publisherQos);
00676   } catch (const CORBA::Exception& ex) {
00677     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_publication_qos: ");
00678     return false;
00679   }
00680 }
00681 
00682 
00683 // Subscription operations:
00684 
00685 RepoId
00686 InfoRepoDiscovery::add_subscription(DDS::DomainId_t domainId,
00687                                     const RepoId& participantId,
00688                                     const RepoId& topicId,
00689                                     DCPS::DataReaderCallbacks* subscription,
00690                                     const DDS::DataReaderQos& qos,
00691                                     const DCPS::TransportLocatorSeq& transInfo,
00692                                     const DDS::SubscriberQos& subscriberQos,
00693                                     const char* filterClassName,
00694                                     const char* filterExpr,
00695                                     const DDS::StringSeq& params)
00696 {
00697   RepoId subId;
00698 
00699   try {
00700     DCPS::DataReaderRemoteImpl* reader_remote_impl = 0;
00701     ACE_NEW_RETURN(reader_remote_impl,
00702                    DataReaderRemoteImpl(*subscription),
00703                    DCPS::GUID_UNKNOWN);
00704 
00705     //this is taking ownership of the DataReaderRemoteImpl (server side) allocated above
00706     PortableServer::ServantBase_var reader_remote(reader_remote_impl);
00707 
00708     //this is the client reference to the DataReaderRemoteImpl
00709     OpenDDS::DCPS::DataReaderRemote_var dr_remote_obj =
00710       servant_to_remote_reference(reader_remote_impl, orb_);
00711 
00712     subId = get_dcps_info()->add_subscription(domainId, participantId, topicId,
00713                                               dr_remote_obj, qos, transInfo, subscriberQos,
00714                                               filterClassName, filterExpr, params);
00715 
00716     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00717     // take ownership of the client allocated above
00718     dataReaderMap_[subId] = dr_remote_obj;
00719 
00720   } catch (const CORBA::Exception& ex) {
00721     ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_subscription: ");
00722     subId = DCPS::GUID_UNKNOWN;
00723   }
00724   return subId;
00725 }
00726 
00727 bool
00728 InfoRepoDiscovery::remove_subscription(DDS::DomainId_t domainId,
00729                                        const RepoId& participantId,
00730                                        const RepoId& subscriptionId)
00731 {
00732   {
00733     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00734     removeDataReaderRemote(subscriptionId);
00735   }
00736   bool removed = false;
00737   try {
00738     get_dcps_info()->remove_subscription(domainId, participantId, subscriptionId);
00739     removed = true;
00740   } catch (const CORBA::Exception& ex) {
00741     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_subscription: ");
00742   }
00743 
00744   return removed;
00745 }
00746 
00747 bool
00748 InfoRepoDiscovery::ignore_subscription(DDS::DomainId_t domainId,
00749                                        const RepoId& participantId,
00750                                        const RepoId& ignoreId)
00751 {
00752   try {
00753     get_dcps_info()->ignore_subscription(domainId, participantId, ignoreId);
00754     return true;
00755   } catch (const CORBA::Exception& ex) {
00756     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_subscription: ");
00757     return false;
00758   }
00759 }
00760 
00761 bool
00762 InfoRepoDiscovery::update_subscription_qos(DDS::DomainId_t domainId,
00763                                            const RepoId& participantId,
00764                                            const RepoId& drId,
00765                                            const DDS::DataReaderQos& qos,
00766                                            const DDS::SubscriberQos& subQos)
00767 {
00768   try {
00769     return get_dcps_info()->update_subscription_qos(domainId, participantId,
00770       drId, qos, subQos);
00771   } catch (const CORBA::Exception& ex) {
00772     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_qos: ");
00773     return false;
00774   }
00775 }
00776 
00777 bool
00778 InfoRepoDiscovery::update_subscription_params(DDS::DomainId_t domainId,
00779                                               const RepoId& participantId,
00780                                               const RepoId& subId,
00781                                               const DDS::StringSeq& params)
00782 
00783 {
00784   try {
00785     return get_dcps_info()->update_subscription_params(domainId, participantId,
00786       subId, params);
00787   } catch (const CORBA::Exception& ex) {
00788     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_params: ");
00789     return false;
00790   }
00791 }
00792 
00793 
00794 // Managing reader/writer associations:
00795 
00796 void
00797 InfoRepoDiscovery::association_complete(DDS::DomainId_t domainId,
00798                                         const RepoId& participantId,
00799                                         const RepoId& localId, const RepoId& remoteId)
00800 {
00801   try {
00802     get_dcps_info()->association_complete(domainId, participantId, localId, remoteId);
00803   } catch (const CORBA::Exception& ex) {
00804     ex._tao_print_exception("ERROR: InfoRepoDiscovery::association_complete: ");
00805   }
00806 }
00807 
00808 void
00809 InfoRepoDiscovery::removeDataReaderRemote(const RepoId& subscriptionId)
00810 {
00811   DataReaderMap::iterator drr = dataReaderMap_.find(subscriptionId);
00812   if (drr == dataReaderMap_.end()) {
00813     ACE_ERROR((LM_ERROR,
00814                ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataReaderRemote: ")
00815                ACE_TEXT(" could not find DataReader for subscriptionId.\n")));
00816     return;
00817   }
00818 
00819   try {
00820     DataReaderRemoteImpl* impl =
00821       remote_reference_to_servant<DataReaderRemoteImpl>(drr->second.in(), orb_);
00822     impl->detach_parent();
00823     deactivate_remote_object(drr->second.in(), orb_);
00824   }
00825   catch (::CORBA::BAD_INV_ORDER&){
00826     // The orb may throw ::CORBA::BAD_INV_ORDER when is has been shutdown.
00827     // Ignore it anyway.
00828   }
00829 
00830   dataReaderMap_.erase(drr);
00831 }
00832 
00833 void
00834 InfoRepoDiscovery::removeDataWriterRemote(const RepoId& publicationId)
00835 {
00836   DataWriterMap::iterator dwr = dataWriterMap_.find(publicationId);
00837   if (dwr == dataWriterMap_.end()) {
00838     ACE_ERROR((LM_ERROR,
00839                ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataWriterRemote: ")
00840                ACE_TEXT(" could not find DataWriter for publicationId.\n")));
00841     return;
00842   }
00843 
00844   try {
00845     DataWriterRemoteImpl* impl =
00846       remote_reference_to_servant<DataWriterRemoteImpl>(dwr->second.in(), orb_);
00847     impl->detach_parent();
00848     deactivate_remote_object(dwr->second.in(), orb_);
00849   }
00850   catch (::CORBA::BAD_INV_ORDER&){
00851     // The orb may throw ::CORBA::BAD_INV_ORDER when is has been shutdown.
00852     // Ignore it anyway.
00853   }
00854 
00855   dataWriterMap_.erase(dwr);
00856 }
00857 
00858 namespace {
00859   const ACE_TCHAR REPO_SECTION_NAME[] = ACE_TEXT("repository");
00860 }
00861 
00862 int
00863 InfoRepoDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00864 {
00865   const ACE_Configuration_Section_Key& root = cf.root_section();
00866   ACE_Configuration_Section_Key repo_sect;
00867 
00868   if (cf.open_section(root, REPO_SECTION_NAME, 0, repo_sect) != 0) {
00869     if (DCPS_debug_level > 0) {
00870       // This is not an error if the configuration file does not have
00871       // any repository (sub)section. The code default configuration will be used.
00872       ACE_DEBUG((LM_NOTICE,
00873                  ACE_TEXT("(%P|%t) NOTICE: InfoRepoDiscovery::Config::discovery_config ")
00874                  ACE_TEXT("failed to open [%s] section.\n"),
00875                  REPO_SECTION_NAME));
00876     }
00877 
00878     return 0;
00879 
00880   } else {
00881     // Ensure there are no properties in this section
00882     ValueMap vm;
00883     if (pullValues(cf, repo_sect, vm) > 0) {
00884       // There are values inside [repo]
00885       ACE_ERROR_RETURN((LM_ERROR,
00886                         ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00887                         ACE_TEXT("repo sections must have a subsection name\n")),
00888                        -1);
00889     }
00890     // Process the subsections of this section (the individual repos)
00891     KeyList keys;
00892     if (processSections( cf, repo_sect, keys ) != 0) {
00893       ACE_ERROR_RETURN((LM_ERROR,
00894                         ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00895                         ACE_TEXT("too many nesting layers in the [repo] section.\n")),
00896                        -1);
00897     }
00898 
00899     // Loop through the [repo/*] sections
00900     for (KeyList::const_iterator it=keys.begin(); it != keys.end(); ++it) {
00901       std::string repo_name = (*it).first;
00902 
00903       ValueMap values;
00904       pullValues( cf, (*it).second, values );
00905       Discovery::RepoKey repoKey = Discovery::DEFAULT_REPO;
00906       bool repoKeySpecified = false, bitIpSpecified = false,
00907         bitPortSpecified = false;
00908       std::string repoIor;
00909       int bitPort = 0;
00910       std::string bitIp;
00911       for (ValueMap::const_iterator it=values.begin(); it != values.end(); ++it) {
00912         std::string name = (*it).first;
00913         if (name == "RepositoryKey") {
00914           repoKey = (*it).second;
00915           repoKeySpecified = true;
00916           if (DCPS_debug_level > 0) {
00917             ACE_DEBUG((LM_DEBUG,
00918                        ACE_TEXT("(%P|%t) [repository/%C]: RepositoryKey == %C\n"),
00919                        repo_name.c_str(), repoKey.c_str()));
00920           }
00921 
00922         } else if (name == "RepositoryIor") {
00923           repoIor = (*it).second;
00924 
00925           if (DCPS_debug_level > 0) {
00926             ACE_DEBUG((LM_DEBUG,
00927                        ACE_TEXT("(%P|%t) [repository/%C]: RepositoryIor == %C\n"),
00928                        repo_name.c_str(), repoIor.c_str()));
00929           }
00930         } else if (name == "DCPSBitTransportIPAddress") {
00931           bitIp = (*it).second;
00932           bitIpSpecified = true;
00933           if (DCPS_debug_level > 0) {
00934             ACE_DEBUG((LM_DEBUG,
00935                        ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportIPAddress == %C\n"),
00936                        repo_name.c_str(), bitIp.c_str()));
00937           }
00938         } else if (name == "DCPSBitTransportPort") {
00939           std::string value = (*it).second;
00940           bitPort = ACE_OS::atoi(value.c_str());
00941           bitPortSpecified = true;
00942           if (convertToInteger(value, bitPort)) {
00943           } else {
00944             ACE_ERROR_RETURN((LM_ERROR,
00945                               ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00946                               ACE_TEXT("Illegal integer value for DCPSBitTransportPort (%C) in [repository/%C] section.\n"),
00947                               value.c_str(), repo_name.c_str()),
00948                              -1);
00949           }
00950           if (DCPS_debug_level > 0) {
00951             ACE_DEBUG((LM_DEBUG,
00952                        ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportPort == %d\n"),
00953                        repo_name.c_str(), bitPort));
00954           }
00955         } else {
00956           ACE_ERROR_RETURN((LM_ERROR,
00957                             ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00958                             ACE_TEXT("Unexpected entry (%C) in [repository/%C] section.\n"),
00959                             name.c_str(), repo_name.c_str()),
00960                            -1);
00961         }
00962       }
00963 
00964       if (values.find("RepositoryIor") == values.end()) {
00965         ACE_ERROR_RETURN((LM_ERROR,
00966                           ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00967                           ACE_TEXT("Repository section [repository/%C] section is missing RepositoryIor value.\n"),
00968                           repo_name.c_str()),
00969                          -1);
00970       }
00971 
00972       if (!repoKeySpecified) {
00973         // If the RepositoryKey option was not specified, use the section
00974         // name as the repo key
00975         repoKey = repo_name;
00976       }
00977       InfoRepoDiscovery_rch discovery(
00978         make_rch<InfoRepoDiscovery>(repoKey, repoIor.c_str()));
00979       if (bitPortSpecified) discovery->bit_transport_port(bitPort);
00980       if (bitIpSpecified) discovery->bit_transport_ip(bitIp);
00981       TheServiceParticipant->add_discovery(
00982         DCPS::static_rchandle_cast<Discovery>(discovery));
00983     }
00984   }
00985 
00986   return 0;
00987 }
00988 
00989 void
00990 InfoRepoDiscovery::OrbRunner::shutdown()
00991 {
00992   orb_->shutdown();
00993   wait();
00994   orb_->destroy();
00995 }
00996 
00997 InfoRepoDiscovery::OrbRunner* InfoRepoDiscovery::orb_runner_;
00998 ACE_Thread_Mutex InfoRepoDiscovery::mtx_orb_runner_;
00999 
01000 int
01001 InfoRepoDiscovery::OrbRunner::svc()
01002 {
01003   // this method was originally Service_Participant::svc()
01004   bool done = false;
01005 
01006   // Ignore all signals to avoid
01007   //     ERROR: <something descriptive> Interrupted system call
01008   // The main thread will handle signals.
01009   sigset_t set;
01010   ACE_OS::sigfillset(&set);
01011   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
01012 
01013   while (!done) {
01014     try {
01015       if (orb_->orb_core()->has_shutdown() == false) {
01016         orb_->run();
01017       }
01018 
01019       done = true;
01020 
01021     } catch (const CORBA::SystemException& sysex) {
01022       sysex._tao_print_exception(
01023         "ERROR: InfoRepoDiscovery::OrbRunner");
01024 
01025     } catch (const CORBA::UserException& userex) {
01026       userex._tao_print_exception(
01027         "ERROR: InfoRepoDiscovery::OrbRunner");
01028 
01029     } catch (const CORBA::Exception& ex) {
01030       ex._tao_print_exception(
01031         "ERROR: InfoRepoDiscovery::OrbRunner");
01032     }
01033 
01034     if (orb_->orb_core()->has_shutdown()) {
01035       done = true;
01036 
01037     } else {
01038       orb_->orb_core()->reactor()->reset_reactor_event_loop();
01039     }
01040   }
01041 
01042   return 0;
01043 }
01044 
01045 
01046 InfoRepoDiscovery::StaticInitializer::StaticInitializer()
01047 {
01048   TheServiceParticipant->register_discovery_type("repository", new Config);
01049 }
01050 
01051 int
01052 IRDiscoveryLoader::init(int, ACE_TCHAR*[])
01053 {
01054   // no-op: since the library is loaded, InfoRepoDiscovery::StaticInitializer
01055   // has already been constructed.
01056   return 0;
01057 }
01058 
01059 ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader);
01060 ACE_STATIC_SVC_DEFINE(
01061   IRDiscoveryLoader,
01062   ACE_TEXT("OpenDDS_InfoRepoDiscovery"),
01063   ACE_SVC_OBJ_T,
01064   &ACE_SVC_NAME(IRDiscoveryLoader),
01065   ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
01066   0)
01067 
01068 } // namespace DCPS
01069 } // namespace OpenDDS
01070 
01071 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1