DCPSInfoRepoServ.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include "DCPSInfo_i.h"
00010 #include "DCPSInfoRepoServ.h"
00011 #include "FederatorConfig.h"
00012 #include "FederatorManagerImpl.h"
00013 #include "ShutdownInterface.h"
00014 #include "PersistenceUpdater.h"
00015 #include "UpdateManager.h"
00016 
00017 #include "dds/DCPS/Service_Participant.h"
00018 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00019 
00020 //If we need BIT support, pull in TCP so that static builds will have it.
00021 #if !defined(DDS_HAS_MINIMUM_BIT)
00022 #include "dds/DCPS/transport/tcp/Tcp.h"
00023 #endif
00024 
00025 #include "tao/ORB_Core.h"
00026 #include "tao/IORTable/IORTable.h"
00027 
00028 #include <orbsvcs/Shutdown_Utilities.h>
00029 
00030 #ifdef ACE_AS_STATIC_LIBS
00031 #include "tao/ImR_Client/ImR_Client.h"
00032 #endif
00033 
00034 #include "ace/Get_Opt.h"
00035 #include "ace/Arg_Shifter.h"
00036 #include "ace/Service_Config.h"
00037 #include "ace/Argv_Type_Converter.h"
00038 
00039 #include <string>
00040 #include <sstream>
00041 
00042 InfoRepo::InfoRepo(int argc, ACE_TCHAR *argv[])
00043 : ior_file_(ACE_TEXT("repo.ior"))
00044 , listen_address_given_(0)
00045 #ifdef DDS_HAS_MINIMUM_BIT
00046 , use_bits_(false)
00047 #else
00048 , use_bits_(true)
00049 #endif
00050 , resurrect_(true)
00051 , finalized_(false)
00052 , servant_finalized_(false)
00053 , federator_(this->federatorConfig_)
00054 , federatorConfig_(argc, argv)
00055 , lock_()
00056 , cond_(lock_)
00057 , shutdown_complete_(false)
00058 , dispatch_cleanup_delay_(30,0)
00059 {
00060   try {
00061     this->init();
00062   } catch (...) {
00063     this->finalize();
00064     throw;
00065   }
00066 }
00067 
00068 InfoRepo::~InfoRepo()
00069 {
00070   this->finalize();
00071 }
00072 
00073 void
00074 InfoRepo::run()
00075 {
00076   this->shutdown_complete_ = false;
00077   this->orb_->run();
00078   this->finalize();
00079   ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00080   this->shutdown_complete_ = true;
00081   this->cond_.signal();
00082 }
00083 
00084 void
00085 InfoRepo::finalize()
00086 {
00087   if (this->finalized_) {
00088     return;
00089   }
00090 
00091   if (!this->servant_finalized_) {
00092     // reached if the ImR caused the ORB to shut down,
00093     // which bypasses InfoRepo::handle_exception()
00094     this->info_servant_->finalize();
00095     this->federator_.finalize();
00096     this->servant_finalized_ = true;
00097   }
00098 
00099   TheServiceParticipant->shutdown();
00100 
00101   if (!CORBA::is_nil(this->orb_)) {
00102     this->orb_->destroy();
00103   }
00104 
00105   this->finalized_ = true;
00106 }
00107 
00108 int
00109 InfoRepo::handle_exception(ACE_HANDLE /* fd */)
00110 {
00111   // these should occur before ORB::shutdown() since they use the ORB/reactor
00112   this->info_servant_->finalize();
00113   this->federator_.finalize();
00114   this->servant_finalized_ = true;
00115 
00116   this->orb_->shutdown(true);
00117   return 0;
00118 }
00119 
00120 void
00121 InfoRepo::shutdown()
00122 {
00123   this->orb_->orb_core()->reactor()->notify(this);
00124   // reactor will invoke our InfoRepo::handle_exception()
00125 }
00126 
00127 void
00128 InfoRepo::sync_shutdown()
00129 {
00130   this->shutdown();
00131   ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00132 
00133   while (!this->shutdown_complete_) {
00134     this->cond_.wait();
00135   }
00136 }
00137 
00138 void
00139 InfoRepo::usage(const ACE_TCHAR* cmd)
00140 {
00141   // NOTE: The federation arguments are parsed early by the
00142   //       FederationConfig object.
00143   ACE_DEBUG((LM_INFO,
00144              ACE_TEXT("Usage:\n")
00145              ACE_TEXT("  %s\n")
00146              ACE_TEXT("    -a <address> listening address for Built-In Topics\n")
00147              ACE_TEXT("    -o <file> write ior to file\n")
00148              ACE_TEXT("    -NOBITS disable the Built-In Topics\n")
00149              ACE_TEXT("    -z turn on verbose Transport logging\n")
00150              ACE_TEXT("    -r Resurrect from persistent file\n")
00151              ACE_TEXT("    -FederatorConfig <file> configure federation from <file>\n")
00152              ACE_TEXT("    -FederationId <number> value for this repository\n")
00153              ACE_TEXT("    -FederateWith <ior> federate initially with object at <ior>\n")
00154              ACE_TEXT("    -ReassociateDelay <msec> delay between reassociations\n")
00155              ACE_TEXT("    -DispatchingCheckDelay <sec> delay between checks for cleaning up dispatching connections.\n")
00156              ACE_TEXT("    -?\n")
00157              ACE_TEXT("\n"),
00158              cmd));
00159 }
00160 
00161 void
00162 InfoRepo::parse_args(int argc, ACE_TCHAR *argv[])
00163 {
00164   ACE_Arg_Shifter arg_shifter(argc, argv);
00165 
00166   const ACE_TCHAR* current_arg = 0;
00167 
00168   while (arg_shifter.is_anything_left()) {
00169     if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-a"))) != 0) {
00170       this->listen_address_str_ = ACE_TEXT_ALWAYS_CHAR(current_arg);
00171       this->listen_address_given_ = 1;
00172       arg_shifter.consume_arg();
00173     // Must check for -ReassociateDelay before -r
00174     } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-ReassociateDelay"))) != 0) {
00175       long msec = ACE_OS::atoi(current_arg);
00176       this->reassociate_delay_.msec(msec);
00177 
00178       arg_shifter.consume_arg();
00179     } else if ((current_arg = arg_shifter.get_the_parameter
00180                               (ACE_TEXT("-r"))) != 0) {
00181       int p = ACE_OS::atoi(current_arg);
00182       this->resurrect_ = true;
00183 
00184       if (p == 0) {
00185         this->resurrect_ = false;
00186       }
00187 
00188       arg_shifter.consume_arg();
00189 
00190     } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-o"))) != 0) {
00191       this->ior_file_ = current_arg;
00192       arg_shifter.consume_arg();
00193 
00194     } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-NOBITS")) == 0) {
00195       this->use_bits_ = false;
00196       arg_shifter.consume_arg();
00197 
00198     } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-z")) == 0) {
00199       TURN_ON_VERBOSE_DEBUG;
00200       arg_shifter.consume_arg();
00201 
00202     } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-DispatchingCheckDelay"))) != 0) {
00203       long sec = ACE_OS::atoi(current_arg);
00204       this->dispatch_cleanup_delay_.sec(sec);
00205       arg_shifter.consume_arg();
00206 
00207     }
00208 
00209     // The '-?' option
00210     else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-?")) == 0) {
00211       this->usage(argv[0]);
00212       throw InitError("Usage");
00213     }
00214 
00215     // Anything else we just skip
00216 
00217     else {
00218       arg_shifter.ignore_arg();
00219     }
00220   }
00221 }
00222 
00223 void
00224 InfoRepo::init()
00225 {
00226   ACE_Argv_Type_Converter cvt(this->federatorConfig_.argc(),
00227                               this->federatorConfig_.argv());
00228   this->orb_ = CORBA::ORB_init(cvt.get_argc(), cvt.get_ASCII_argv(), "");
00229 
00230   this->info_servant_ =
00231     new TAO_DDS_DCPSInfo_i(this->orb_, this->resurrect_, this,
00232                            this->federatorConfig_.federationId());
00233 
00234   // Install the DCPSInfo_i into the Federator::Manager.
00235   this->federator_.info() = this->info_servant_.in();
00236 
00237   CORBA::Object_var obj =
00238     this->orb_->resolve_initial_references("RootPOA");
00239   PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj);
00240 
00241   PortableServer::POAManager_var poa_manager = root_poa->the_POAManager();
00242 
00243   // Use persistent and user id POA policies so the Info Repo's
00244   // object references are consistent.
00245   CORBA::PolicyList policies(2);
00246   policies.length(2);
00247   policies[0] = root_poa->create_id_assignment_policy(PortableServer::USER_ID);
00248   policies[1] = root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
00249   PortableServer::POA_var info_poa = root_poa->create_POA("InfoRepo",
00250                                                           poa_manager,
00251                                                           policies);
00252 
00253   // Creation of the new POAs over, so destroy the Policy_ptr's.
00254   for (CORBA::ULong i = 0; i < policies.length(); ++i) {
00255     policies[i]->destroy();
00256   }
00257 
00258   PortableServer::ObjectId_var oid =
00259     PortableServer::string_to_ObjectId("InfoRepo");
00260   info_poa->activate_object_with_id(oid, this->info_servant_.in());
00261   obj = info_poa->id_to_reference(oid);
00262   // the object is created locally, so it is safe to do an
00263   // _unchecked_narrow, this was needed to prevent an exception
00264   // when dealing with ImR-ified objects
00265   OpenDDS::DCPS::DCPSInfo_var info_repo =
00266     OpenDDS::DCPS::DCPSInfo::_unchecked_narrow(obj);
00267 
00268   CORBA::String_var objref_str =
00269     orb_->object_to_string(info_repo);
00270 
00271   // Initialize the DomainParticipantFactory
00272   DDS::DomainParticipantFactory_var dpf =
00273     TheParticipantFactoryWithArgs(cvt.get_argc(),
00274                                   cvt.get_TCHAR_argv());
00275 
00276   // We need parse the command line options for DCPSInfoRepo after parsing DCPS specific
00277   // command line options.
00278 
00279   // Check the non-ORB arguments.
00280   this->parse_args(cvt.get_argc(), cvt.get_TCHAR_argv());
00281 
00282   // Activate the POA manager before initialize built-in-topics
00283   // so invocations can be processed.
00284   poa_manager->activate();
00285 
00286   if (this->use_bits_) {
00287     if (this->info_servant_->init_transport(this->listen_address_given_,
00288         this->listen_address_str_.c_str())
00289         != 0 /* init_transport returns 0 for success */) {
00290       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00291                  ACE_TEXT("Unable to initialize transport.\n")));
00292       throw InitError("Unable to initialize transport.");
00293     }
00294 
00295   } else {
00296     TheServiceParticipant->set_BIT(false);
00297   }
00298 
00299   // This needs to be done after initialization since we create the reference
00300   // to ourselves in the service here.
00301   OpenDDS::DCPS::Service_Participant* serv_part = TheServiceParticipant;
00302   serv_part->set_repo_ior(objref_str, OpenDDS::DCPS::Discovery::DEFAULT_REPO);
00303 
00304   OpenDDS::DCPS::Discovery_rch disc = serv_part->get_discovery(0 /*domainId*/);
00305   OpenDDS::DCPS::InfoRepoDiscovery_rch ird =
00306     OpenDDS::DCPS::static_rchandle_cast<OpenDDS::DCPS::InfoRepoDiscovery>(disc);
00307   if (!ird->set_ORB(orb_)) {
00308     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00309                ACE_TEXT("Unable to set the ORB in InfoRepoDiscovery.\n")));
00310     throw InitError("Unable to set the ORB in InfoRepoDiscovery.");
00311   }
00312 
00313   // Initialize persistence _after_ initializing the participant factory
00314   // and intializing the transport.
00315   if (!this->info_servant_->init_persistence()) {
00316     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00317                ACE_TEXT("Unable to initialize persistence.\n")));
00318     throw InitError("Unable to initialize persistence.");
00319   }
00320 
00321   // Initialize reassociation.
00322   if (this->reassociate_delay_ != ACE_Time_Value::zero &&
00323      !this->info_servant_->init_reassociation(this->reassociate_delay_)) {
00324     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00325                ACE_TEXT("Unable to initialize reassociation.\n")));
00326     throw InitError("Unable to initialize reassociation.");
00327   }
00328 
00329   // Initialize dispatch checking
00330   if (this->dispatch_cleanup_delay_ != ACE_Time_Value::zero &&
00331      !this->info_servant_->init_dispatchChecking(this->dispatch_cleanup_delay_)) {
00332     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00333                ACE_TEXT("Unable to initialize Dispatch checking.\n")));
00334     throw InitError("Unable to initialize dispatch checking.");
00335   }
00336 
00337   // Fire up the federator.
00338   OpenDDS::Federator::Manager_var federator;
00339   CORBA::String_var               federator_ior;
00340 
00341   if (federator_.id().overridden()) {
00342     oid = PortableServer::string_to_ObjectId("Federator");
00343     info_poa->activate_object_with_id(oid, &federator_);
00344     obj = info_poa->id_to_reference(oid);
00345     federator = OpenDDS::Federator::Manager::_narrow(obj);
00346 
00347     federator_ior = orb_->object_to_string(federator);
00348 
00349     // Add a local repository reference that can be returned via a
00350     // remote call to a peer.
00351     this->federator_.localRepo(info_repo);
00352 
00353     // It should be safe to initialize the federation mechanism at this
00354     // point.  What we really needed to wait for is the initialization of
00355     // the service components - like the DomainParticipantFactory and the
00356     // repository bindings.
00357     // N.B. This is done *before* being added to the IOR table to avoid any
00358     //      races with an eager client.
00359     this->federator_.orb(this->orb_);
00360 
00361     //
00362     // Add the federator to the info_servant update manager as an
00363     // additional updater interface to be called.
00364     // N.B. This needs to be done *after* the call to load_domains()
00365     //      since that is where the update manager is initialized in the
00366     //      info startup sequencing.
00367     this->info_servant_->add(&this->federator_);
00368   }
00369 
00370   // Grab the IOR table.
00371   CORBA::Object_var table_object =
00372     this->orb_->resolve_initial_references("IORTable");
00373 
00374   IORTable::Table_var adapter = IORTable::Table::_narrow(table_object);
00375 
00376   if (CORBA::is_nil(adapter)) {
00377     ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00378 
00379   } else {
00380     adapter->bind(OpenDDS::Federator::REPOSITORY_IORTABLE_KEY, objref_str);
00381 
00382     if (this->federator_.id().overridden()) {
00383       // Bind to '/Federator'
00384       adapter->bind(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY, federator_ior);
00385 
00386       // Bind to '/Federator/1382379631'
00387       std::stringstream buffer(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY);
00388       buffer << "/" << std::dec << this->federatorConfig_.federationDomain();
00389       adapter->bind(buffer.str().c_str(), federator_ior);
00390     }
00391   }
00392 
00393   FILE* output_file = ACE_OS::fopen(this->ior_file_.c_str(), ACE_TEXT("w"));
00394 
00395   if (output_file == 0) {
00396     ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unable to open IOR file: %s\n"),
00397                ior_file_.c_str()));
00398     throw InitError("Unable to open IOR file.");
00399   }
00400 
00401   ACE_OS::fprintf(output_file, "%s", objref_str.in());
00402   ACE_OS::fclose(output_file);
00403 
00404   // Initial federation join if specified on command line.
00405   if (this->federator_.id().overridden()
00406        && !this->federatorConfig_.federateIor().empty()) {
00407     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00408       ACE_DEBUG((LM_DEBUG,
00409                  ACE_TEXT("(%P|%t) INFO: DCPSInfoRepo::init() - ")
00410                  ACE_TEXT("joining federation with repository %s\n"),
00411                  this->federatorConfig_.federateIor().c_str()));
00412     }
00413 
00414     obj = this->orb_->string_to_object(
00415           this->federatorConfig_.federateIor().c_str());
00416 
00417     if (CORBA::is_nil(obj)) {
00418       ACE_ERROR((LM_ERROR,
00419                  ACE_TEXT("(%P|%t) ERROR: could not resolve %s for initial federation.\n"),
00420                  this->federatorConfig_.federateIor().c_str()));
00421       throw InitError("Unable to resolve IOR for initial federation.");
00422     }
00423 
00424     OpenDDS::Federator::Manager_var peer =
00425       OpenDDS::Federator::Manager::_narrow(obj);
00426 
00427     if (CORBA::is_nil(peer)) {
00428       ACE_ERROR((LM_ERROR,
00429                  ACE_TEXT("(%P|%t) ERROR: could not narrow %s.\n"),
00430                  this->federatorConfig_.federateIor().c_str()));
00431       throw InitError("Unable to narrow peer for initial federation.");
00432     }
00433 
00434     // Actually join.
00435     peer->join_federation(federator,
00436                           this->federatorConfig_.federationDomain());
00437   }
00438 }
00439 
00440 InfoRepo_Shutdown::InfoRepo_Shutdown(InfoRepo &ir)
00441 : ir_(ir)
00442 {
00443 }
00444 
00445 void
00446 InfoRepo_Shutdown::operator()(int which_signal)
00447 {
00448   ACE_DEBUG((LM_DEBUG,
00449              "InfoRepo_Shutdown: shutting down on signal %d\n",
00450              which_signal));
00451   this->ir_.shutdown();
00452 }

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7