InfoRepoDiscovery.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 #include "InfoRepoDiscovery.h"
00008 
00009 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteC.h"
00010 #include "dds/DCPS/InfoRepoDiscovery/DataReaderRemoteImpl.h"
00011 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteC.h"
00012 #include "dds/DCPS/InfoRepoDiscovery/DataWriterRemoteImpl.h"
00013 #include "dds/DCPS/InfoRepoDiscovery/FailoverListener.h"
00014 #include "dds/DCPS/Service_Participant.h"
00015 #include "dds/DCPS/RepoIdBuilder.h"
00016 #include "dds/DCPS/ConfigUtils.h"
00017 
00018 #include "tao/ORB_Core.h"
00019 #include "ace/Reactor.h"
00020 
00021 #if !defined (DDS_HAS_MINIMUM_BIT)
00022 #include "dds/DCPS/DomainParticipantImpl.h"
00023 #include "dds/DCPS/BuiltInTopicUtils.h"
00024 #include "dds/DCPS/Marked_Default_Qos.h"
00025 
00026 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00027 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00028 
00029 #include "dds/DCPS/transport/tcp/TcpInst.h"
00030 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00031 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00032 #endif
00033 
00034 namespace {
00035 
00036 /// Get a servant pointer given an object reference.
00037 /// @throws PortableServer::POA::ObjectNotActive
00038 ///         PortableServer::POA::WrongAdapter
00039 ///         PortableServer::POA::WongPolicy
00040 template <class T_impl, class T_ptr>
00041 T_impl* remote_reference_to_servant(T_ptr p, CORBA::ORB_ptr orb)
00042 {
00043   if (CORBA::is_nil(p)) {
00044     return 0;
00045   }
00046 
00047   CORBA::Object_var obj =
00048     orb->resolve_initial_references("RootPOA");
00049   PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
00050 
00051   T_impl* the_servant =
00052     dynamic_cast<T_impl*>(poa->reference_to_servant(p));
00053 
00054   // Use the ServantBase_var so that the servant's reference
00055   // count will not be changed by this operation.
00056   PortableServer::ServantBase_var servant = the_servant;
00057 
00058   return the_servant;
00059 }
00060 
00061 /// Given a servant, return the remote object reference from the local POA.
00062 /// @throws PortableServer::POA::ServantNotActive,
00063 ///         PortableServer::POA::WrongPolicy
00064 template <class T>
00065 typename T::_stub_ptr_type servant_to_remote_reference(T* servant, CORBA::ORB_ptr orb)
00066 {
00067   CORBA::Object_var obj =
00068     orb->resolve_initial_references("RootPOA");
00069   PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
00070 
00071   PortableServer::ObjectId_var oid = poa->activate_object(servant);
00072 
00073   obj = poa->id_to_reference(oid.in());
00074 
00075   typename T::_stub_ptr_type the_obj = T::_stub_type::_narrow(obj.in());
00076   return the_obj;
00077 }
00078 
00079 template <class T>
00080 void deactivate_remote_object(T obj, CORBA::ORB_ptr orb)
00081 {
00082   CORBA::Object_var poa_obj =
00083     orb->resolve_initial_references("RootPOA");
00084   PortableServer::POA_var poa = PortableServer::POA::_narrow(poa_obj.in());
00085   PortableServer::ObjectId_var oid =
00086     poa->reference_to_id(obj);
00087   poa->deactivate_object(oid.in());
00088 }
00089 
00090 }
00091 
00092 namespace OpenDDS {
00093 namespace DCPS {
00094 
00095 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00096                                      const std::string& ior)
00097   : Discovery(key),
00098     ior_(ior),
00099     bit_transport_port_(0),
00100     use_local_bit_config_(false),
00101     failoverListener_(0),
00102     orb_from_user_(false)
00103 {
00104 }
00105 
00106 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
00107                                      const DCPSInfo_var& info)
00108   : Discovery(key),
00109     info_(info),
00110     bit_transport_port_(0),
00111     use_local_bit_config_(false),
00112     failoverListener_(0),
00113     orb_from_user_(false)
00114 {
00115 }
00116 
00117 InfoRepoDiscovery::~InfoRepoDiscovery()
00118 {
00119   delete this->failoverListener_;
00120   if (!orb_from_user_ && orb_runner_) {
00121     if (0 == --orb_runner_->use_count_) {
00122       orb_runner_->shutdown();
00123       delete orb_runner_;
00124       orb_runner_ = 0;
00125     }
00126   }
00127 }
00128 
00129 bool
00130 InfoRepoDiscovery::set_ORB(CORBA::ORB_ptr orb)
00131 {
00132   if (orb_.in() || !orb) {
00133     return false;
00134   }
00135 
00136   orb_ = CORBA::ORB::_duplicate(orb);
00137   orb_from_user_ = true;
00138   return true;
00139 }
00140 
00141 namespace
00142 {
00143   DCPSInfo_ptr get_repo(const char* ior, CORBA::ORB_ptr orb)
00144   {
00145     CORBA::Object_var o;
00146     try {
00147       o = orb->string_to_object(ior);
00148     } catch (CORBA::INV_OBJREF&) {
00149       // host:port format causes an exception; try again
00150       // with corbaloc format
00151       std::string second_try("corbaloc:iiop:");
00152       second_try += ior;
00153       second_try += "/DCPSInfoRepo";
00154 
00155       o = orb->string_to_object(second_try.c_str());
00156     }
00157 
00158     return DCPSInfo::_narrow(o.in());
00159   }
00160 }
00161 
00162 DCPSInfo_var
00163 InfoRepoDiscovery::get_dcps_info()
00164 {
00165   if (CORBA::is_nil(this->info_.in())) {
00166 
00167     if (!orb_) {
00168       ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_orb_runner_, 0);
00169       if (!orb_runner_) {
00170         orb_runner_ = new OrbRunner;
00171         ACE_ARGV* argv = TheServiceParticipant->ORB_argv();
00172         int argc = argv->argc();
00173         orb_runner_->orb_ =
00174           CORBA::ORB_init(argc, argv->argv(), DEFAULT_ORB_NAME);
00175         orb_runner_->use_count_ = 1;
00176         orb_runner_->activate();
00177 
00178         CORBA::Object_var rp =
00179           orb_runner_->orb_->resolve_initial_references("RootPOA");
00180         PortableServer::POA_var poa = PortableServer::POA::_narrow(rp);
00181         PortableServer::POAManager_var poa_manager = poa->the_POAManager();
00182         poa_manager->activate();
00183       } else {
00184         ++orb_runner_->use_count_;
00185       }
00186       orb_ = orb_runner_->orb_;
00187     }
00188 
00189     try {
00190       this->info_ = get_repo(this->ior_.c_str(), orb_);
00191 
00192       if (CORBA::is_nil(this->info_.in())) {
00193         ACE_ERROR((LM_ERROR,
00194                    ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::get_dcps_info: ")
00195                    ACE_TEXT("unable to narrow DCPSInfo (%C) for key %C.\n"),
00196                    this->ior_.c_str(),
00197                    this->key().c_str()));
00198         return DCPSInfo::_nil();
00199       }
00200 
00201     } catch (const CORBA::Exception& ex) {
00202       ex._tao_print_exception("ERROR: InfoRepoDiscovery::get_dcps_info: failed to resolve ior - ");
00203       return DCPSInfo::_nil();
00204     }
00205   }
00206 
00207   return this->info_;
00208 }
00209 
00210 std::string
00211 InfoRepoDiscovery::get_stringified_dcps_info_ior()
00212 {
00213   return this->ior_;
00214 }
00215 
00216 TransportConfig_rch
00217 InfoRepoDiscovery::bit_config()
00218 {
00219 #if !defined (DDS_HAS_MINIMUM_BIT)
00220   if (bit_config_.is_nil()) {
00221     const std::string cfg_name = TransportRegistry::DEFAULT_INST_PREFIX +
00222                                  std::string("_BITTransportConfig_") + key();
00223     bit_config_ = TransportRegistry::instance()->create_config(cfg_name);
00224 
00225     const std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX +
00226                                   std::string("_BITTCPTransportInst_") + key();
00227     TransportInst_rch inst =
00228       TransportRegistry::instance()->create_inst(inst_name, "tcp");
00229     bit_config_->instances_.push_back(inst);
00230 
00231     if (!use_local_bit_config_) {
00232       bit_transport_ip_ = TheServiceParticipant->bit_transport_ip();
00233       bit_transport_port_ = TheServiceParticipant->bit_transport_port();
00234     }
00235 
00236     // Use a static cast to avoid dependency on the Tcp library
00237     TcpInst_rch tcp_inst = static_rchandle_cast<TcpInst>(inst);
00238 
00239     tcp_inst->datalink_release_delay_ = 0;
00240     tcp_inst->local_address(bit_transport_port_,
00241                             bit_transport_ip_.c_str());
00242   }
00243   return bit_config_;
00244 #else
00245   return 0;
00246 #endif
00247 }
00248 
00249 DDS::Subscriber_ptr
00250 InfoRepoDiscovery::init_bit(DomainParticipantImpl* participant)
00251 {
00252 #if defined (DDS_HAS_MINIMUM_BIT)
00253   ACE_UNUSED_ARG(participant);
00254   return 0;
00255 #else
00256   if (!TheServiceParticipant->get_BIT()) {
00257     return 0;
00258   }
00259 
00260   if (create_bit_topics(participant) != DDS::RETCODE_OK) {
00261     return 0;
00262   }
00263 
00264   DDS::Subscriber_var bit_subscriber =
00265     participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
00266                                    DDS::SubscriberListener::_nil(),
00267                                    DEFAULT_STATUS_MASK);
00268   try {
00269     TransportConfig_rch config = bit_config();
00270     TransportRegistry::instance()->bind_config(config, bit_subscriber);
00271 
00272   } catch (const Transport::Exception&) {
00273     ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00274                          "exception during transport initialization\n"));
00275     return 0;
00276   }
00277 
00278   // DataReaders
00279   try {
00280     DDS::DataReaderQos participantReaderQos;
00281     bit_subscriber->get_default_datareader_qos(participantReaderQos);
00282     participantReaderQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00283 
00284     if (participant->federated()) {
00285       participantReaderQos.liveliness.lease_duration.nanosec = 0;
00286       participantReaderQos.liveliness.lease_duration.sec =
00287         TheServiceParticipant->federation_liveliness();
00288     }
00289 
00290     DDS::TopicDescription_var bit_part_topic =
00291       participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
00292 
00293     DDS::DataReader_var dr =
00294       bit_subscriber->create_datareader(bit_part_topic,
00295                                         participantReaderQos,
00296                                         DDS::DataReaderListener::_nil(),
00297                                         DEFAULT_STATUS_MASK);
00298 
00299     if (participant->federated()) {
00300       DDS::ParticipantBuiltinTopicDataDataReader* pbit_dr =
00301         DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr.in());
00302 
00303       // Create and attach the listener.
00304       failoverListener_ = new FailoverListener(key());
00305       pbit_dr->set_listener(failoverListener_, DEFAULT_STATUS_MASK);
00306     }
00307 
00308     DDS::DataReaderQos dr_qos;
00309     bit_subscriber->get_default_datareader_qos(dr_qos);
00310     dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00311 
00312     DDS::TopicDescription_var bit_topic_topic =
00313       participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
00314 
00315     dr = bit_subscriber->create_datareader(bit_topic_topic,
00316                                            dr_qos,
00317                                            DDS::DataReaderListener::_nil(),
00318                                            DEFAULT_STATUS_MASK);
00319 
00320     DDS::TopicDescription_var bit_pub_topic =
00321       participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
00322 
00323     dr = bit_subscriber->create_datareader(bit_pub_topic,
00324                                            dr_qos,
00325                                            DDS::DataReaderListener::_nil(),
00326                                            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00327 
00328     DDS::TopicDescription_var bit_sub_topic =
00329       participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
00330 
00331     dr = bit_subscriber->create_datareader(bit_sub_topic,
00332                                            dr_qos,
00333                                            DDS::DataReaderListener::_nil(),
00334                                            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00335 
00336   } catch (const CORBA::Exception&) {
00337     ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
00338                          "exception during DataReader initialization\n"));
00339     return 0;
00340   }
00341   return bit_subscriber._retn();
00342 #endif
00343 }
00344 
00345 void
00346 InfoRepoDiscovery::fini_bit(DCPS::DomainParticipantImpl* /* participant */)
00347 {
00348   // nothing to do for DCPSInfoRepo
00349 }
00350 
00351 RepoId
00352 InfoRepoDiscovery::bit_key_to_repo_id(DomainParticipantImpl* /*participant*/,
00353                                       const char* /*bit_topic_name*/,
00354                                       const DDS::BuiltinTopicKey_t& key) const
00355 {
00356   RepoId id = RepoIdBuilder::create();
00357   RepoIdBuilder builder(id);
00358   builder.federationId(key.value[0]);
00359   builder.participantId(key.value[1]);
00360   builder.entityId(key.value[2]);
00361   return id;
00362 }
00363 
00364 bool
00365 InfoRepoDiscovery::active()
00366 {
00367   try {
00368     // invoke a CORBA call, if we are active then there will be no exception
00369     get_dcps_info()->_is_a("Not_An_IDL_Type");
00370     return true;
00371   } catch (const CORBA::Exception&) {
00372     return false;
00373   }
00374 }
00375 
00376 // Participant operations:
00377 
00378 bool
00379 InfoRepoDiscovery::attach_participant(DDS::DomainId_t domainId,
00380                                       const RepoId& participantId)
00381 {
00382   try {
00383     return get_dcps_info()->attach_participant(domainId, participantId);
00384   } catch (const CORBA::Exception& ex) {
00385     ex._tao_print_exception("ERROR: InfoRepoDiscovery::attach_participant: ");
00386     return false;
00387   }
00388 }
00389 
00390 DCPS::AddDomainStatus
00391 InfoRepoDiscovery::add_domain_participant(DDS::DomainId_t domainId,
00392                                           const DDS::DomainParticipantQos& qos)
00393 {
00394   try {
00395     const DCPSInfo_var info = get_dcps_info();
00396     if (!CORBA::is_nil(info)) {
00397       return info->add_domain_participant(domainId, qos);
00398     }
00399   } catch (const CORBA::Exception& ex) {
00400     ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_domain_participant: ");
00401   }
00402   const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
00403   return ads;
00404 }
00405 
00406 bool
00407 InfoRepoDiscovery::remove_domain_participant(DDS::DomainId_t domainId,
00408                                              const RepoId& participantId)
00409 {
00410   try {
00411     get_dcps_info()->remove_domain_participant(domainId, participantId);
00412     return true;
00413   } catch (const CORBA::Exception& ex) {
00414     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_domain_participant: ");
00415     return false;
00416   }
00417 }
00418 
00419 bool
00420 InfoRepoDiscovery::ignore_domain_participant(DDS::DomainId_t domainId,
00421                                              const RepoId& myParticipantId,
00422                                              const RepoId& ignoreId)
00423 {
00424   try {
00425     get_dcps_info()->ignore_domain_participant(domainId, myParticipantId, ignoreId);
00426     return true;
00427   } catch (const CORBA::Exception& ex) {
00428     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_domain_participant: ");
00429     return false;
00430   }
00431 }
00432 
00433 bool
00434 InfoRepoDiscovery::update_domain_participant_qos(DDS::DomainId_t domainId,
00435                                                  const RepoId& participant,
00436                                                  const DDS::DomainParticipantQos& qos)
00437 {
00438   try {
00439     return get_dcps_info()->update_domain_participant_qos(domainId, participant, qos);
00440   } catch (const CORBA::Exception& ex) {
00441     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_domain_participant_qos: ");
00442     return false;
00443   }
00444 }
00445 
00446 // Topic operations:
00447 
00448 DCPS::TopicStatus
00449 InfoRepoDiscovery::assert_topic(DCPS::RepoId_out topicId, DDS::DomainId_t domainId,
00450                                 const RepoId& participantId, const char* topicName,
00451                                 const char* dataTypeName, const DDS::TopicQos& qos,
00452                                 bool hasDcpsKey)
00453 {
00454   try {
00455     return get_dcps_info()->assert_topic(topicId, domainId, participantId, topicName,
00456       dataTypeName, qos, hasDcpsKey);
00457   } catch (const CORBA::Exception& ex) {
00458     ex._tao_print_exception("ERROR: InfoRepoDiscovery::assert_topic: ");
00459     return DCPS::INTERNAL_ERROR;
00460   }
00461 }
00462 
00463 DCPS::TopicStatus
00464 InfoRepoDiscovery::find_topic(DDS::DomainId_t domainId, const char* topicName,
00465                               CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
00466                               DCPS::RepoId_out topicId)
00467 {
00468   try {
00469     return get_dcps_info()->find_topic(domainId, topicName, dataTypeName, qos, topicId);
00470   } catch (const CORBA::Exception& ex) {
00471     ex._tao_print_exception("ERROR: InfoRepoDiscovery::find_topic: ");
00472     return DCPS::INTERNAL_ERROR;
00473   }
00474 }
00475 
00476 DCPS::TopicStatus
00477 InfoRepoDiscovery::remove_topic(DDS::DomainId_t domainId, const RepoId& participantId,
00478                                 const RepoId& topicId)
00479 {
00480   try {
00481     return get_dcps_info()->remove_topic(domainId, participantId, topicId);
00482   } catch (const CORBA::Exception& ex) {
00483     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_topic: ");
00484     return DCPS::INTERNAL_ERROR;
00485   }
00486 }
00487 
00488 bool
00489 InfoRepoDiscovery::ignore_topic(DDS::DomainId_t domainId, const RepoId& myParticipantId,
00490                                 const RepoId& ignoreId)
00491 {
00492   try {
00493     get_dcps_info()->ignore_topic(domainId, myParticipantId, ignoreId);
00494     return true;
00495   } catch (const CORBA::Exception& ex) {
00496     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_topic: ");
00497     return false;
00498   }
00499 }
00500 
00501 bool
00502 InfoRepoDiscovery::update_topic_qos(const RepoId& topicId, DDS::DomainId_t domainId,
00503                                     const RepoId& participantId, const DDS::TopicQos& qos)
00504 {
00505   try {
00506     return get_dcps_info()->update_topic_qos(topicId, domainId, participantId, qos);
00507   } catch (const CORBA::Exception& ex) {
00508     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_topic_qos: ");
00509     return false;
00510   }
00511 }
00512 
00513 
00514 // Publication operations:
00515 
00516 RepoId
00517 InfoRepoDiscovery::add_publication(DDS::DomainId_t domainId,
00518                                    const RepoId& participantId,
00519                                    const RepoId& topicId,
00520                                    DCPS::DataWriterCallbacks* publication,
00521                                    const DDS::DataWriterQos& qos,
00522                                    const DCPS::TransportLocatorSeq& transInfo,
00523                                    const DDS::PublisherQos& publisherQos)
00524 {
00525   RepoId pubId;
00526 
00527   try {
00528     DCPS::DataWriterRemoteImpl* writer_remote_impl = 0;
00529     ACE_NEW_RETURN(writer_remote_impl,
00530                    DataWriterRemoteImpl(publication),
00531                    DCPS::GUID_UNKNOWN);
00532 
00533     //this is taking ownership of the DataWriterRemoteImpl (server side) allocated above
00534     PortableServer::ServantBase_var writer_remote(writer_remote_impl);
00535 
00536     //this is the client reference to the DataWriterRemoteImpl
00537     OpenDDS::DCPS::DataWriterRemote_var dr_remote_obj =
00538       servant_to_remote_reference(writer_remote_impl, orb_);
00539 
00540     pubId = get_dcps_info()->add_publication(domainId, participantId, topicId,
00541       dr_remote_obj, qos, transInfo, publisherQos);
00542 
00543     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00544     // take ownership of the client allocated above
00545     dataWriterMap_[pubId] = dr_remote_obj;
00546 
00547   } catch (const CORBA::Exception& ex) {
00548     ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_publication: ");
00549     pubId = DCPS::GUID_UNKNOWN;
00550   }
00551 
00552   return pubId;
00553 }
00554 
00555 bool
00556 InfoRepoDiscovery::remove_publication(DDS::DomainId_t domainId,
00557                                       const RepoId& participantId,
00558                                       const RepoId& publicationId)
00559 {
00560   {
00561     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00562     removeDataWriterRemote(publicationId);
00563   }
00564   bool removed = false;
00565   try {
00566     get_dcps_info()->remove_publication(domainId, participantId, publicationId);
00567     removed = true;
00568   } catch (const CORBA::Exception& ex) {
00569     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_publication: ");
00570   }
00571 
00572   return removed;
00573 }
00574 
00575 bool
00576 InfoRepoDiscovery::ignore_publication(DDS::DomainId_t domainId,
00577                                       const RepoId& participantId,
00578                                       const RepoId& ignoreId)
00579 {
00580   try {
00581     get_dcps_info()->ignore_publication(domainId, participantId, ignoreId);
00582     return true;
00583   } catch (const CORBA::Exception& ex) {
00584     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_publication: ");
00585     return false;
00586   }
00587 }
00588 
00589 bool
00590 InfoRepoDiscovery::update_publication_qos(DDS::DomainId_t domainId,
00591                                           const RepoId& participantId,
00592                                           const RepoId& dwId,
00593                                           const DDS::DataWriterQos& qos,
00594                                           const DDS::PublisherQos& publisherQos)
00595 {
00596   try {
00597     return get_dcps_info()->update_publication_qos(domainId, participantId, dwId,
00598       qos, publisherQos);
00599   } catch (const CORBA::Exception& ex) {
00600     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_publication_qos: ");
00601     return false;
00602   }
00603 }
00604 
00605 
00606 // Subscription operations:
00607 
00608 RepoId
00609 InfoRepoDiscovery::add_subscription(DDS::DomainId_t domainId,
00610                                     const RepoId& participantId,
00611                                     const RepoId& topicId,
00612                                     DCPS::DataReaderCallbacks* subscription,
00613                                     const DDS::DataReaderQos& qos,
00614                                     const DCPS::TransportLocatorSeq& transInfo,
00615                                     const DDS::SubscriberQos& subscriberQos,
00616                                     const char* filterClassName,
00617                                     const char* filterExpr,
00618                                     const DDS::StringSeq& params)
00619 {
00620   RepoId subId;
00621 
00622   try {
00623     DCPS::DataReaderRemoteImpl* reader_remote_impl = 0;
00624     ACE_NEW_RETURN(reader_remote_impl,
00625                    DataReaderRemoteImpl(subscription),
00626                    DCPS::GUID_UNKNOWN);
00627 
00628     //this is taking ownership of the DataReaderRemoteImpl (server side) allocated above
00629     PortableServer::ServantBase_var reader_remote(reader_remote_impl);
00630 
00631     //this is the client reference to the DataReaderRemoteImpl
00632     OpenDDS::DCPS::DataReaderRemote_var dr_remote_obj =
00633       servant_to_remote_reference(reader_remote_impl, orb_);
00634 
00635     subId = get_dcps_info()->add_subscription(domainId, participantId, topicId,
00636                                               dr_remote_obj, qos, transInfo, subscriberQos,
00637                                               filterClassName, filterExpr, params);
00638 
00639     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN);
00640     // take ownership of the client allocated above
00641     dataReaderMap_[subId] = dr_remote_obj;
00642 
00643   } catch (const CORBA::Exception& ex) {
00644     ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_subscription: ");
00645     subId = DCPS::GUID_UNKNOWN;
00646   }
00647   return subId;
00648 }
00649 
00650 bool
00651 InfoRepoDiscovery::remove_subscription(DDS::DomainId_t domainId,
00652                                        const RepoId& participantId,
00653                                        const RepoId& subscriptionId)
00654 {
00655   {
00656     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
00657     removeDataReaderRemote(subscriptionId);
00658   }
00659   bool removed = false;
00660   try {
00661     get_dcps_info()->remove_subscription(domainId, participantId, subscriptionId);
00662     removed = true;
00663   } catch (const CORBA::Exception& ex) {
00664     ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_subscription: ");
00665   }
00666 
00667   return removed;
00668 }
00669 
00670 bool
00671 InfoRepoDiscovery::ignore_subscription(DDS::DomainId_t domainId,
00672                                        const RepoId& participantId,
00673                                        const RepoId& ignoreId)
00674 {
00675   try {
00676     get_dcps_info()->ignore_subscription(domainId, participantId, ignoreId);
00677     return true;
00678   } catch (const CORBA::Exception& ex) {
00679     ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_subscription: ");
00680     return false;
00681   }
00682 }
00683 
00684 bool
00685 InfoRepoDiscovery::update_subscription_qos(DDS::DomainId_t domainId,
00686                                            const RepoId& participantId,
00687                                            const RepoId& drId,
00688                                            const DDS::DataReaderQos& qos,
00689                                            const DDS::SubscriberQos& subQos)
00690 {
00691   try {
00692     return get_dcps_info()->update_subscription_qos(domainId, participantId,
00693       drId, qos, subQos);
00694   } catch (const CORBA::Exception& ex) {
00695     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_qos: ");
00696     return false;
00697   }
00698 }
00699 
00700 bool
00701 InfoRepoDiscovery::update_subscription_params(DDS::DomainId_t domainId,
00702                                               const RepoId& participantId,
00703                                               const RepoId& subId,
00704                                               const DDS::StringSeq& params)
00705 
00706 {
00707   try {
00708     return get_dcps_info()->update_subscription_params(domainId, participantId,
00709       subId, params);
00710   } catch (const CORBA::Exception& ex) {
00711     ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_params: ");
00712     return false;
00713   }
00714 }
00715 
00716 
00717 // Managing reader/writer associations:
00718 
00719 void
00720 InfoRepoDiscovery::association_complete(DDS::DomainId_t domainId,
00721                                         const RepoId& participantId,
00722                                         const RepoId& localId, const RepoId& remoteId)
00723 {
00724   try {
00725     get_dcps_info()->association_complete(domainId, participantId, localId, remoteId);
00726   } catch (const CORBA::Exception& ex) {
00727     ex._tao_print_exception("ERROR: InfoRepoDiscovery::association_complete: ");
00728   }
00729 }
00730 
00731 void
00732 InfoRepoDiscovery::removeDataReaderRemote(const RepoId& subscriptionId)
00733 {
00734   DataReaderMap::iterator drr = dataReaderMap_.find(subscriptionId);
00735   if (drr == dataReaderMap_.end()) {
00736     ACE_ERROR((LM_ERROR,
00737                ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataReaderRemote: ")
00738                ACE_TEXT(" could not find DataReader for subscriptionId.\n")));
00739     return;
00740   }
00741 
00742   DataReaderRemoteImpl* impl =
00743     remote_reference_to_servant<DataReaderRemoteImpl>(drr->second.in(), orb_);
00744   impl->detach_parent();
00745   deactivate_remote_object(drr->second.in(), orb_);
00746 
00747   dataReaderMap_.erase(drr);
00748 }
00749 
00750 void
00751 InfoRepoDiscovery::removeDataWriterRemote(const RepoId& publicationId)
00752 {
00753   DataWriterMap::iterator dwr = dataWriterMap_.find(publicationId);
00754   if (dwr == dataWriterMap_.end()) {
00755     ACE_ERROR((LM_ERROR,
00756                ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataWriterRemote: ")
00757                ACE_TEXT(" could not find DataWriter for publicationId.\n")));
00758     return;
00759   }
00760 
00761   DataWriterRemoteImpl* impl =
00762     remote_reference_to_servant<DataWriterRemoteImpl>(dwr->second.in(), orb_);
00763   impl->detach_parent();
00764   deactivate_remote_object(dwr->second.in(), orb_);
00765 
00766   dataWriterMap_.erase(dwr);
00767 }
00768 
00769 namespace {
00770   const ACE_TCHAR REPO_SECTION_NAME[] = ACE_TEXT("repository");
00771 }
00772 
00773 int
00774 InfoRepoDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00775 {
00776   const ACE_Configuration_Section_Key& root = cf.root_section();
00777   ACE_Configuration_Section_Key repo_sect;
00778 
00779   if (cf.open_section(root, REPO_SECTION_NAME, 0, repo_sect) != 0) {
00780     if (DCPS_debug_level > 0) {
00781       // This is not an error if the configuration file does not have
00782       // any repository (sub)section. The code default configuration will be used.
00783       ACE_DEBUG((LM_NOTICE,
00784                  ACE_TEXT("(%P|%t) NOTICE: InfoRepoDiscovery::Config::discovery_config ")
00785                  ACE_TEXT("failed to open [%s] section.\n"),
00786                  REPO_SECTION_NAME));
00787     }
00788 
00789     return 0;
00790 
00791   } else {
00792     // Ensure there are no properties in this section
00793     ValueMap vm;
00794     if (pullValues(cf, repo_sect, vm) > 0) {
00795       // There are values inside [repo]
00796       ACE_ERROR_RETURN((LM_ERROR,
00797                         ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00798                         ACE_TEXT("repo sections must have a subsection name\n")),
00799                        -1);
00800     }
00801     // Process the subsections of this section (the individual repos)
00802     KeyList keys;
00803     if (processSections( cf, repo_sect, keys ) != 0) {
00804       ACE_ERROR_RETURN((LM_ERROR,
00805                         ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00806                         ACE_TEXT("too many nesting layers in the [repo] section.\n")),
00807                        -1);
00808     }
00809 
00810     // Loop through the [repo/*] sections
00811     for (KeyList::const_iterator it=keys.begin(); it != keys.end(); ++it) {
00812       std::string repo_name = (*it).first;
00813 
00814       ValueMap values;
00815       pullValues( cf, (*it).second, values );
00816       Discovery::RepoKey repoKey = Discovery::DEFAULT_REPO;
00817       bool repoKeySpecified = false, bitIpSpecified = false,
00818         bitPortSpecified = false;
00819       std::string repoIor;
00820       int bitPort = 0;
00821       std::string bitIp;
00822       for (ValueMap::const_iterator it=values.begin(); it != values.end(); ++it) {
00823         std::string name = (*it).first;
00824         if (name == "RepositoryKey") {
00825           repoKey = (*it).second;
00826           repoKeySpecified = true;
00827           if (DCPS_debug_level > 0) {
00828             ACE_DEBUG((LM_DEBUG,
00829                        ACE_TEXT("(%P|%t) [repository/%C]: RepositoryKey == %C\n"),
00830                        repo_name.c_str(), repoKey.c_str()));
00831           }
00832 
00833         } else if (name == "RepositoryIor") {
00834           repoIor = (*it).second;
00835 
00836           if (DCPS_debug_level > 0) {
00837             ACE_DEBUG((LM_DEBUG,
00838                        ACE_TEXT("(%P|%t) [repository/%C]: RepositoryIor == %C\n"),
00839                        repo_name.c_str(), repoIor.c_str()));
00840           }
00841         } else if (name == "DCPSBitTransportIPAddress") {
00842           bitIp = (*it).second;
00843           bitIpSpecified = true;
00844           if (DCPS_debug_level > 0) {
00845             ACE_DEBUG((LM_DEBUG,
00846                        ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportIPAddress == %C\n"),
00847                        repo_name.c_str(), bitIp.c_str()));
00848           }
00849         } else if (name == "DCPSBitTransportPort") {
00850           std::string value = (*it).second;
00851           bitPort = ACE_OS::atoi(value.c_str());
00852           bitPortSpecified = true;
00853           if (convertToInteger(value, bitPort)) {
00854           } else {
00855             ACE_ERROR_RETURN((LM_ERROR,
00856                               ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00857                               ACE_TEXT("Illegal integer value for DCPSBitTransportPort (%C) in [repository/%C] section.\n"),
00858                               value.c_str(), repo_name.c_str()),
00859                              -1);
00860           }
00861           if (DCPS_debug_level > 0) {
00862             ACE_DEBUG((LM_DEBUG,
00863                        ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportPort == %d\n"),
00864                        repo_name.c_str(), bitPort));
00865           }
00866         } else {
00867           ACE_ERROR_RETURN((LM_ERROR,
00868                             ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00869                             ACE_TEXT("Unexpected entry (%C) in [repository/%C] section.\n"),
00870                             name.c_str(), repo_name.c_str()),
00871                            -1);
00872         }
00873       }
00874 
00875       if (values.find("RepositoryIor") == values.end()) {
00876         ACE_ERROR_RETURN((LM_ERROR,
00877                           ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
00878                           ACE_TEXT("Repository section [repository/%C] section is missing RepositoryIor value.\n"),
00879                           repo_name.c_str()),
00880                          -1);
00881       }
00882 
00883       if (!repoKeySpecified) {
00884         // If the RepositoryKey option was not specified, use the section
00885         // name as the repo key
00886         repoKey = repo_name;
00887       }
00888       InfoRepoDiscovery_rch discovery =
00889         new InfoRepoDiscovery(repoKey, repoIor.c_str());
00890       if (bitPortSpecified) discovery->bit_transport_port(bitPort);
00891       if (bitIpSpecified) discovery->bit_transport_ip(bitIp);
00892       TheServiceParticipant->add_discovery(
00893         DCPS::static_rchandle_cast<Discovery>(discovery));
00894     }
00895   }
00896 
00897   return 0;
00898 }
00899 
00900 void
00901 InfoRepoDiscovery::OrbRunner::shutdown()
00902 {
00903   orb_->shutdown();
00904   wait();
00905 }
00906 
00907 InfoRepoDiscovery::OrbRunner* InfoRepoDiscovery::orb_runner_;
00908 ACE_Thread_Mutex InfoRepoDiscovery::mtx_orb_runner_;
00909 
00910 int
00911 InfoRepoDiscovery::OrbRunner::svc()
00912 {
00913   // this method was originally Service_Participant::svc()
00914   bool done = false;
00915 
00916   // Ignore all signals to avoid
00917   //     ERROR: <something descriptive> Interrupted system call
00918   // The main thread will handle signals.
00919   sigset_t set;
00920   ACE_OS::sigfillset(&set);
00921   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00922 
00923   while (!done) {
00924     try {
00925       if (orb_->orb_core()->has_shutdown() == false) {
00926         orb_->run();
00927       }
00928 
00929       done = true;
00930 
00931     } catch (const CORBA::SystemException& sysex) {
00932       sysex._tao_print_exception(
00933         "ERROR: InfoRepoDiscovery::OrbRunner");
00934 
00935     } catch (const CORBA::UserException& userex) {
00936       userex._tao_print_exception(
00937         "ERROR: InfoRepoDiscovery::OrbRunner");
00938 
00939     } catch (const CORBA::Exception& ex) {
00940       ex._tao_print_exception(
00941         "ERROR: InfoRepoDiscovery::OrbRunner");
00942     }
00943 
00944     if (orb_->orb_core()->has_shutdown()) {
00945       done = true;
00946 
00947     } else {
00948       orb_->orb_core()->reactor()->reset_reactor_event_loop();
00949     }
00950   }
00951 
00952   return 0;
00953 }
00954 
00955 
00956 InfoRepoDiscovery::StaticInitializer::StaticInitializer()
00957 {
00958   TheServiceParticipant->register_discovery_type("repository", new Config);
00959 }
00960 
00961 int
00962 IRDiscoveryLoader::init(int, ACE_TCHAR*[])
00963 {
00964   // no-op: since the library is loaded, InfoRepoDiscovery::StaticInitializer
00965   // has already been constructed.
00966   return 0;
00967 }
00968 
00969 ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader);
00970 ACE_STATIC_SVC_DEFINE(
00971   IRDiscoveryLoader,
00972   ACE_TEXT("OpenDDS_InfoRepoDiscovery"),
00973   ACE_SVC_OBJ_T,
00974   &ACE_SVC_NAME(IRDiscoveryLoader),
00975   ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00976   0)
00977 
00978 } // namespace DCPS
00979 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7