DCPSInfoRepoServ.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include "tao/ORB_Core.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DCPSInfoRepoServ.h"
00012 #include "FederatorConfig.h"
00013 #include "FederatorManagerImpl.h"
00014 #include "ShutdownInterface.h"
00015 #include "PersistenceUpdater.h"
00016 #include "UpdateManager.h"
00017 
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00020 
00021 //If we need BIT support, pull in TCP so that static builds will have it.
00022 #if !defined(DDS_HAS_MINIMUM_BIT)
00023 #include "dds/DCPS/transport/tcp/Tcp.h"
00024 #endif
00025 
00026 #include "tao/IORTable/IORTable.h"
00027 #include "tao/BiDir_GIOP/BiDirGIOP.h"
00028 
00029 #include <orbsvcs/Shutdown_Utilities.h>
00030 
00031 #ifdef ACE_AS_STATIC_LIBS
00032 #include "tao/ImR_Client/ImR_Client.h"
00033 #endif
00034 
00035 #include "ace/Get_Opt.h"
00036 #include "ace/Arg_Shifter.h"
00037 #include "ace/Service_Config.h"
00038 #include "ace/Argv_Type_Converter.h"
00039 
00040 #include <string>
00041 #include <sstream>
00042 
00043 InfoRepo::InfoRepo(int argc, ACE_TCHAR *argv[])
00044 : ior_file_(ACE_TEXT("repo.ior"))
00045 , listen_address_given_(0)
00046 #ifdef DDS_HAS_MINIMUM_BIT
00047 , use_bits_(false)
00048 #else
00049 , use_bits_(true)
00050 #endif
00051 , resurrect_(true)
00052 , finalized_(false)
00053 , servant_finalized_(false)
00054 , federator_(this->federatorConfig_)
00055 , federatorConfig_(argc, argv)
00056 , lock_()
00057 , cond_(lock_)
00058 , shutdown_complete_(false)
00059 , dispatch_cleanup_delay_(30,0)
00060 {
00061   try {
00062     this->init();
00063   } catch (...) {
00064     this->finalize();
00065     throw;
00066   }
00067 }
00068 
00069 InfoRepo::~InfoRepo()
00070 {
00071   this->finalize();
00072 }
00073 
00074 void
00075 InfoRepo::run()
00076 {
00077   this->shutdown_complete_ = false;
00078   this->orb_->run();
00079   this->finalize();
00080   ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00081   this->shutdown_complete_ = true;
00082   this->cond_.signal();
00083 }
00084 
00085 void
00086 InfoRepo::finalize()
00087 {
00088   if (this->finalized_) {
00089     return;
00090   }
00091 
00092   if (!this->servant_finalized_) {
00093     // reached if the ImR caused the ORB to shut down,
00094     // which bypasses InfoRepo::handle_exception()
00095     this->info_servant_->finalize();
00096     this->federator_.finalize();
00097     TheServiceParticipant->shutdown();
00098     this->servant_finalized_ = true;
00099   }
00100 
00101   if (!CORBA::is_nil(this->orb_)) {
00102     try {
00103       this->orb_->destroy();
00104     }
00105     catch (const CORBA::Exception&) {
00106       //finalizing anyway, not an issue.
00107     }
00108   }
00109 
00110   this->finalized_ = true;
00111 }
00112 
00113 int
00114 InfoRepo::handle_exception(ACE_HANDLE /* fd */)
00115 {
00116   // these should occur before ORB::shutdown() since they use the ORB/reactor
00117   this->info_servant_->finalize();
00118   this->federator_.finalize();
00119   TheServiceParticipant->shutdown();
00120   this->servant_finalized_ = true;
00121 
00122   this->orb_->shutdown(true);
00123   return 0;
00124 }
00125 
00126 void
00127 InfoRepo::shutdown()
00128 {
00129   this->orb_->orb_core()->reactor()->notify(this);
00130   // reactor will invoke our InfoRepo::handle_exception()
00131 }
00132 
00133 void
00134 InfoRepo::sync_shutdown()
00135 {
00136   this->shutdown();
00137   ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00138 
00139   while (!this->shutdown_complete_) {
00140     this->cond_.wait();
00141   }
00142 }
00143 
00144 void
00145 InfoRepo::usage(const ACE_TCHAR* cmd)
00146 {
00147   // NOTE: The federation arguments are parsed early by the
00148   //       FederationConfig object.
00149   ACE_DEBUG((LM_INFO,
00150              ACE_TEXT("Usage:\n")
00151              ACE_TEXT("  %s\n")
00152              ACE_TEXT("    -a <address> listening address for Built-In Topics\n")
00153              ACE_TEXT("    -o <file> write ior to file\n")
00154              ACE_TEXT("    -NOBITS disable the Built-In Topics\n")
00155              ACE_TEXT("    -z turn on verbose Transport logging\n")
00156              ACE_TEXT("    -r Resurrect from persistent file\n")
00157              ACE_TEXT("    -FederatorConfig <file> configure federation from <file>\n")
00158              ACE_TEXT("    -FederationId <number> value for this repository\n")
00159              ACE_TEXT("    -FederateWith <ior> federate initially with object at <ior>\n")
00160              ACE_TEXT("    -ReassociateDelay <msec> delay between reassociations\n")
00161              ACE_TEXT("    -DispatchingCheckDelay <sec> delay between checks for cleaning up dispatching connections.\n")
00162              ACE_TEXT("    -?\n")
00163              ACE_TEXT("\n"),
00164              cmd));
00165 }
00166 
00167 void
00168 InfoRepo::parse_args(int argc, ACE_TCHAR *argv[])
00169 {
00170   ACE_Arg_Shifter arg_shifter(argc, argv);
00171 
00172   const ACE_TCHAR* current_arg = 0;
00173 
00174   while (arg_shifter.is_anything_left()) {
00175     if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-a"))) != 0) {
00176       this->listen_address_str_ = ACE_TEXT_ALWAYS_CHAR(current_arg);
00177       this->listen_address_given_ = 1;
00178       arg_shifter.consume_arg();
00179     // Must check for -ReassociateDelay before -r
00180     } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-ReassociateDelay"))) != 0) {
00181       long msec = ACE_OS::atoi(current_arg);
00182       this->reassociate_delay_.msec(msec);
00183 
00184       arg_shifter.consume_arg();
00185     } else if ((current_arg = arg_shifter.get_the_parameter
00186                               (ACE_TEXT("-r"))) != 0) {
00187       int p = ACE_OS::atoi(current_arg);
00188       this->resurrect_ = true;
00189 
00190       if (p == 0) {
00191         this->resurrect_ = false;
00192       }
00193 
00194       arg_shifter.consume_arg();
00195 
00196     } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-o"))) != 0) {
00197       this->ior_file_ = current_arg;
00198       arg_shifter.consume_arg();
00199 
00200     } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-NOBITS")) == 0) {
00201       this->use_bits_ = false;
00202       arg_shifter.consume_arg();
00203 
00204     } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-z")) == 0) {
00205       TURN_ON_VERBOSE_DEBUG;
00206       arg_shifter.consume_arg();
00207 
00208     } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-DispatchingCheckDelay"))) != 0) {
00209       long sec = ACE_OS::atoi(current_arg);
00210       this->dispatch_cleanup_delay_.sec(sec);
00211       arg_shifter.consume_arg();
00212 
00213     }
00214 
00215     // The '-?' option
00216     else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-?")) == 0) {
00217       this->usage(argv[0]);
00218       throw InitError("Usage");
00219     }
00220 
00221     // Anything else we just skip
00222 
00223     else {
00224       arg_shifter.ignore_arg();
00225     }
00226   }
00227 }
00228 
00229 void
00230 InfoRepo::init()
00231 {
00232   ACE_ARGV args;
00233   args.add(federatorConfig_.argv(), true /*quote arg*/);
00234 
00235   bool use_bidir = true;
00236 
00237   for (int i = 0; i < args.argc() - 1; ++i) {
00238     if (0 == ACE_OS::strcmp(args[i], ACE_TEXT("-DCPSBidirGIOP"))) {
00239       use_bidir = ACE_OS::atoi(args[i + 1]);
00240       break;
00241     }
00242   }
00243 
00244   if (use_bidir) {
00245     const ACE_TCHAR* config[] = {
00246       ACE_TEXT("-ORBSvcConfDirective"),
00247       ACE_TEXT("static Client_Strategy_Factory \"-ORBWaitStrategy rw ")
00248         ACE_TEXT("-ORBTransportMuxStrategy exclusive -ORBConnectStrategy blocked ")
00249         ACE_TEXT("-ORBConnectionHandlerCleanup 1\""),
00250       ACE_TEXT("-ORBSvcConfDirective"),
00251       ACE_TEXT("static Resource_Factory \"-ORBFlushingStrategy blocking\""),
00252       0
00253     };
00254     args.add((ACE_TCHAR**)config, true /*quote arg*/);
00255   }
00256 
00257   int argc = args.argc();
00258   orb_ = CORBA::ORB_init(argc, args.argv());
00259 
00260   this->info_servant_ =
00261     new TAO_DDS_DCPSInfo_i(this->orb_, this->resurrect_, this,
00262                            this->federatorConfig_.federationId());
00263 
00264   // Install the DCPSInfo_i into the Federator::Manager.
00265   this->federator_.info() = this->info_servant_.in();
00266 
00267   CORBA::Object_var obj =
00268     this->orb_->resolve_initial_references("RootPOA");
00269   PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj);
00270 
00271   PortableServer::POAManager_var poa_manager = root_poa->the_POAManager();
00272 
00273   // Use persistent and user id POA policies so the Info Repo's
00274   // object references are consistent.
00275   CORBA::PolicyList policies(2 + use_bidir);
00276   policies.length(2 + use_bidir);
00277   policies[0] = root_poa->create_id_assignment_policy(PortableServer::USER_ID);
00278   policies[1] = root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
00279   if (use_bidir) {
00280     CORBA::Any policy;
00281     policy <<= BiDirPolicy::BOTH;
00282     policies[2] =
00283       orb_->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policy);
00284   }
00285   PortableServer::POA_var info_poa = root_poa->create_POA("InfoRepo",
00286                                                           poa_manager,
00287                                                           policies);
00288 
00289   // Creation of the new POAs over, so destroy the Policy_ptr's.
00290   for (CORBA::ULong i = 0; i < policies.length(); ++i) {
00291     policies[i]->destroy();
00292   }
00293 
00294   PortableServer::ObjectId_var oid =
00295     PortableServer::string_to_ObjectId("InfoRepo");
00296   info_poa->activate_object_with_id(oid, this->info_servant_.in());
00297   obj = info_poa->id_to_reference(oid);
00298   // the object is created locally, so it is safe to do an
00299   // _unchecked_narrow, this was needed to prevent an exception
00300   // when dealing with ImR-ified objects
00301   OpenDDS::DCPS::DCPSInfo_var info_repo =
00302     OpenDDS::DCPS::DCPSInfo::_unchecked_narrow(obj);
00303 
00304   CORBA::String_var objref_str =
00305     orb_->object_to_string(info_repo);
00306 
00307   // Initialize the DomainParticipantFactory
00308   DDS::DomainParticipantFactory_var dpf =
00309     TheParticipantFactoryWithArgs(argc, args.argv());
00310 
00311   // We need parse the command line options for DCPSInfoRepo after parsing DCPS specific
00312   // command line options.
00313 
00314   // Check the non-ORB arguments.
00315   this->parse_args(argc, args.argv());
00316 
00317   // Activate the POA manager before initialize built-in-topics
00318   // so invocations can be processed.
00319   poa_manager->activate();
00320 
00321   if (this->use_bits_) {
00322     if (this->info_servant_->init_transport(this->listen_address_given_,
00323         this->listen_address_str_.c_str())
00324         != 0 /* init_transport returns 0 for success */) {
00325       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00326                  ACE_TEXT("Unable to initialize transport.\n")));
00327       throw InitError("Unable to initialize transport.");
00328     }
00329 
00330   } else {
00331     TheServiceParticipant->set_BIT(false);
00332   }
00333 
00334   // This needs to be done after initialization since we create the reference
00335   // to ourselves in the service here.
00336   OpenDDS::DCPS::Service_Participant* serv_part = TheServiceParticipant;
00337   serv_part->set_repo_ior(objref_str, OpenDDS::DCPS::Discovery::DEFAULT_REPO);
00338 
00339   OpenDDS::DCPS::Discovery_rch disc = serv_part->get_discovery(0 /*domainId*/);
00340   OpenDDS::DCPS::InfoRepoDiscovery_rch ird =
00341     OpenDDS::DCPS::static_rchandle_cast<OpenDDS::DCPS::InfoRepoDiscovery>(disc);
00342   if (!ird->set_ORB(orb_)) {
00343     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00344                ACE_TEXT("Unable to set the ORB in InfoRepoDiscovery.\n")));
00345     throw InitError("Unable to set the ORB in InfoRepoDiscovery.");
00346   }
00347 
00348   // Initialize persistence _after_ initializing the participant factory
00349   // and initializing the transport.
00350   if (!this->info_servant_->init_persistence()) {
00351     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00352                ACE_TEXT("Unable to initialize persistence.\n")));
00353     throw InitError("Unable to initialize persistence.");
00354   }
00355 
00356   // Initialize reassociation.
00357   if (this->reassociate_delay_ != ACE_Time_Value::zero &&
00358      !this->info_servant_->init_reassociation(this->reassociate_delay_)) {
00359     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00360                ACE_TEXT("Unable to initialize reassociation.\n")));
00361     throw InitError("Unable to initialize reassociation.");
00362   }
00363 
00364   // Initialize dispatch checking
00365   if (this->dispatch_cleanup_delay_ != ACE_Time_Value::zero &&
00366      !this->info_servant_->init_dispatchChecking(this->dispatch_cleanup_delay_)) {
00367     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00368                ACE_TEXT("Unable to initialize Dispatch checking.\n")));
00369     throw InitError("Unable to initialize dispatch checking.");
00370   }
00371 
00372   // Fire up the federator.
00373   OpenDDS::Federator::Manager_var federator;
00374   CORBA::String_var               federator_ior;
00375 
00376   if (federator_.id().overridden()) {
00377     oid = PortableServer::string_to_ObjectId("Federator");
00378     info_poa->activate_object_with_id(oid, &federator_);
00379     obj = info_poa->id_to_reference(oid);
00380     federator = OpenDDS::Federator::Manager::_narrow(obj);
00381 
00382     federator_ior = orb_->object_to_string(federator);
00383 
00384     // Add a local repository reference that can be returned via a
00385     // remote call to a peer.
00386     this->federator_.localRepo(info_repo);
00387 
00388     // It should be safe to initialize the federation mechanism at this
00389     // point.  What we really needed to wait for is the initialization of
00390     // the service components - like the DomainParticipantFactory and the
00391     // repository bindings.
00392     // N.B. This is done *before* being added to the IOR table to avoid any
00393     //      races with an eager client.
00394     this->federator_.orb(this->orb_);
00395 
00396     //
00397     // Add the federator to the info_servant update manager as an
00398     // additional updater interface to be called.
00399     // N.B. This needs to be done *after* the call to load_domains()
00400     //      since that is where the update manager is initialized in the
00401     //      info startup sequencing.
00402     this->info_servant_->add(&this->federator_);
00403   }
00404 
00405   // Grab the IOR table.
00406   CORBA::Object_var table_object =
00407     this->orb_->resolve_initial_references("IORTable");
00408 
00409   IORTable::Table_var adapter = IORTable::Table::_narrow(table_object);
00410 
00411   if (CORBA::is_nil(adapter)) {
00412     ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00413 
00414   } else {
00415     adapter->bind(OpenDDS::Federator::REPOSITORY_IORTABLE_KEY, objref_str);
00416 
00417     if (this->federator_.id().overridden()) {
00418       // Bind to '/Federator'
00419       adapter->bind(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY, federator_ior);
00420 
00421       // Bind to '/Federator/1382379631'
00422       std::stringstream buffer(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY);
00423       buffer << "/" << std::dec << this->federatorConfig_.federationDomain();
00424       adapter->bind(buffer.str().c_str(), federator_ior);
00425     }
00426   }
00427 
00428   FILE* output_file = ACE_OS::fopen(this->ior_file_.c_str(), ACE_TEXT("w"));
00429 
00430   if (output_file == 0) {
00431     ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unable to open IOR file: %s\n"),
00432                ior_file_.c_str()));
00433     throw InitError("Unable to open IOR file.");
00434   }
00435 
00436   ACE_OS::fprintf(output_file, "%s", objref_str.in());
00437   ACE_OS::fclose(output_file);
00438 
00439   // Initial federation join if specified on command line.
00440   if (this->federator_.id().overridden()
00441        && !this->federatorConfig_.federateIor().empty()) {
00442     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00443       ACE_DEBUG((LM_DEBUG,
00444                  ACE_TEXT("(%P|%t) INFO: DCPSInfoRepo::init() - ")
00445                  ACE_TEXT("joining federation with repository %s\n"),
00446                  this->federatorConfig_.federateIor().c_str()));
00447     }
00448 
00449     obj = this->orb_->string_to_object(
00450           this->federatorConfig_.federateIor().c_str());
00451 
00452     if (CORBA::is_nil(obj)) {
00453       ACE_ERROR((LM_ERROR,
00454                  ACE_TEXT("(%P|%t) ERROR: could not resolve %s for initial federation.\n"),
00455                  this->federatorConfig_.federateIor().c_str()));
00456       throw InitError("Unable to resolve IOR for initial federation.");
00457     }
00458 
00459     OpenDDS::Federator::Manager_var peer =
00460       OpenDDS::Federator::Manager::_narrow(obj);
00461 
00462     if (CORBA::is_nil(peer)) {
00463       ACE_ERROR((LM_ERROR,
00464                  ACE_TEXT("(%P|%t) ERROR: could not narrow %s.\n"),
00465                  this->federatorConfig_.federateIor().c_str()));
00466       throw InitError("Unable to narrow peer for initial federation.");
00467     }
00468 
00469     // Actually join.
00470     peer = peer->join_federation(federator,
00471                           this->federatorConfig_.federationDomain());
00472   }
00473 }
00474 
00475 InfoRepo_Shutdown::InfoRepo_Shutdown(InfoRepo &ir)
00476 : ir_(ir)
00477 {
00478 }
00479 
00480 void
00481 InfoRepo_Shutdown::operator()(int which_signal)
00482 {
00483   ACE_DEBUG((LM_DEBUG,
00484              "InfoRepo_Shutdown: shutting down on signal %d\n",
00485              which_signal));
00486   this->ir_.shutdown();
00487 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1