#include <DCPSInfoRepoServ.h>
Inheritance diagram for InfoRepo:
Public Member Functions | |
InfoRepo (int argc, ACE_TCHAR *argv[]) | |
~InfoRepo () | |
void | run () |
virtual void | shutdown () |
ShutdownInterface used to schedule a shutdown. | |
void | sync_shutdown () |
virtual int | handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE) |
Handler for the reactor to dispatch finalization activity to. | |
Private Member Functions | |
void | init () |
void | usage (const ACE_TCHAR *cmd) |
void | parse_args (int argc, ACE_TCHAR *argv[]) |
void | finalize () |
Actual finalization of service resources. | |
Private Attributes | |
CORBA::ORB_var | orb_ |
ACE_TString | ior_file_ |
std::string | listen_address_str_ |
int | listen_address_given_ |
bool | use_bits_ |
bool | resurrect_ |
ACE_Time_Value | reassociate_delay_ |
bool | finalized_ |
Flag to indicate that finalization has already occurred. | |
bool | servant_finalized_ |
OpenDDS::Federator::ManagerImpl | federator_ |
Repository Federation behaviors. | |
OpenDDS::Federator::Config | federatorConfig_ |
PortableServer::Servant_var< TAO_DDS_DCPSInfo_i > | info_servant_ |
ACE_Thread_Mutex | lock_ |
ACE_Condition_Thread_Mutex | cond_ |
bool | shutdown_complete_ |
ACE_Time_Value | dispatch_cleanup_delay_ |
Classes | |
struct | InitError |
Definition at line 26 of file DCPSInfoRepoServ.h.
InfoRepo::InfoRepo | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) |
Definition at line 42 of file DCPSInfoRepoServ.cpp.
References finalize(), and init().
00043 : ior_file_(ACE_TEXT("repo.ior")) 00044 , listen_address_given_(0) 00045 #ifdef DDS_HAS_MINIMUM_BIT 00046 , use_bits_(false) 00047 #else 00048 , use_bits_(true) 00049 #endif 00050 , resurrect_(true) 00051 , finalized_(false) 00052 , servant_finalized_(false) 00053 , federator_(this->federatorConfig_) 00054 , federatorConfig_(argc, argv) 00055 , lock_() 00056 , cond_(lock_) 00057 , shutdown_complete_(false) 00058 , dispatch_cleanup_delay_(30,0) 00059 { 00060 try { 00061 this->init(); 00062 } catch (...) { 00063 this->finalize(); 00064 throw; 00065 } 00066 }
InfoRepo::~InfoRepo | ( | ) |
Definition at line 68 of file DCPSInfoRepoServ.cpp.
References finalize().
00069 { 00070 this->finalize(); 00071 }
void InfoRepo::finalize | ( | ) | [private] |
Actual finalization of service resources.
Definition at line 85 of file DCPSInfoRepoServ.cpp.
References federator_, OpenDDS::Federator::ManagerImpl::finalize(), finalized_, info_servant_, orb_, servant_finalized_, and TheServiceParticipant.
Referenced by InfoRepo(), run(), and ~InfoRepo().
00086 { 00087 if (this->finalized_) { 00088 return; 00089 } 00090 00091 if (!this->servant_finalized_) { 00092 // reached if the ImR caused the ORB to shut down, 00093 // which bypasses InfoRepo::handle_exception() 00094 this->info_servant_->finalize(); 00095 this->federator_.finalize(); 00096 this->servant_finalized_ = true; 00097 } 00098 00099 TheServiceParticipant->shutdown(); 00100 00101 if (!CORBA::is_nil(this->orb_)) { 00102 this->orb_->destroy(); 00103 } 00104 00105 this->finalized_ = true; 00106 }
int InfoRepo::handle_exception | ( | ACE_HANDLE | fd = ACE_INVALID_HANDLE |
) | [virtual] |
Handler for the reactor to dispatch finalization activity to.
Definition at line 109 of file DCPSInfoRepoServ.cpp.
References federator_, OpenDDS::Federator::ManagerImpl::finalize(), info_servant_, orb_, and servant_finalized_.
00110 { 00111 // these should occur before ORB::shutdown() since they use the ORB/reactor 00112 this->info_servant_->finalize(); 00113 this->federator_.finalize(); 00114 this->servant_finalized_ = true; 00115 00116 this->orb_->shutdown(true); 00117 return 0; 00118 }
void InfoRepo::init | ( | ) | [private] |
Definition at line 224 of file DCPSInfoRepoServ.cpp.
References OpenDDS::Federator::Config::argv(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::Discovery::DEFAULT_REPO, OpenDDS::Federator::Config::federateIor(), OpenDDS::Federator::Config::federationDomain(), federator_, OpenDDS::Federator::FEDERATOR_IORTABLE_KEY, federatorConfig_, OpenDDS::DCPS::Service_Participant::get_discovery(), OpenDDS::Federator::ManagerImpl::id(), OpenDDS::Federator::ManagerImpl::info(), info_servant_, ior_file_, OpenDDS::Federator::ManagerImpl::localRepo(), OpenDDS::Federator::ManagerImpl::orb(), orb_, TAO_DDS_DCPSFederationId::overridden(), parse_args(), OpenDDS::Federator::REPOSITORY_IORTABLE_KEY, OpenDDS::DCPS::Service_Participant::set_repo_ior(), TheParticipantFactoryWithArgs, and TheServiceParticipant.
Referenced by InfoRepo().
00225 { 00226 ACE_Argv_Type_Converter cvt(this->federatorConfig_.argc(), 00227 this->federatorConfig_.argv()); 00228 this->orb_ = CORBA::ORB_init(cvt.get_argc(), cvt.get_ASCII_argv(), ""); 00229 00230 this->info_servant_ = 00231 new TAO_DDS_DCPSInfo_i(this->orb_, this->resurrect_, this, 00232 this->federatorConfig_.federationId()); 00233 00234 // Install the DCPSInfo_i into the Federator::Manager. 00235 this->federator_.info() = this->info_servant_.in(); 00236 00237 CORBA::Object_var obj = 00238 this->orb_->resolve_initial_references("RootPOA"); 00239 PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj); 00240 00241 PortableServer::POAManager_var poa_manager = root_poa->the_POAManager(); 00242 00243 // Use persistent and user id POA policies so the Info Repo's 00244 // object references are consistent. 00245 CORBA::PolicyList policies(2); 00246 policies.length(2); 00247 policies[0] = root_poa->create_id_assignment_policy(PortableServer::USER_ID); 00248 policies[1] = root_poa->create_lifespan_policy(PortableServer::PERSISTENT); 00249 PortableServer::POA_var info_poa = root_poa->create_POA("InfoRepo", 00250 poa_manager, 00251 policies); 00252 00253 // Creation of the new POAs over, so destroy the Policy_ptr's. 
00254 for (CORBA::ULong i = 0; i < policies.length(); ++i) { 00255 policies[i]->destroy(); 00256 } 00257 00258 PortableServer::ObjectId_var oid = 00259 PortableServer::string_to_ObjectId("InfoRepo"); 00260 info_poa->activate_object_with_id(oid, this->info_servant_.in()); 00261 obj = info_poa->id_to_reference(oid); 00262 // the object is created locally, so it is safe to do an 00263 // _unchecked_narrow, this was needed to prevent an exception 00264 // when dealing with ImR-ified objects 00265 OpenDDS::DCPS::DCPSInfo_var info_repo = 00266 OpenDDS::DCPS::DCPSInfo::_unchecked_narrow(obj); 00267 00268 CORBA::String_var objref_str = 00269 orb_->object_to_string(info_repo); 00270 00271 // Initialize the DomainParticipantFactory 00272 DDS::DomainParticipantFactory_var dpf = 00273 TheParticipantFactoryWithArgs(cvt.get_argc(), 00274 cvt.get_TCHAR_argv()); 00275 00276 // We need parse the command line options for DCPSInfoRepo after parsing DCPS specific 00277 // command line options. 00278 00279 // Check the non-ORB arguments. 00280 this->parse_args(cvt.get_argc(), cvt.get_TCHAR_argv()); 00281 00282 // Activate the POA manager before initialize built-in-topics 00283 // so invocations can be processed. 00284 poa_manager->activate(); 00285 00286 if (this->use_bits_) { 00287 if (this->info_servant_->init_transport(this->listen_address_given_, 00288 this->listen_address_str_.c_str()) 00289 != 0 /* init_transport returns 0 for success */) { 00290 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ") 00291 ACE_TEXT("Unable to initialize transport.\n"))); 00292 throw InitError("Unable to initialize transport."); 00293 } 00294 00295 } else { 00296 TheServiceParticipant->set_BIT(false); 00297 } 00298 00299 // This needs to be done after initialization since we create the reference 00300 // to ourselves in the service here. 
00301 OpenDDS::DCPS::Service_Participant* serv_part = TheServiceParticipant; 00302 serv_part->set_repo_ior(objref_str, OpenDDS::DCPS::Discovery::DEFAULT_REPO); 00303 00304 OpenDDS::DCPS::Discovery_rch disc = serv_part->get_discovery(0 /*domainId*/); 00305 OpenDDS::DCPS::InfoRepoDiscovery_rch ird = 00306 OpenDDS::DCPS::static_rchandle_cast<OpenDDS::DCPS::InfoRepoDiscovery>(disc); 00307 if (!ird->set_ORB(orb_)) { 00308 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ") 00309 ACE_TEXT("Unable to set the ORB in InfoRepoDiscovery.\n"))); 00310 throw InitError("Unable to set the ORB in InfoRepoDiscovery."); 00311 } 00312 00313 // Initialize persistence _after_ initializing the participant factory 00314 // and intializing the transport. 00315 if (!this->info_servant_->init_persistence()) { 00316 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ") 00317 ACE_TEXT("Unable to initialize persistence.\n"))); 00318 throw InitError("Unable to initialize persistence."); 00319 } 00320 00321 // Initialize reassociation. 00322 if (this->reassociate_delay_ != ACE_Time_Value::zero && 00323 !this->info_servant_->init_reassociation(this->reassociate_delay_)) { 00324 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ") 00325 ACE_TEXT("Unable to initialize reassociation.\n"))); 00326 throw InitError("Unable to initialize reassociation."); 00327 } 00328 00329 // Initialize dispatch checking 00330 if (this->dispatch_cleanup_delay_ != ACE_Time_Value::zero && 00331 !this->info_servant_->init_dispatchChecking(this->dispatch_cleanup_delay_)) { 00332 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ") 00333 ACE_TEXT("Unable to initialize Dispatch checking.\n"))); 00334 throw InitError("Unable to initialize dispatch checking."); 00335 } 00336 00337 // Fire up the federator. 
00338 OpenDDS::Federator::Manager_var federator; 00339 CORBA::String_var federator_ior; 00340 00341 if (federator_.id().overridden()) { 00342 oid = PortableServer::string_to_ObjectId("Federator"); 00343 info_poa->activate_object_with_id(oid, &federator_); 00344 obj = info_poa->id_to_reference(oid); 00345 federator = OpenDDS::Federator::Manager::_narrow(obj); 00346 00347 federator_ior = orb_->object_to_string(federator); 00348 00349 // Add a local repository reference that can be returned via a 00350 // remote call to a peer. 00351 this->federator_.localRepo(info_repo); 00352 00353 // It should be safe to initialize the federation mechanism at this 00354 // point. What we really needed to wait for is the initialization of 00355 // the service components - like the DomainParticipantFactory and the 00356 // repository bindings. 00357 // N.B. This is done *before* being added to the IOR table to avoid any 00358 // races with an eager client. 00359 this->federator_.orb(this->orb_); 00360 00361 // 00362 // Add the federator to the info_servant update manager as an 00363 // additional updater interface to be called. 00364 // N.B. This needs to be done *after* the call to load_domains() 00365 // since that is where the update manager is initialized in the 00366 // info startup sequencing. 00367 this->info_servant_->add(&this->federator_); 00368 } 00369 00370 // Grab the IOR table. 
00371 CORBA::Object_var table_object = 00372 this->orb_->resolve_initial_references("IORTable"); 00373 00374 IORTable::Table_var adapter = IORTable::Table::_narrow(table_object); 00375 00376 if (CORBA::is_nil(adapter)) { 00377 ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n"))); 00378 00379 } else { 00380 adapter->bind(OpenDDS::Federator::REPOSITORY_IORTABLE_KEY, objref_str); 00381 00382 if (this->federator_.id().overridden()) { 00383 // Bind to '/Federator' 00384 adapter->bind(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY, federator_ior); 00385 00386 // Bind to '/Federator/1382379631' 00387 std::stringstream buffer(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY); 00388 buffer << "/" << std::dec << this->federatorConfig_.federationDomain(); 00389 adapter->bind(buffer.str().c_str(), federator_ior); 00390 } 00391 } 00392 00393 FILE* output_file = ACE_OS::fopen(this->ior_file_.c_str(), ACE_TEXT("w")); 00394 00395 if (output_file == 0) { 00396 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unable to open IOR file: %s\n"), 00397 ior_file_.c_str())); 00398 throw InitError("Unable to open IOR file."); 00399 } 00400 00401 ACE_OS::fprintf(output_file, "%s", objref_str.in()); 00402 ACE_OS::fclose(output_file); 00403 00404 // Initial federation join if specified on command line. 
00405 if (this->federator_.id().overridden() 00406 && !this->federatorConfig_.federateIor().empty()) { 00407 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00408 ACE_DEBUG((LM_DEBUG, 00409 ACE_TEXT("(%P|%t) INFO: DCPSInfoRepo::init() - ") 00410 ACE_TEXT("joining federation with repository %s\n"), 00411 this->federatorConfig_.federateIor().c_str())); 00412 } 00413 00414 obj = this->orb_->string_to_object( 00415 this->federatorConfig_.federateIor().c_str()); 00416 00417 if (CORBA::is_nil(obj)) { 00418 ACE_ERROR((LM_ERROR, 00419 ACE_TEXT("(%P|%t) ERROR: could not resolve %s for initial federation.\n"), 00420 this->federatorConfig_.federateIor().c_str())); 00421 throw InitError("Unable to resolve IOR for initial federation."); 00422 } 00423 00424 OpenDDS::Federator::Manager_var peer = 00425 OpenDDS::Federator::Manager::_narrow(obj); 00426 00427 if (CORBA::is_nil(peer)) { 00428 ACE_ERROR((LM_ERROR, 00429 ACE_TEXT("(%P|%t) ERROR: could not narrow %s.\n"), 00430 this->federatorConfig_.federateIor().c_str())); 00431 throw InitError("Unable to narrow peer for initial federation."); 00432 } 00433 00434 // Actually join. 00435 peer->join_federation(federator, 00436 this->federatorConfig_.federationDomain()); 00437 } 00438 }
void InfoRepo::parse_args | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [private] |
Definition at line 162 of file DCPSInfoRepoServ.cpp.
References dispatch_cleanup_delay_, ior_file_, listen_address_given_, listen_address_str_, reassociate_delay_, resurrect_, and TURN_ON_VERBOSE_DEBUG.
Referenced by init().
00163 { 00164 ACE_Arg_Shifter arg_shifter(argc, argv); 00165 00166 const ACE_TCHAR* current_arg = 0; 00167 00168 while (arg_shifter.is_anything_left()) { 00169 if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-a"))) != 0) { 00170 this->listen_address_str_ = ACE_TEXT_ALWAYS_CHAR(current_arg); 00171 this->listen_address_given_ = 1; 00172 arg_shifter.consume_arg(); 00173 // Must check for -ReassociateDelay before -r 00174 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-ReassociateDelay"))) != 0) { 00175 long msec = ACE_OS::atoi(current_arg); 00176 this->reassociate_delay_.msec(msec); 00177 00178 arg_shifter.consume_arg(); 00179 } else if ((current_arg = arg_shifter.get_the_parameter 00180 (ACE_TEXT("-r"))) != 0) { 00181 int p = ACE_OS::atoi(current_arg); 00182 this->resurrect_ = true; 00183 00184 if (p == 0) { 00185 this->resurrect_ = false; 00186 } 00187 00188 arg_shifter.consume_arg(); 00189 00190 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-o"))) != 0) { 00191 this->ior_file_ = current_arg; 00192 arg_shifter.consume_arg(); 00193 00194 } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-NOBITS")) == 0) { 00195 this->use_bits_ = false; 00196 arg_shifter.consume_arg(); 00197 00198 } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-z")) == 0) { 00199 TURN_ON_VERBOSE_DEBUG; 00200 arg_shifter.consume_arg(); 00201 00202 } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-DispatchingCheckDelay"))) != 0) { 00203 long sec = ACE_OS::atoi(current_arg); 00204 this->dispatch_cleanup_delay_.sec(sec); 00205 arg_shifter.consume_arg(); 00206 00207 } 00208 00209 // The '-?' option 00210 else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-?")) == 0) { 00211 this->usage(argv[0]); 00212 throw InitError("Usage"); 00213 } 00214 00215 // Anything else we just skip 00216 00217 else { 00218 arg_shifter.ignore_arg(); 00219 } 00220 } 00221 }
void InfoRepo::run | ( | ) |
Definition at line 74 of file DCPSInfoRepoServ.cpp.
References cond_, finalize(), orb_, and shutdown_complete_.
Referenced by ACE_TMAIN().
00075 { 00076 this->shutdown_complete_ = false; 00077 this->orb_->run(); 00078 this->finalize(); 00079 ACE_GUARD(ACE_Thread_Mutex, g, this->lock_); 00080 this->shutdown_complete_ = true; 00081 this->cond_.signal(); 00082 }
void InfoRepo::shutdown | ( | ) | [virtual] |
ShutdownInterface used to schedule a shutdown.
Implements ShutdownInterface.
Definition at line 121 of file DCPSInfoRepoServ.cpp.
References orb_.
Referenced by InfoRepo_Shutdown::operator()(), and sync_shutdown().
00122 { 00123 this->orb_->orb_core()->reactor()->notify(this); 00124 // reactor will invoke our InfoRepo::handle_exception() 00125 }
void InfoRepo::sync_shutdown | ( | ) |
shutdown() and wait for it to complete: cannot be called from the reactor thread.
Definition at line 128 of file DCPSInfoRepoServ.cpp.
References cond_, and shutdown().
00129 { 00130 this->shutdown(); 00131 ACE_GUARD(ACE_Thread_Mutex, g, this->lock_); 00132 00133 while (!this->shutdown_complete_) { 00134 this->cond_.wait(); 00135 } 00136 }
void InfoRepo::usage | ( | const ACE_TCHAR * | cmd | ) | [private] |
Definition at line 139 of file DCPSInfoRepoServ.cpp.
00140 { 00141 // NOTE: The federation arguments are parsed early by the 00142 // FederationConfig object. 00143 ACE_DEBUG((LM_INFO, 00144 ACE_TEXT("Usage:\n") 00145 ACE_TEXT(" %s\n") 00146 ACE_TEXT(" -a <address> listening address for Built-In Topics\n") 00147 ACE_TEXT(" -o <file> write ior to file\n") 00148 ACE_TEXT(" -NOBITS disable the Built-In Topics\n") 00149 ACE_TEXT(" -z turn on verbose Transport logging\n") 00150 ACE_TEXT(" -r Resurrect from persistent file\n") 00151 ACE_TEXT(" -FederatorConfig <file> configure federation from <file>\n") 00152 ACE_TEXT(" -FederationId <number> value for this repository\n") 00153 ACE_TEXT(" -FederateWith <ior> federate initially with object at <ior>\n") 00154 ACE_TEXT(" -ReassociateDelay <msec> delay between reassociations\n") 00155 ACE_TEXT(" -DispatchingCheckDelay <sec> delay between checks for cleaning up dispatching connections.\n") 00156 ACE_TEXT(" -?\n") 00157 ACE_TEXT("\n"), 00158 cmd)); 00159 }
ACE_Condition_Thread_Mutex InfoRepo::cond_ [private] |
ACE_Time_Value InfoRepo::dispatch_cleanup_delay_ [private] |
OpenDDS::Federator::ManagerImpl InfoRepo::federator_ [private] |
Repository Federation behaviors.
Definition at line 71 of file DCPSInfoRepoServ.h.
Referenced by finalize(), handle_exception(), and init().
bool InfoRepo::finalized_ [private] |
Flag to indicate that finalization has already occurred.
Definition at line 67 of file DCPSInfoRepoServ.h.
Referenced by finalize().
PortableServer::Servant_var<TAO_DDS_DCPSInfo_i> InfoRepo::info_servant_ [private] |
Definition at line 74 of file DCPSInfoRepoServ.h.
Referenced by finalize(), handle_exception(), and init().
ACE_TString InfoRepo::ior_file_ [private] |
int InfoRepo::listen_address_given_ [private] |
std::string InfoRepo::listen_address_str_ [private] |
ACE_Thread_Mutex InfoRepo::lock_ [private] |
Definition at line 76 of file DCPSInfoRepoServ.h.
CORBA::ORB_var InfoRepo::orb_ [private] |
Definition at line 57 of file DCPSInfoRepoServ.h.
Referenced by finalize(), handle_exception(), init(), run(), and shutdown().
ACE_Time_Value InfoRepo::reassociate_delay_ [private] |
bool InfoRepo::resurrect_ [private] |
bool InfoRepo::servant_finalized_ [private] |
bool InfoRepo::shutdown_complete_ [private] |
bool InfoRepo::use_bits_ [private] |
Definition at line 62 of file DCPSInfoRepoServ.h.