00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "DCPSInfo_i.h"
00010 #include "DCPSInfoRepoServ.h"
00011 #include "FederatorConfig.h"
00012 #include "FederatorManagerImpl.h"
00013 #include "ShutdownInterface.h"
00014 #include "PersistenceUpdater.h"
00015 #include "UpdateManager.h"
00016
00017 #include "dds/DCPS/Service_Participant.h"
00018 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00019
00020
00021 #if !defined(DDS_HAS_MINIMUM_BIT)
00022 #include "dds/DCPS/transport/tcp/Tcp.h"
00023 #endif
00024
00025 #include "tao/ORB_Core.h"
00026 #include "tao/IORTable/IORTable.h"
00027
00028 #include <orbsvcs/Shutdown_Utilities.h>
00029
00030 #ifdef ACE_AS_STATIC_LIBS
00031 #include "tao/ImR_Client/ImR_Client.h"
00032 #endif
00033
00034 #include "ace/Get_Opt.h"
00035 #include "ace/Arg_Shifter.h"
00036 #include "ace/Service_Config.h"
00037 #include "ace/Argv_Type_Converter.h"
00038
00039 #include <string>
00040 #include <sstream>
00041
00042 InfoRepo::InfoRepo(int argc, ACE_TCHAR *argv[])
00043 : ior_file_(ACE_TEXT("repo.ior"))
00044 , listen_address_given_(0)
00045 #ifdef DDS_HAS_MINIMUM_BIT
00046 , use_bits_(false)
00047 #else
00048 , use_bits_(true)
00049 #endif
00050 , resurrect_(true)
00051 , finalized_(false)
00052 , servant_finalized_(false)
00053 , federator_(this->federatorConfig_)
00054 , federatorConfig_(argc, argv)
00055 , lock_()
00056 , cond_(lock_)
00057 , shutdown_complete_(false)
00058 , dispatch_cleanup_delay_(30,0)
00059 {
00060 try {
00061 this->init();
00062 } catch (...) {
00063 this->finalize();
00064 throw;
00065 }
00066 }
00067
00068 InfoRepo::~InfoRepo()
00069 {
00070 this->finalize();
00071 }
00072
00073 void
00074 InfoRepo::run()
00075 {
00076 this->shutdown_complete_ = false;
00077 this->orb_->run();
00078 this->finalize();
00079 ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00080 this->shutdown_complete_ = true;
00081 this->cond_.signal();
00082 }
00083
00084 void
00085 InfoRepo::finalize()
00086 {
00087 if (this->finalized_) {
00088 return;
00089 }
00090
00091 if (!this->servant_finalized_) {
00092
00093
00094 this->info_servant_->finalize();
00095 this->federator_.finalize();
00096 this->servant_finalized_ = true;
00097 }
00098
00099 TheServiceParticipant->shutdown();
00100
00101 if (!CORBA::is_nil(this->orb_)) {
00102 this->orb_->destroy();
00103 }
00104
00105 this->finalized_ = true;
00106 }
00107
00108 int
00109 InfoRepo::handle_exception(ACE_HANDLE )
00110 {
00111
00112 this->info_servant_->finalize();
00113 this->federator_.finalize();
00114 this->servant_finalized_ = true;
00115
00116 this->orb_->shutdown(true);
00117 return 0;
00118 }
00119
00120 void
00121 InfoRepo::shutdown()
00122 {
00123 this->orb_->orb_core()->reactor()->notify(this);
00124
00125 }
00126
00127 void
00128 InfoRepo::sync_shutdown()
00129 {
00130 this->shutdown();
00131 ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
00132
00133 while (!this->shutdown_complete_) {
00134 this->cond_.wait();
00135 }
00136 }
00137
00138 void
00139 InfoRepo::usage(const ACE_TCHAR* cmd)
00140 {
00141
00142
00143 ACE_DEBUG((LM_INFO,
00144 ACE_TEXT("Usage:\n")
00145 ACE_TEXT(" %s\n")
00146 ACE_TEXT(" -a <address> listening address for Built-In Topics\n")
00147 ACE_TEXT(" -o <file> write ior to file\n")
00148 ACE_TEXT(" -NOBITS disable the Built-In Topics\n")
00149 ACE_TEXT(" -z turn on verbose Transport logging\n")
00150 ACE_TEXT(" -r Resurrect from persistent file\n")
00151 ACE_TEXT(" -FederatorConfig <file> configure federation from <file>\n")
00152 ACE_TEXT(" -FederationId <number> value for this repository\n")
00153 ACE_TEXT(" -FederateWith <ior> federate initially with object at <ior>\n")
00154 ACE_TEXT(" -ReassociateDelay <msec> delay between reassociations\n")
00155 ACE_TEXT(" -DispatchingCheckDelay <sec> delay between checks for cleaning up dispatching connections.\n")
00156 ACE_TEXT(" -?\n")
00157 ACE_TEXT("\n"),
00158 cmd));
00159 }
00160
00161 void
00162 InfoRepo::parse_args(int argc, ACE_TCHAR *argv[])
00163 {
00164 ACE_Arg_Shifter arg_shifter(argc, argv);
00165
00166 const ACE_TCHAR* current_arg = 0;
00167
00168 while (arg_shifter.is_anything_left()) {
00169 if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-a"))) != 0) {
00170 this->listen_address_str_ = ACE_TEXT_ALWAYS_CHAR(current_arg);
00171 this->listen_address_given_ = 1;
00172 arg_shifter.consume_arg();
00173
00174 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-ReassociateDelay"))) != 0) {
00175 long msec = ACE_OS::atoi(current_arg);
00176 this->reassociate_delay_.msec(msec);
00177
00178 arg_shifter.consume_arg();
00179 } else if ((current_arg = arg_shifter.get_the_parameter
00180 (ACE_TEXT("-r"))) != 0) {
00181 int p = ACE_OS::atoi(current_arg);
00182 this->resurrect_ = true;
00183
00184 if (p == 0) {
00185 this->resurrect_ = false;
00186 }
00187
00188 arg_shifter.consume_arg();
00189
00190 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-o"))) != 0) {
00191 this->ior_file_ = current_arg;
00192 arg_shifter.consume_arg();
00193
00194 } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-NOBITS")) == 0) {
00195 this->use_bits_ = false;
00196 arg_shifter.consume_arg();
00197
00198 } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-z")) == 0) {
00199 TURN_ON_VERBOSE_DEBUG;
00200 arg_shifter.consume_arg();
00201
00202 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-DispatchingCheckDelay"))) != 0) {
00203 long sec = ACE_OS::atoi(current_arg);
00204 this->dispatch_cleanup_delay_.sec(sec);
00205 arg_shifter.consume_arg();
00206
00207 }
00208
00209
00210 else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-?")) == 0) {
00211 this->usage(argv[0]);
00212 throw InitError("Usage");
00213 }
00214
00215
00216
00217 else {
00218 arg_shifter.ignore_arg();
00219 }
00220 }
00221 }
00222
00223 void
00224 InfoRepo::init()
00225 {
00226 ACE_Argv_Type_Converter cvt(this->federatorConfig_.argc(),
00227 this->federatorConfig_.argv());
00228 this->orb_ = CORBA::ORB_init(cvt.get_argc(), cvt.get_ASCII_argv(), "");
00229
00230 this->info_servant_ =
00231 new TAO_DDS_DCPSInfo_i(this->orb_, this->resurrect_, this,
00232 this->federatorConfig_.federationId());
00233
00234
00235 this->federator_.info() = this->info_servant_.in();
00236
00237 CORBA::Object_var obj =
00238 this->orb_->resolve_initial_references("RootPOA");
00239 PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj);
00240
00241 PortableServer::POAManager_var poa_manager = root_poa->the_POAManager();
00242
00243
00244
00245 CORBA::PolicyList policies(2);
00246 policies.length(2);
00247 policies[0] = root_poa->create_id_assignment_policy(PortableServer::USER_ID);
00248 policies[1] = root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
00249 PortableServer::POA_var info_poa = root_poa->create_POA("InfoRepo",
00250 poa_manager,
00251 policies);
00252
00253
00254 for (CORBA::ULong i = 0; i < policies.length(); ++i) {
00255 policies[i]->destroy();
00256 }
00257
00258 PortableServer::ObjectId_var oid =
00259 PortableServer::string_to_ObjectId("InfoRepo");
00260 info_poa->activate_object_with_id(oid, this->info_servant_.in());
00261 obj = info_poa->id_to_reference(oid);
00262
00263
00264
00265 OpenDDS::DCPS::DCPSInfo_var info_repo =
00266 OpenDDS::DCPS::DCPSInfo::_unchecked_narrow(obj);
00267
00268 CORBA::String_var objref_str =
00269 orb_->object_to_string(info_repo);
00270
00271
00272 DDS::DomainParticipantFactory_var dpf =
00273 TheParticipantFactoryWithArgs(cvt.get_argc(),
00274 cvt.get_TCHAR_argv());
00275
00276
00277
00278
00279
00280 this->parse_args(cvt.get_argc(), cvt.get_TCHAR_argv());
00281
00282
00283
00284 poa_manager->activate();
00285
00286 if (this->use_bits_) {
00287 if (this->info_servant_->init_transport(this->listen_address_given_,
00288 this->listen_address_str_.c_str())
00289 != 0 ) {
00290 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00291 ACE_TEXT("Unable to initialize transport.\n")));
00292 throw InitError("Unable to initialize transport.");
00293 }
00294
00295 } else {
00296 TheServiceParticipant->set_BIT(false);
00297 }
00298
00299
00300
00301 OpenDDS::DCPS::Service_Participant* serv_part = TheServiceParticipant;
00302 serv_part->set_repo_ior(objref_str, OpenDDS::DCPS::Discovery::DEFAULT_REPO);
00303
00304 OpenDDS::DCPS::Discovery_rch disc = serv_part->get_discovery(0 );
00305 OpenDDS::DCPS::InfoRepoDiscovery_rch ird =
00306 OpenDDS::DCPS::static_rchandle_cast<OpenDDS::DCPS::InfoRepoDiscovery>(disc);
00307 if (!ird->set_ORB(orb_)) {
00308 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00309 ACE_TEXT("Unable to set the ORB in InfoRepoDiscovery.\n")));
00310 throw InitError("Unable to set the ORB in InfoRepoDiscovery.");
00311 }
00312
00313
00314
00315 if (!this->info_servant_->init_persistence()) {
00316 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00317 ACE_TEXT("Unable to initialize persistence.\n")));
00318 throw InitError("Unable to initialize persistence.");
00319 }
00320
00321
00322 if (this->reassociate_delay_ != ACE_Time_Value::zero &&
00323 !this->info_servant_->init_reassociation(this->reassociate_delay_)) {
00324 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00325 ACE_TEXT("Unable to initialize reassociation.\n")));
00326 throw InitError("Unable to initialize reassociation.");
00327 }
00328
00329
00330 if (this->dispatch_cleanup_delay_ != ACE_Time_Value::zero &&
00331 !this->info_servant_->init_dispatchChecking(this->dispatch_cleanup_delay_)) {
00332 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
00333 ACE_TEXT("Unable to initialize Dispatch checking.\n")));
00334 throw InitError("Unable to initialize dispatch checking.");
00335 }
00336
00337
00338 OpenDDS::Federator::Manager_var federator;
00339 CORBA::String_var federator_ior;
00340
00341 if (federator_.id().overridden()) {
00342 oid = PortableServer::string_to_ObjectId("Federator");
00343 info_poa->activate_object_with_id(oid, &federator_);
00344 obj = info_poa->id_to_reference(oid);
00345 federator = OpenDDS::Federator::Manager::_narrow(obj);
00346
00347 federator_ior = orb_->object_to_string(federator);
00348
00349
00350
00351 this->federator_.localRepo(info_repo);
00352
00353
00354
00355
00356
00357
00358
00359 this->federator_.orb(this->orb_);
00360
00361
00362
00363
00364
00365
00366
00367 this->info_servant_->add(&this->federator_);
00368 }
00369
00370
00371 CORBA::Object_var table_object =
00372 this->orb_->resolve_initial_references("IORTable");
00373
00374 IORTable::Table_var adapter = IORTable::Table::_narrow(table_object);
00375
00376 if (CORBA::is_nil(adapter)) {
00377 ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00378
00379 } else {
00380 adapter->bind(OpenDDS::Federator::REPOSITORY_IORTABLE_KEY, objref_str);
00381
00382 if (this->federator_.id().overridden()) {
00383
00384 adapter->bind(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY, federator_ior);
00385
00386
00387 std::stringstream buffer(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY);
00388 buffer << "/" << std::dec << this->federatorConfig_.federationDomain();
00389 adapter->bind(buffer.str().c_str(), federator_ior);
00390 }
00391 }
00392
00393 FILE* output_file = ACE_OS::fopen(this->ior_file_.c_str(), ACE_TEXT("w"));
00394
00395 if (output_file == 0) {
00396 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unable to open IOR file: %s\n"),
00397 ior_file_.c_str()));
00398 throw InitError("Unable to open IOR file.");
00399 }
00400
00401 ACE_OS::fprintf(output_file, "%s", objref_str.in());
00402 ACE_OS::fclose(output_file);
00403
00404
00405 if (this->federator_.id().overridden()
00406 && !this->federatorConfig_.federateIor().empty()) {
00407 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00408 ACE_DEBUG((LM_DEBUG,
00409 ACE_TEXT("(%P|%t) INFO: DCPSInfoRepo::init() - ")
00410 ACE_TEXT("joining federation with repository %s\n"),
00411 this->federatorConfig_.federateIor().c_str()));
00412 }
00413
00414 obj = this->orb_->string_to_object(
00415 this->federatorConfig_.federateIor().c_str());
00416
00417 if (CORBA::is_nil(obj)) {
00418 ACE_ERROR((LM_ERROR,
00419 ACE_TEXT("(%P|%t) ERROR: could not resolve %s for initial federation.\n"),
00420 this->federatorConfig_.federateIor().c_str()));
00421 throw InitError("Unable to resolve IOR for initial federation.");
00422 }
00423
00424 OpenDDS::Federator::Manager_var peer =
00425 OpenDDS::Federator::Manager::_narrow(obj);
00426
00427 if (CORBA::is_nil(peer)) {
00428 ACE_ERROR((LM_ERROR,
00429 ACE_TEXT("(%P|%t) ERROR: could not narrow %s.\n"),
00430 this->federatorConfig_.federateIor().c_str()));
00431 throw InitError("Unable to narrow peer for initial federation.");
00432 }
00433
00434
00435 peer->join_federation(federator,
00436 this->federatorConfig_.federationDomain());
00437 }
00438 }
00439
00440 InfoRepo_Shutdown::InfoRepo_Shutdown(InfoRepo &ir)
00441 : ir_(ir)
00442 {
00443 }
00444
00445 void
00446 InfoRepo_Shutdown::operator()(int which_signal)
00447 {
00448 ACE_DEBUG((LM_DEBUG,
00449 "InfoRepo_Shutdown: shutting down on signal %d\n",
00450 which_signal));
00451 this->ir_.shutdown();
00452 }