OpenDDS  Snapshot(2023/04/28-20:55)
DCPSInfoRepoServ.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "DcpsInfo_pch.h"
7 
8 #include "DCPSInfoRepoServ.h"
9 
10 #include "DCPSInfo_i.h"
11 #include "FederatorConfig.h"
12 #include "FederatorManagerImpl.h"
13 #include "ShutdownInterface.h"
14 #include "PersistenceUpdater.h"
15 #include "UpdateManager.h"
16 
18 #include <dds/DCPS/DCPS_Utils.h>
20 //If we need BIT support, pull in TCP so that static builds will have it.
21 #ifndef DDS_HAS_MINIMUM_BIT
23 #endif
24 
25 #include <tao/ORB_Core.h>
26 #include <tao/IORTable/IORTable.h>
27 #include <tao/BiDir_GIOP/BiDirGIOP.h>
28 #include <orbsvcs/Shutdown_Utilities.h>
29 #ifdef ACE_AS_STATIC_LIBS
30 # include <tao/ImR_Client/ImR_Client.h>
31 #endif
32 
33 #include <ace/Get_Opt.h>
34 #include <ace/Arg_Shifter.h>
35 #include <ace/Service_Config.h>
37 
38 #include <string>
39 #include <sstream>
40 
41 InfoRepo::InfoRepo(int argc, ACE_TCHAR *argv[])
42 : ior_file_(ACE_TEXT("repo.ior"))
43 , listen_address_given_(0)
44 #ifdef DDS_HAS_MINIMUM_BIT
45 , use_bits_(false)
46 #else
47 , use_bits_(true)
48 #endif
49 , resurrect_(true)
50 , finalized_(false)
51 , servant_finalized_(false)
52 , federatorConfig_(argc, argv)
53 , federator_(this->federatorConfig_)
54 , lock_()
55 , cond_(lock_)
56 , shutdown_complete_(false)
57 , shutdown_signal_(0)
58 , dispatch_cleanup_delay_(30,0)
59 {
60  try {
61  this->init();
62  } catch (...) {
63  this->finalize();
64  throw;
65  }
66 }
67 
// Runs finalize() and deliberately swallows OpenDDS::Federator::Incomplete so
// that this exception type does not propagate out of this body.
// NOTE(review): listing line 68 (the signature of this definition) is missing
// from this extract -- presumably InfoRepo::~InfoRepo(); confirm against the
// original file.
69 {
70  try {
71  this->finalize();
72  } catch (const OpenDDS::Federator::Incomplete&) {}
73 }
74 
// Runs the ORB event loop, then finalizes and announces that shutdown has
// completed.
// NOTE(review): listing line 76 (the function name) is missing from this
// extract -- presumably InfoRepo::run(); confirm against the original file.
75 void
77 {
78  this->shutdown_complete_ = false;
79  this->orb_->run();
    // Reached only after orb_->run() has returned.
80  this->finalize();
    // The completion flag is set and cond_ is signalled while lock_ is held;
    // the wait loop at listing lines 164-166 tests the same flag under the
    // same lock.
81  ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
82  this->shutdown_complete_ = true;
83  this->cond_.signal();
84 }
85 
// Releases service resources and destroys the ORB.  Guarded by finalized_, so
// a second call returns immediately.
// NOTE(review): listing line 87 (the function name) is missing from this
// extract; the log text below identifies it as InfoRepo::finalize.
86 void
88 {
89  if (this->finalized_) {
90  return;
91  }
92 
    // Same servant/federator/participant teardown sequence that
    // InfoRepo::handle_exception() performs; skipped if it already ran there.
93  if (!this->servant_finalized_) {
94  // reached if the ImR caused the ORB to shut down,
95  // which bypasses InfoRepo::handle_exception()
    // NOTE(review): info_servant_ is dereferenced without a nil check.  The
    // constructor calls finalize() when init() throws, and info_servant_ is
    // only assigned part-way through initialization (listing line 285), so an
    // early init failure looks like it could reach this line with a nil
    // servant -- confirm.
96  this->info_servant_->finalize();
97  this->federator_.finalize();
98  info_servant_->cleanup_all_built_in_topics(); // Used by federator_->finalize
99  const DDS::ReturnCode_t shutdown_error = TheServiceParticipant->shutdown();
    // A non-zero return code is only logged; finalization continues.
100  if (shutdown_error) {
101  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: InfoRepo::finalize: "
102  "TheServiceParticipant->shutdown returned: %C\n",
103  OpenDDS::DCPS::retcode_to_string(shutdown_error)));
104  }
105  this->servant_finalized_ = true;
106  }
107 
108  if (!CORBA::is_nil(this->orb_)) {
109  try {
110  this->orb_->destroy();
111  }
112  catch (const CORBA::Exception&) {
113  //finalizing anyway, not an issue.
114  }
115  }
116 
117  this->finalized_ = true;
118 }
119 
// Reactor callback that performs the shutdown work: finalizes the servant,
// the federator and the service participant, then shuts down the ORB.
// It is triggered by the reactor notify() issued at listing line 154.
// The handle argument is unused.  Always returns 0.
120 int
121 InfoRepo::handle_exception(ACE_HANDLE /* fd */)
122 {
    // NOTE(review): listing lines 124 and 126 are missing from this extract;
    // the surviving format string suggests a log statement reporting
    // shutdown_signal_ -- confirm against the original file.
123  if (shutdown_signal_) {
125  "InfoRepo_Shutdown: shutting down on signal %d\n",
127  }
128 
129  // these should occur before ORB::shutdown() since they use the ORB/reactor
130  this->info_servant_->finalize();
131  this->federator_.finalize();
132  info_servant_->cleanup_all_built_in_topics(); // Used by federator_->finalize
133  const DDS::ReturnCode_t shutdown_error = TheServiceParticipant->shutdown();
    // A non-zero return code is only logged; shutdown continues.
134  if (shutdown_error) {
135  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: InfoRepo::handle_exception: "
136  "TheServiceParticipant->shutdown returned: %C\n",
137  OpenDDS::DCPS::retcode_to_string(shutdown_error)));
138  }
    // Recorded so that finalize() does not repeat the teardown above.
139  this->servant_finalized_ = true;
140 
141  this->orb_->shutdown(true);
142  return 0;
143 }
144 
// Stores the signal number in shutdown_signal_; handle_exception() tests it
// before shutting down.
// NOTE(review): listing line 146 (the signature) is missing from this extract
// -- presumably InfoRepo::set_shutdown_signal(int which_signal); confirm.
145 void
147 {
148  shutdown_signal_ = which_signal;
149 }
150 
// Requests shutdown by notifying the ORB's reactor with this object as the
// handler; the teardown itself is done in handle_exception().
// NOTE(review): listing line 152 (the signature) is missing from this extract
// -- presumably InfoRepo::shutdown(); confirm against the original file.
151 void
153 {
154  this->orb_->orb_core()->reactor()->notify(this);
155  // reactor will invoke our InfoRepo::handle_exception()
156 }
157 
// Requests shutdown and then blocks until shutdown_complete_ becomes true.
// NOTE(review): listing line 159 (the signature) is missing from this extract
// -- presumably InfoRepo::sync_shutdown(); confirm against the original file.
158 void
160 {
161  this->shutdown();
162  ACE_GUARD(ACE_Thread_Mutex, g, this->lock_);
163 
    // Re-test the flag after every wake-up; it is set (and cond_ signalled)
    // under the same lock at listing lines 81-83.
164  while (!this->shutdown_complete_) {
165  this->cond_.wait();
166  }
167 }
168 
// Emits the command line help text; `cmd` is substituted for the leading %s.
// NOTE(review): listing line 170 (the signature) and line 174 (the opening of
// the logging call that consumes these strings) are missing from this extract
// -- presumably InfoRepo::usage(const ACE_TCHAR* cmd); confirm.
// NOTE(review): "-r" is listed here without a value, but parse_args() reads a
// value after it (listing lines 210-219) -- confirm which is intended.
169 void
171 {
172  // NOTE: The federation arguments are parsed early by the
173  // FederationConfig object.
175  ACE_TEXT("Usage:\n")
176  ACE_TEXT(" %s\n")
177  ACE_TEXT(" -a <address> listening address for Built-In Topics\n")
178  ACE_TEXT(" -o <file> write ior to file\n")
179  ACE_TEXT(" -NOBITS disable the Built-In Topics\n")
180  ACE_TEXT(" -z turn on verbose Transport logging\n")
181  ACE_TEXT(" -r Resurrect from persistent file\n")
182  ACE_TEXT(" -FederatorConfig <file> configure federation from <file>\n")
183  ACE_TEXT(" -FederationId <number> value for this repository\n")
184  ACE_TEXT(" -FederateWith <ior> federate initially with object at <ior>\n")
185  ACE_TEXT(" -ReassociateDelay <msec> delay between reassociations\n")
186  ACE_TEXT(" -DispatchingCheckDelay <sec> delay between checks for cleaning up dispatching connections.\n")
187  ACE_TEXT(" -?\n")
188  ACE_TEXT("\n"),
189  cmd));
190 }
191 
// Walks the argument vector and applies the repository-specific options:
//   -a <address>                 sets listen_address_str_ and listen_address_given_
//   -ReassociateDelay <msec>     sets reassociate_delay_
//   -r <n>                       sets resurrect_ (false when the value parses to 0)
//   -o <file>                    sets ior_file_
//   -NOBITS                      clears use_bits_
//   -z                           (see note below)
//   -DispatchingCheckDelay <sec> sets dispatch_cleanup_delay_
//   -?                           prints usage and throws InitError("Usage")
// Any other argument is left in place via ignore_arg().
// Numeric values are converted with ACE_OS::atoi and are not validated.
192 void
193 InfoRepo::parse_args(int argc, ACE_TCHAR *argv[])
194 {
195  ACE_Arg_Shifter arg_shifter(argc, argv);
196 
197  const ACE_TCHAR* current_arg = 0;
198 
199  while (arg_shifter.is_anything_left()) {
200  if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-a"))) != 0) {
201  this->listen_address_str_ = ACE_TEXT_ALWAYS_CHAR(current_arg);
202  this->listen_address_given_ = 1;
203  arg_shifter.consume_arg();
    // The ordering comment below refers to the two branches that follow it.
204  // Must check for -ReassociateDelay before -r
205  } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-ReassociateDelay"))) != 0) {
206  long msec = ACE_OS::atoi(current_arg);
207  this->reassociate_delay_.msec(msec);
208 
209  arg_shifter.consume_arg();
210  } else if ((current_arg = arg_shifter.get_the_parameter
211  (ACE_TEXT("-r"))) != 0) {
    // NOTE(review): a value is read after "-r" here, while the usage text
    // shows "-r" with no value -- confirm which is intended.
212  int p = ACE_OS::atoi(current_arg);
213  this->resurrect_ = true;
214 
215  if (p == 0) {
216  this->resurrect_ = false;
217  }
218 
219  arg_shifter.consume_arg();
220 
221  } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-o"))) != 0) {
222  this->ior_file_ = current_arg;
223  arg_shifter.consume_arg();
224 
225  } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-NOBITS")) == 0) {
226  this->use_bits_ = false;
227  arg_shifter.consume_arg();
228 
229  } else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-z")) == 0) {
    // NOTE(review): listing line 230 is missing from this extract, so the
    // action taken for "-z" is not visible here; the usage text describes it
    // as turning on verbose transport logging -- confirm against the
    // original file.
231  arg_shifter.consume_arg();
232 
233  } else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-DispatchingCheckDelay"))) != 0) {
234  long sec = ACE_OS::atoi(current_arg);
235  this->dispatch_cleanup_delay_.sec(sec);
236  arg_shifter.consume_arg();
237 
238  }
239 
240  // The '-?' option
241  else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-?")) == 0) {
242  this->usage(argv[0]);
243  throw InitError("Usage");
244  }
245 
246  // Anything else we just skip
247 
248  else {
249  arg_shifter.ignore_arg();
250  }
251  }
252 }
253 
// Start-up sequence for the repository.  In order, the visible code:
//   1. builds the ORB argument list (adding bidirectional GIOP service
//      directives unless "-DCPSBidirGIOP 0" is given) and initializes the ORB;
//   2. creates the DCPSInfo servant and an "InfoRepo" POA with USER_ID and
//      PERSISTENT policies, and activates the servant in it;
//   3. initializes the participant factory, then parses the repository options;
//   4. activates the POA manager and initializes the transport (or disables
//      built-in topics);
//   5. initializes persistence, reassociation and dispatch checking;
//   6. activates the federator when a federation id was supplied;
//   7. binds the references in the IOR table and writes the IOR file;
//   8. optionally joins an initial federation.
// Failures are reported by throwing InitError.
// NOTE(review): listing line 255 (the signature) is missing from this extract;
// the log text identifies this as the init routine invoked by the constructor.
// Several interior listing lines are missing as well and are flagged below.
254 void
256 {
257  ACE_ARGV args;
258  args.add(federatorConfig_.argv(), true /*quote arg*/);
259 
260  bool use_bidir = true;
261 
    // Scan for "-DCPSBidirGIOP <n>"; the loop stops one short of the end so
    // that args[i + 1] is always a valid index.
262  for (int i = 0; i < args.argc() - 1; ++i) {
263  if (0 == ACE_OS::strcmp(args[i], ACE_TEXT("-DCPSBidirGIOP"))) {
264  use_bidir = ACE_OS::atoi(args[i + 1]);
265  break;
266  }
267  }
268 
269  if (use_bidir) {
270  const ACE_TCHAR* config[] = {
271  ACE_TEXT("-ORBSvcConfDirective"),
272  ACE_TEXT("static Client_Strategy_Factory \"-ORBWaitStrategy rw ")
273  ACE_TEXT("-ORBTransportMuxStrategy exclusive -ORBConnectStrategy blocked ")
274  ACE_TEXT("-ORBConnectionHandlerCleanup 1\""),
275  ACE_TEXT("-ORBSvcConfDirective"),
276  ACE_TEXT("static Resource_Factory \"-ORBFlushingStrategy blocking\""),
277  0
278  };
279  args.add((ACE_TCHAR**)config, true /*quote arg*/);
280  }
281 
282  int argc = args.argc();
283  orb_ = CORBA::ORB_init(argc, args.argv());
284 
    // NOTE(review): listing line 287 (the remaining constructor arguments) is
    // missing from this extract.
285  this->info_servant_ =
286  new TAO_DDS_DCPSInfo_i(this->orb_, this->resurrect_, this,
288 
289  // Install the DCPSInfo_i into the Federator::Manager.
290  this->federator_.info() = this->info_servant_.in();
291 
292  CORBA::Object_var obj =
293  this->orb_->resolve_initial_references("RootPOA");
294  PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj);
295 
296  PortableServer::POAManager_var poa_manager = root_poa->the_POAManager();
297 
298  // Use persistent and user id POA policies so the Info Repo's
299  // object references are consistent.
    // use_bidir (a bool) contributes 0 or 1 to the policy count.
300  CORBA::PolicyList policies(2 + use_bidir);
301  policies.length(2 + use_bidir);
302  policies[0] = root_poa->create_id_assignment_policy(PortableServer::USER_ID);
303  policies[1] = root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
304  if (use_bidir) {
305  CORBA::Any policy;
306  policy <<= BiDirPolicy::BOTH;
307  policies[2] =
308  orb_->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policy);
309  }
310  PortableServer::POA_var info_poa = root_poa->create_POA("InfoRepo",
311  poa_manager,
312  policies);
313 
314  // Creation of the new POAs over, so destroy the Policy_ptr's.
315  for (CORBA::ULong i = 0; i < policies.length(); ++i) {
316  policies[i]->destroy();
317  }
318 
    // NOTE(review): listing lines 319-320 are missing from this extract;
    // `oid` used below is presumably declared there.
321  info_poa->activate_object_with_id(oid, this->info_servant_.in());
322  obj = info_poa->id_to_reference(oid);
323  // the object is created locally, so it is safe to do an
324  // _unchecked_narrow, this was needed to prevent an exception
325  // when dealing with ImR-ified objects
326  OpenDDS::DCPS::DCPSInfo_var info_repo =
327  OpenDDS::DCPS::DCPSInfo::_unchecked_narrow(obj);
328 
329  CORBA::String_var objref_str =
330  orb_->object_to_string(info_repo);
331 
332  // Initialize the DomainParticipantFactory
333  DDS::DomainParticipantFactory_var dpf =
334  TheParticipantFactoryWithArgs(argc, args.argv());
335 
336  // We need parse the command line options for DCPSInfoRepo after parsing DCPS specific
337  // command line options.
338 
339  // Check the non-ORB arguments.
340  this->parse_args(argc, args.argv());
341 
342  // Activate the POA manager before initialize built-in-topics
343  // so invocations can be processed.
344  poa_manager->activate();
345 
346  if (this->use_bits_) {
347  if (this->info_servant_->init_transport(this->listen_address_given_,
348  this->listen_address_str_.c_str())
349  != 0 /* init_transport returns 0 for success */) {
350  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
351  ACE_TEXT("Unable to initialize transport.\n")));
352  throw InitError("Unable to initialize transport.");
353  }
354 
355  } else {
356  TheServiceParticipant->set_BIT(false);
357  }
358 
359  // This needs to be done after initialization since we create the reference
360  // to ourselves in the service here.
    // NOTE(review): listing lines 361-362 and 365-366 are missing from this
    // extract; `serv_part` and `ird` used below are presumably set up there.
363 
364  OpenDDS::DCPS::Discovery_rch disc = serv_part->get_discovery(0 /*domainId*/);
367  if (!ird->set_ORB(orb_)) {
368  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
369  ACE_TEXT("Unable to set the ORB in InfoRepoDiscovery.\n")));
370  throw InitError("Unable to set the ORB in InfoRepoDiscovery.");
371  }
372 
373  // Initialize persistence _after_ initializing the participant factory
374  // and initializing the transport.
375  if (!this->info_servant_->init_persistence()) {
376  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
377  ACE_TEXT("Unable to initialize persistence.\n")));
378  throw InitError("Unable to initialize persistence.");
379  }
380 
381  // Initialize reassociation.
    // NOTE(review): listing line 382 (the first part of this condition) is
    // missing from this extract.
383  !this->info_servant_->init_reassociation(this->reassociate_delay_)) {
384  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
385  ACE_TEXT("Unable to initialize reassociation.\n")));
386  throw InitError("Unable to initialize reassociation.");
387  }
388 
389  // Initialize dispatch checking
    // NOTE(review): listing line 390 (the first part of this condition) is
    // missing from this extract.
391  !this->info_servant_->init_dispatchChecking(this->dispatch_cleanup_delay_)) {
392  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DCPSInfoRepo::init: ")
393  ACE_TEXT("Unable to initialize Dispatch checking.\n")));
394  throw InitError("Unable to initialize dispatch checking.");
395  }
396 
397  // Fire up the federator.
398  OpenDDS::Federator::Manager_var federator;
399  CORBA::String_var federator_ior;
400 
401  if (federator_.id().overridden()) {
402  oid = PortableServer::string_to_ObjectId("Federator");
403  info_poa->activate_object_with_id(oid, &federator_);
404  obj = info_poa->id_to_reference(oid);
405  federator = OpenDDS::Federator::Manager::_narrow(obj);
406 
407  federator_ior = orb_->object_to_string(federator);
408 
409  // Add a local repository reference that can be returned via a
410  // remote call to a peer.
411  this->federator_.localRepo(info_repo);
412 
413  // It should be safe to initialize the federation mechanism at this
414  // point. What we really needed to wait for is the initialization of
415  // the service components - like the DomainParticipantFactory and the
416  // repository bindings.
417  // N.B. This is done *before* being added to the IOR table to avoid any
418  // races with an eager client.
419  this->federator_.orb(this->orb_);
420 
421  //
422  // Add the federator to the info_servant update manager as an
423  // additional updater interface to be called.
424  // N.B. This needs to be done *after* the call to load_domains()
425  // since that is where the update manager is initialized in the
426  // info startup sequencing.
427  this->info_servant_->add(&this->federator_);
428  }
429 
430  // Grab the IOR table.
431  CORBA::Object_var table_object =
432  this->orb_->resolve_initial_references("IORTable");
433 
434  IORTable::Table_var adapter = IORTable::Table::_narrow(table_object);
435 
    // A nil table is only logged; initialization continues without bindings.
436  if (CORBA::is_nil(adapter)) {
437  ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
438 
439  } else {
440  adapter->bind(OpenDDS::Federator::REPOSITORY_IORTABLE_KEY, objref_str);
441 
442  if (this->federator_.id().overridden()) {
443  // Bind to '/Federator'
444  adapter->bind(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY, federator_ior);
445 
446  // Bind to '/Federator/1382379631'
447  std::stringstream buffer(OpenDDS::Federator::FEDERATOR_IORTABLE_KEY);
448  buffer << "/" << std::dec << this->federatorConfig_.federationDomain();
449  adapter->bind(buffer.str().c_str(), federator_ior);
450  }
451  }
452 
453  FILE* output_file = ACE_OS::fopen(this->ior_file_.c_str(), ACE_TEXT("w"));
454 
455  if (output_file == 0) {
456  ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unable to open IOR file: %s\n"),
457  ior_file_.c_str()));
458  throw InitError("Unable to open IOR file.");
459  }
460 
    // The stringified servant reference is the entire content of the file.
461  ACE_OS::fprintf(output_file, "%s", objref_str.in());
462  ACE_OS::fclose(output_file);
463 
464  // Initial federation join if specified on command line.
465  if (this->federator_.id().overridden()
466  && !this->federatorConfig_.federateIor().empty()) {
    // NOTE(review): listing lines 467-468 are missing from this extract; the
    // surviving text looks like the arguments of a conditional log statement,
    // closed by the brace at listing line 472.
469  ACE_TEXT("(%P|%t) INFO: DCPSInfoRepo::init() - ")
470  ACE_TEXT("joining federation with repository %s\n"),
471  this->federatorConfig_.federateIor().c_str()));
472  }
473 
474  obj = this->orb_->string_to_object(
475  this->federatorConfig_.federateIor().c_str());
476 
    // NOTE(review): listing lines 478 and 488 (the openings of the two
    // logging calls below) are missing from this extract.
477  if (CORBA::is_nil(obj)) {
479  ACE_TEXT("(%P|%t) ERROR: could not resolve %s for initial federation.\n"),
480  this->federatorConfig_.federateIor().c_str()));
481  throw InitError("Unable to resolve IOR for initial federation.");
482  }
483 
484  OpenDDS::Federator::Manager_var peer =
485  OpenDDS::Federator::Manager::_narrow(obj);
486 
487  if (CORBA::is_nil(peer)) {
489  ACE_TEXT("(%P|%t) ERROR: could not narrow %s.\n"),
490  this->federatorConfig_.federateIor().c_str()));
491  throw InitError("Unable to narrow peer for initial federation.");
492  }
493 
494  // Actually join.
    // NOTE(review): listing line 496 (the remaining join_federation
    // arguments) is missing from this extract.
495  peer = peer->join_federation(federator,
497  }
498 }
499 
// Stores the `ir` argument in the ir_ member; the body does nothing else.
// NOTE(review): listing line 500 (the signature) is missing from this extract
// -- presumably InfoRepo_Shutdown::InfoRepo_Shutdown(InfoRepo& ir); confirm.
501 : ir_(ir)
502 {
503 }
504 
// Records the signal number on the repository, then asks it to shut down.
// NOTE(review): listing line 506 (the signature) is missing from this extract
// -- presumably InfoRepo_Shutdown::operator()(int which_signal); confirm.
505 void
507 {
    // The signal is stored first so that the shutdown handler can report it.
508  ir_.set_shutdown_signal(which_signal);
509  ir_.shutdown();
510 }
#define ACE_DEBUG(X)
int fclose(FILE *fp)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const char * c_str(void) const
static const char * DEFAULT_REPO
Key value for the default repository IOR.
Definition: Discovery.h:85
ACE_TCHAR **& argv()
Access the enhanced argc.
ACE_Time_Value reassociate_delay_
InfoRepo_Shutdown(InfoRepo &ir)
void federationDomain(long domain)
Federation Id value.
LM_INFO
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
TAO_DDS_DCPSInfo_i *& info()
Accessors for the DCPSInfo reference.
CORBA::OctetSeq_var ObjectId_var
InfoRepo(int argc, ACE_TCHAR *argv[])
ACE_Thread_Mutex lock_
#define ACE_TEXT_ALWAYS_CHAR(STRING)
CORBA::ORB_var orb_
PortableServer::ObjectId * string_to_ObjectId(const char *string)
int consume_arg(int number=1)
FILE * fopen(const char *filename, const char *mode)
bool servant_finalized_
ACE_Guard< ACE_Thread_Mutex > lock_
void set_shutdown_signal(int which_signal)
Discovery Strategy class that implements InfoRepo discovery.
CORBA::ORB_ptr orb()
Accessors for the ORB.
void usage(const ACE_TCHAR *cmd)
void operator()(int which_signal)
int is_anything_left(void) const
bool finalized_
Flag to indicate that finalization has already occurred.
LM_DEBUG
virtual int handle_exception(ACE_HANDLE fd=ACE_INVALID_HANDLE)
Handler for the reactor to dispatch finalization activity to.
void finalize()
Cleanup state for shutdown.
const CHAR_TYPE * get_the_parameter(const CHAR_TYPE *flag)
ACE_CDR::ULong ULong
char ACE_TCHAR
int argc(void) const
int wait(const ACE_Time_Value *abstime)
TAO_DDS_DCPSFederationId & federationId()
Federation Id value.
int listen_address_given_
sequence< Policy > PolicyList
void cleanup_all_built_in_topics()
bool set_repo_ior(const char *ior, Discovery::RepoKey key=Discovery::DEFAULT_REPO, bool attach_participant=true)
const string REPOSITORY_IORTABLE_KEY
Definition: Federator.idl:241
std::string listen_address_str_
time_t sec(void) const
void parse_args(int argc, ACE_TCHAR *argv[])
Implementation of the DCPSInfo.
Definition: DCPSInfo_i.h:53
int strcmp(const char *s, const char *t)
void federateIor(const tstring &ior)
Initial federation IOR value.
PortableServer::Servant_var< TAO_DDS_DCPSInfo_i > info_servant_
void localRepo(::OpenDDS::DCPS::DCPSInfo_ptr repo)
Capture a remote callable reference to the DCPSInfo.
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
CHAR_TYPE ** argv(void)
OpenDDS::Federator::ManagerImpl federator_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
unsigned long msec(void) const
ORB_ptr ORB_init(int &argc, char *argv[], const char *orb_name=0)
void sync_shutdown()
ACE_TString ior_file_
const string FEDERATOR_IORTABLE_KEY
Definition: Federator.idl:242
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
static const ACE_Time_Value zero
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
Discovery_rch get_discovery(const DDS::DomainId_t domain)
Accessor of the Discovery object for a given domain.
void finalize()
Release resources gracefully.
#define TURN_ON_VERBOSE_DEBUG
ACE_Time_Value dispatch_cleanup_delay_
int cur_arg_strncasecmp(const CHAR_TYPE *flag)
bool shutdown_complete_
const character_type * in(void) const
virtual void shutdown()
ShutdownInterface used to schedule a shutdown.
ACE_Condition_Thread_Mutex cond_
#define TheServiceParticipant
int fprintf(FILE *fp, const char *format,...) ACE_GCC_FORMAT_ATTRIBUTE(printf
LM_ERROR
int ignore_arg(int number=1)
Boolean is_nil(T x)
int add(const CHAR_TYPE *next_arg, bool quote_arg=false)
#define TheParticipantFactoryWithArgs(argc, argv)
int shutdown_signal_
OpenDDS::Federator::Config federatorConfig_
Repository Federation behaviors.
void finalize()
Actual finalization of service resources.