00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "tao/ORB_Core.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DCPSInfoRepoServ.h"
00012 #include "FederatorConfig.h"
00013 #include "FederatorManagerImpl.h"
00014 #include "ShutdownInterface.h"
00015 #include "PersistenceUpdater.h"
00016 #include "UpdateManager.h"
00017
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00020
00021
00022 #if !defined(DDS_HAS_MINIMUM_BIT)
00023 #include "dds/DCPS/transport/tcp/Tcp.h"
00024 #endif
00025
00026 #include "tao/IORTable/IORTable.h"
00027 #include "tao/BiDir_GIOP/BiDirGIOP.h"
00028
00029 #include <orbsvcs/Shutdown_Utilities.h>
00030
00031 #ifdef ACE_AS_STATIC_LIBS
00032 #include "tao/ImR_Client/ImR_Client.h"
00033 #endif
00034
00035 #include "ace/Get_Opt.h"
00036 #include "ace/Arg_Shifter.h"
00037 #include "ace/Service_Config.h"
00038 #include "ace/Argv_Type_Converter.h"
00039
00040 #include <string>
00041 #include <sstream>
00042
00043 InfoRepo::InfoRepo(int argc, ACE_TCHAR *argv[])
00044 : ior_file_(ACE_TEXT("repo.ior"))
00045 , listen_address_given_(0)
00046 #ifdef DDS_HAS_MINIMUM_BIT
00047 , use_bits_(false)
00048 #else
00049 , use_bits_(true)
00050 #endif
00051 , resurrect_(true)
00052 , finalized_(false)
00053 , servant_finalized_(false)
00054 , federator_(this->federatorConfig_)
00055 , federatorConfig_(argc, argv)
00056 , lock_()
00057 , cond_(lock_)
00058 , shutdown_complete_(false)
00059 , dispatch_cleanup_delay_(30,0)
00060 {
00061 try {
00062 this->init();
00063 } catch (...) {
00064 this->finalize();
00065 throw;
00066 }
00067 }
00068
00069 InfoRepo::~InfoRepo()
00070 {
00071 this->finalize();
00072 }
00073
00074 void
00075 InfoRepo::run()
00076 {
00077 this->shutdown_complete_ = false;
00078 this->orb_->run();
00079 this->finalize();
00080 ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00081 this->shutdown_complete_ = true;
00082 this->cond_.signal();
00083 }
00084
00085 void
00086 InfoRepo::finalize()
00087 {
00088 if (this->finalized_) {
00089 return;
00090 }
00091
00092 if (!this->servant_finalized_) {
00093
00094
00095 this->info_servant_->finalize();
00096 this->federator_.finalize();
00097 TheServiceParticipant->shutdown();
00098 this->servant_finalized_ = true;
00099 }
00100
00101 if (!CORBA::is_nil(this->orb_)) {
00102 try {
00103 this->orb_->destroy();
00104 }
00105 catch (const CORBA::Exception&) {
00106
00107 }
00108 }
00109
00110 this->finalized_ = true;
00111 }
00112
00113 int
00114 InfoRepo::handle_exception(ACE_HANDLE )
00115 {
00116
00117 this->info_servant_->finalize();
00118 this->federator_.finalize();
00119 TheServiceParticipant->shutdown();
00120 this->servant_finalized_ = true;
00121
00122 this->orb_->shutdown(true);
00123 return 0;
00124 }
00125
00126 void
00127 InfoRepo::shutdown()
00128 {
00129 this->orb_->orb_core()->reactor()->notify(this);
00130
00131 }
00132
00133 void
00134 InfoRepo::sync_shutdown()
00135 {
00136 this->shutdown();
00137 ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00138
00139 while (!this->shutdown_complete_) {
00140 this->cond_.wait();
00141 }
00142 }
00143
00144 void
00145 InfoRepo::usage(const ACE_TCHAR* cmd)
00146 {
00147
00148
00149 ACE_DEBUG((LM_INFO,
00150 ACE_TEXT("Usage:\n")
00151 ACE_TEXT(" %s\n")
00152 ACE_TEXT(" -a <address> listening address for Built-In Topics\n")
00153 ACE_TEXT(" -o <file> write ior to file\n")
00154 ACE_TEXT(" -NOBITS disable the Built-In Topics\n")
00155 ACE_TEXT(" -z turn on verbose Transport logging\n")
00156 ACE_TEXT(" -r Resurrect from persistent file\n")
00157 ACE_TEXT(" -FederatorConfig <file> configure federation from <file>\n")
00158 ACE_TEXT(" -FederationId <number> value for this repository\n")
00159 ACE_TEXT(" -FederateWith <ior> federate initially with object at <ior>\n")
00160 ACE_TEXT(" -ReassociateDelay <msec> delay between reassociations\n")
00161 ACE_TEXT(" -DispatchingCheckDelay <sec> delay between checks for cleaning up dispatching connections.\n")
00162 ACE_TEXT(" -?\n")
00163 ACE_TEXT("\n"),
00164 cmd));
00165 }
00166
00167 void
00168 InfoRepo::parse_args(int argc, ACE_TCHAR *argv[])
00169 {
00170 ACE_Arg_Shifter arg_shifter(argc, argv);
00171
00172 const ACE_TCHAR* current_arg = 0;
00173
00174 while (arg_shifter.is_anything_left()) {
00175 if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-a"))) != 0) {
00176 this->listen_address_str_ = ACE_TEXT_ALWAYS_CHAR(current_arg);
00177 this->listen_address_given_ = 1;
00178 arg_shifter.consume_arg();
00179
00180 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-ReassociateDelay"))) != 0) {
00181 long msec = ACE_OS::atoi(current_arg);
00182 this->reassociate_delay_.msec(msec);
00183
00184 arg_shifter.consume_arg();
00185 } else if ((current_arg = arg_shifter.get_the_parameter
00186 (ACE_TEXT("-r"))) != 0) {
00187 int p = ACE_OS::atoi(current_arg);
00188 this->resurrect_ = true;
00189
00190 if (p == 0) {
00191 this->resurrect_ = false;
00192 }
00193
00194 arg_shifter.consume_arg();
00195
00196 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-o"))) != 0) {
00197 this->ior_file_ = current_arg;
00198 arg_shifter.consume_arg();
00199
00200 } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-NOBITS")) == 0) {
00201 this->use_bits_ = false;
00202 arg_shifter.consume_arg();
00203
00204 } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-z")) == 0) {
00205 TURN_ON_VERBOSE_DEBUG;
00206 arg_shifter.consume_arg();
00207
00208 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-DispatchingCheckDelay"))) != 0) {
00209 long sec = ACE_OS::atoi(current_arg);
00210 this->dispatch_cleanup_delay_.sec(sec);
00211 arg_shifter.consume_arg();
00212
00213 }
00214
00215
00216 else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-?")) == 0) {
00217 this->usage(argv[0]);
00218 throw InitError("Usage");
00219 }
00220
00221
00222
00223 else {
00224 arg_shifter.ignore_arg();
00225 }
00226 }
00227 }
00228
00229 void
00230 InfoRepo::init()
00231 {
00232 ACE_ARGV args;
00233 args.add(federatorConfig_.argv(), true );
00234
00235 bool use_bidir = true;
00236
00237 for (int i = 0; i < args.argc() - 1; ++i) {
00238 if (0 == ACE_OS::strcmp(args[i], ACE_TEXT("-DCPSBidirGIOP"))) {
00239 use_bidir = ACE_OS::atoi(args[i + 1]);
00240 break;
00241 }
00242 }
00243
00244 if (use_bidir) {
00245 const ACE_TCHAR* config[] = {
00246 ACE_TEXT("-ORBSvcConfDirective"),
00247 ACE_TEXT("static Client_Strategy_Factory \"-ORBWaitStrategy rw ")
00248 ACE_TEXT("-ORBTransportMuxStrategy exclusive -ORBConnectStrategy blocked ")
00249 ACE_TEXT("-ORBConnectionHandlerCleanup 1\""),
00250 ACE_TEXT("-ORBSvcConfDirective"),
00251 ACE_TEXT("static Resource_Factory \"-ORBFlushingStrategy blocking\""),
00252 0
00253 };
00254 args.add((ACE_TCHAR**)config, true );
00255 }
00256
00257 int argc = args.argc();
00258 orb_ = CORBA::ORB_init(argc, args.argv());
00259
00260 this->info_servant_ =
00261 new TAO_DDS_DCPSInfo_i(this->orb_, this->resurrect_, this,
00262 this->federatorConfig_.federationId());
00263
00264
00265 this->federator_.info() = this->info_servant_.in();
00266
00267 CORBA::Object_var obj =
00268 this->orb_->resolve_initial_references("RootPOA");
00269 PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj);
00270
00271 PortableServer::POAManager_var poa_manager = root_poa->the_POAManager();
00272
00273
00274
00275 CORBA::PolicyList policies(2 + use_bidir);
00276 policies.length(2 + use_bidir);
00277 policies[0] = root_poa->create_id_assignment_policy(PortableServer::USER_ID);
00278 policies[1] = root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
00279 if (use_bidir) {
00280 CORBA::Any policy;
00281 policy <<= BiDirPolicy::BOTH;
00282 policies[2] =
00283 orb_->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policy);
00284 }
00285 PortableServer::POA_var info_poa = root_poa->create_POA("InfoRepo",
00286 poa_manager,
00287 policies);
00288
00289
00290 for (CORBA::ULong i = 0; i < policies.length(); ++i) {
00291 policies[i]->destroy();
00292 }
00293
00294 PortableServer::ObjectId_var oid =
00295 PortableServer::string_to_ObjectId("InfoRepo");
00296 info_poa->activate_object_with_id(oid, this->info_servant_.in());
00297 obj = info_poa->id_to_reference(oid);
00298
00299
00300
00301 OpenDDS::DCPS::DCPSInfo_var info_repo =
00302 OpenDDS::DCPS::DCPSInfo::_unchecked_narrow(obj);
00303
00304 CORBA::String_var objref_str =
00305 orb_->object_to_string(info_repo);
00306
00307
00308 DDS::DomainParticipantFactory_var dpf =
00309 TheParticipantFactoryWithArgs(argc, args.argv());
00310
00311
00312
00313
00314
00315 this->parse_args(argc, args.argv());
00316
00317
00318
00319 poa_manager->activate();
00320
00321 if (this->use_bits_) {
00322 if (this->info_servant_->init_transport(this->listen_address_given_,
00323 this->listen_address_str_.c_str())
00324 != 0 ) {
00325 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00326 ACE_TEXT("Unable to initialize transport.\n")));
00327 throw InitError("Unable to initialize transport.");
00328 }
00329
00330 } else {
00331 TheServiceParticipant->set_BIT(false);
00332 }
00333
00334
00335
00336 OpenDDS::DCPS::Service_Participant* serv_part = TheServiceParticipant;
00337 serv_part->set_repo_ior(objref_str, OpenDDS::DCPS::Discovery::DEFAULT_REPO);
00338
00339 OpenDDS::DCPS::Discovery_rch disc = serv_part->get_discovery(0 );
00340 OpenDDS::DCPS::InfoRepoDiscovery_rch ird =
00341 OpenDDS::DCPS::static_rchandle_cast<OpenDDS::DCPS::InfoRepoDiscovery>(disc);
00342 if (!ird->set_ORB(orb_)) {
00343 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00344 ACE_TEXT("Unable to set the ORB in InfoRepoDiscovery.\n")));
00345 throw InitError("Unable to set the ORB in InfoRepoDiscovery.");
00346 }
00347
00348
00349
00350 if (!this->info_servant_->init_persistence()) {
00351 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00352 ACE_TEXT("Unable to initialize persistence.\n")));
00353 throw InitError("Unable to initialize persistence.");
00354 }
00355
00356
00357 if (this->reassociate_delay_ != ACE_Time_Value::zero &&
00358 !this->info_servant_->init_reassociation(this->reassociate_delay_)) {
00359 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00360 ACE_TEXT("Unable to initialize reassociation.\n")));
00361 throw InitError("Unable to initialize reassociation.");
00362 }
00363
00364
00365 if (this->dispatch_cleanup_delay_ != ACE_Time_Value::zero &&
00366 !this->info_servant_->init_dispatchChecking(this->dispatch_cleanup_delay_)) {
00367 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00368 ACE_TEXT("Unable to initialize Dispatch checking.\n")));
00369 throw InitError("Unable to initialize dispatch checking.");
00370 }
00371
00372
00373 OpenDDS::Federator::Manager_var federator;
00374 CORBA::String_var federator_ior;
00375
00376 if (federator_.id().overridden()) {
00377 oid = PortableServer::string_to_ObjectId("Federator");
00378 info_poa->activate_object_with_id(oid, &federator_);
00379 obj = info_poa->id_to_reference(oid);
00380 federator = OpenDDS::Federator::Manager::_narrow(obj);
00381
00382 federator_ior = orb_->object_to_string(federator);
00383
00384
00385
00386 this->federator_.localRepo(info_repo);
00387
00388
00389
00390
00391
00392
00393
00394 this->federator_.orb(this->orb_);
00395
00396
00397
00398
00399
00400
00401
00402 this->info_servant_->add(&this->federator_);
00403 }
00404
00405
00406 CORBA::Object_var table_object =
00407 this->orb_->resolve_initial_references("IORTable");
00408
00409 IORTable::Table_var adapter = IORTable::Table::_narrow(table_object);
00410
00411 if (CORBA::is_nil(adapter)) {
00412 ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00413
00414 } else {
00415 adapter->bind(OpenDDS::Federator::REPOSITORY_IORTABLE_KEY, objref_str);
00416
00417 if (this->federator_.id().overridden()) {
00418
00419 adapter->bind(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY, federator_ior);
00420
00421
00422 std::stringstream buffer(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY);
00423 buffer << "/" << std::dec << this->federatorConfig_.federationDomain();
00424 adapter->bind(buffer.str().c_str(), federator_ior);
00425 }
00426 }
00427
00428 FILE* output_file = ACE_OS::fopen(this->ior_file_.c_str(), ACE_TEXT("w"));
00429
00430 if (output_file == 0) {
00431 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unable to open IOR file: %s\n"),
00432 ior_file_.c_str()));
00433 throw InitError("Unable to open IOR file.");
00434 }
00435
00436 ACE_OS::fprintf(output_file, "%s", objref_str.in());
00437 ACE_OS::fclose(output_file);
00438
00439
00440 if (this->federator_.id().overridden()
00441 && !this->federatorConfig_.federateIor().empty()) {
00442 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00443 ACE_DEBUG((LM_DEBUG,
00444 ACE_TEXT("(%P|%t) INFO: DCPSInfoRepo::init() - ")
00445 ACE_TEXT("joining federation with repository %s\n"),
00446 this->federatorConfig_.federateIor().c_str()));
00447 }
00448
00449 obj = this->orb_->string_to_object(
00450 this->federatorConfig_.federateIor().c_str());
00451
00452 if (CORBA::is_nil(obj)) {
00453 ACE_ERROR((LM_ERROR,
00454 ACE_TEXT("(%P|%t) ERROR: could not resolve %s for initial federation.\n"),
00455 this->federatorConfig_.federateIor().c_str()));
00456 throw InitError("Unable to resolve IOR for initial federation.");
00457 }
00458
00459 OpenDDS::Federator::Manager_var peer =
00460 OpenDDS::Federator::Manager::_narrow(obj);
00461
00462 if (CORBA::is_nil(peer)) {
00463 ACE_ERROR((LM_ERROR,
00464 ACE_TEXT("(%P|%t) ERROR: could not narrow %s.\n"),
00465 this->federatorConfig_.federateIor().c_str()));
00466 throw InitError("Unable to narrow peer for initial federation.");
00467 }
00468
00469
00470 peer = peer->join_federation(federator,
00471 this->federatorConfig_.federationDomain());
00472 }
00473 }
00474
00475 InfoRepo_Shutdown::InfoRepo_Shutdown(InfoRepo &ir)
00476 : ir_(ir)
00477 {
00478 }
00479
00480 void
00481 InfoRepo_Shutdown::operator()(int which_signal)
00482 {
00483 ACE_DEBUG((LM_DEBUG,
00484 "InfoRepo_Shutdown: shutting down on signal %d\n",
00485 which_signal));
00486 this->ir_.shutdown();
00487 }