Service_Participant.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "WaitSet.h"
00010 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00011 #include "debug.h"
00012 #include "Service_Participant.h"
00013 #include "BuiltInTopicUtils.h"
00014 #include "DataDurabilityCache.h"
00015 #include "GuidConverter.h"
00016 #include "MonitorFactory.h"
00017 #include "ConfigUtils.h"
00018 #include "RecorderImpl.h"
00019 #include "ReplayerImpl.h"
00020 #include "StaticDiscovery.h"
00021 
00022 #include "ace/Singleton.h"
00023 #include "ace/Arg_Shifter.h"
00024 #include "ace/Reactor.h"
00025 #include "ace/Select_Reactor.h"
00026 #include "ace/Configuration_Import_Export.h"
00027 #include "ace/Service_Config.h"
00028 #include "ace/Argv_Type_Converter.h"
00029 #include "ace/Auto_Ptr.h"
00030 #include "ace/Sched_Params.h"
00031 #include "ace/Malloc_Allocator.h"
00032 #include "ace/OS_NS_unistd.h"
00033 
00034 #ifdef OPENDDS_SAFETY_PROFILE
00035 #include <stdio.h> // <cstdio> after FaceCTS bug 623 is fixed
00036 #else
00037 #include <fstream>
00038 #endif
00039 
00040 #if !defined (__ACE_INLINE__)
00041 #include "Service_Participant.inl"
00042 #endif /* __ACE_INLINE__ */
00043 
00044 namespace {
00045 
00046 void set_log_file_name(const char* fname)
00047 {
00048 #ifdef OPENDDS_SAFETY_PROFILE
00049   ACE_LOG_MSG->msg_ostream(fopen(fname, "a"), true);
00050 #else
00051   std::ofstream* output_stream = new std::ofstream(fname, ios::app);
00052   if (output_stream->bad()) {
00053     delete output_stream;
00054   } else {
00055     ACE_LOG_MSG->msg_ostream(output_stream, true);
00056   }
00057 #endif
00058   ACE_LOG_MSG->clr_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::LOGGER);
00059   ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
00060 }
00061 
00062 
00063 void set_log_verbose(unsigned long verbose_logging)
00064 {
00065   // Code copied from TAO_ORB_Core::init() in
00066   // TAO version 1.6a_p13.
00067 
00068   typedef void (ACE_Log_Msg::*PTMF)(u_long);
00069   PTMF flagop = &ACE_Log_Msg::set_flags;
00070   u_long value;
00071 
00072   switch (verbose_logging)
00073     {
00074     case 0:
00075       flagop = &ACE_Log_Msg::clr_flags;
00076       value = ACE_Log_Msg::VERBOSE | ACE_Log_Msg::VERBOSE_LITE;
00077       break;
00078     case 1:
00079       value = ACE_Log_Msg::VERBOSE_LITE; break;
00080     default:
00081       value = ACE_Log_Msg::VERBOSE; break;
00082     }
00083 
00084   (ACE_LOG_MSG->*flagop)(value);
00085 }
00086 
00087 
00088 }
00089 
00090 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00091 
00092 namespace OpenDDS {
00093 namespace DCPS {
00094 
00095 int Service_Participant::zero_argc = 0;
00096 
00097 const size_t DEFAULT_NUM_CHUNKS = 20;
00098 
00099 const size_t DEFAULT_CHUNK_MULTIPLIER = 10;
00100 
00101 const int DEFAULT_FEDERATION_RECOVERY_DURATION       = 900; // 15 minutes in seconds.
00102 const int DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS = 1;   // Wait only 1 second.
00103 const int DEFAULT_FEDERATION_BACKOFF_MULTIPLIER      = 2;   // Exponential backoff.
00104 const int DEFAULT_FEDERATION_LIVELINESS              = 60;  // 1 minute hearbeat.
00105 
00106 const int BIT_LOOKUP_DURATION_MSEC = 2000;
00107 
00108 static ACE_TString config_fname(ACE_TEXT(""));
00109 
00110 static const ACE_TCHAR DEFAULT_REPO_IOR[] = ACE_TEXT("file://repo.ior");
00111 
00112 static const ACE_CString DEFAULT_PERSISTENT_DATA_DIR = "OpenDDS-durable-data-dir";
00113 
00114 static const ACE_TCHAR COMMON_SECTION_NAME[] = ACE_TEXT("common");
00115 static const ACE_TCHAR DOMAIN_SECTION_NAME[] = ACE_TEXT("domain");
00116 static const ACE_TCHAR REPO_SECTION_NAME[]   = ACE_TEXT("repository");
00117 static const ACE_TCHAR RTPS_SECTION_NAME[]   = ACE_TEXT("rtps_discovery");
00118 
00119 static bool got_debug_level = false;
00120 static bool got_use_rti_serialization = false;
00121 static bool got_info = false;
00122 static bool got_chunks = false;
00123 static bool got_chunk_association_multiplier = false;
00124 static bool got_liveliness_factor = false;
00125 static bool got_bit_transport_port = false;
00126 static bool got_bit_transport_ip = false;
00127 static bool got_bit_lookup_duration_msec = false;
00128 static bool got_global_transport_config = false;
00129 static bool got_bit_flag = false;
00130 
00131 #if defined(OPENDDS_SECURITY)
00132 static bool got_security_flag = false;
00133 #endif
00134 
00135 static bool got_publisher_content_filter = false;
00136 static bool got_transport_debug_level = false;
00137 static bool got_pending_timeout = false;
00138 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00139 static bool got_persistent_data_dir = false;
00140 #endif
00141 static bool got_default_discovery = false;
00142 #ifndef DDS_DEFAULT_DISCOVERY_METHOD
00143 # ifdef OPENDDS_SAFETY_PROFILE
00144 #  define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_RTPS
00145 # else
00146 #  define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_REPO
00147 # endif
00148 #endif
00149 static bool got_log_fname = false;
00150 static bool got_log_verbose = false;
00151 static bool got_default_address = false;
00152 static bool got_bidir_giop = false;
00153 static bool got_monitor = false;
00154 
00155 Service_Participant::Service_Participant()
00156   :
00157 #ifndef OPENDDS_SAFETY_PROFILE
00158     ORB_argv_(false /*substitute_env_args*/),
00159 #endif
00160     reactor_owner_(ACE_OS::NULL_thread),
00161     defaultDiscovery_(DDS_DEFAULT_DISCOVERY_METHOD),
00162     n_chunks_(DEFAULT_NUM_CHUNKS),
00163     association_chunk_multiplier_(DEFAULT_CHUNK_MULTIPLIER),
00164     liveliness_factor_(80),
00165     bit_transport_port_(0),
00166     bit_enabled_(
00167 #ifdef DDS_HAS_MINIMUM_BIT
00168       false
00169 #else
00170       true
00171 #endif
00172     ),
00173 #if defined(OPENDDS_SECURITY)
00174     security_enabled_(false),
00175 #endif
00176     bit_lookup_duration_msec_(BIT_LOOKUP_DURATION_MSEC),
00177     global_transport_config_(ACE_TEXT("")),
00178     monitor_factory_(0),
00179     federation_recovery_duration_(DEFAULT_FEDERATION_RECOVERY_DURATION),
00180     federation_initial_backoff_seconds_(DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS),
00181     federation_backoff_multiplier_(DEFAULT_FEDERATION_BACKOFF_MULTIPLIER),
00182     federation_liveliness_(DEFAULT_FEDERATION_LIVELINESS),
00183     schedulerQuantum_(ACE_Time_Value::zero),
00184 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00185     pool_size_(1024*1024*16),
00186     pool_granularity_(8),
00187 #endif
00188     scheduler_(-1),
00189     priority_min_(0),
00190     priority_max_(0),
00191     publisher_content_filter_(true),
00192 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00193     persistent_data_dir_(DEFAULT_PERSISTENT_DATA_DIR),
00194 #endif
00195     pending_timeout_(ACE_Time_Value::zero),
00196     bidir_giop_(true),
00197     monitor_enabled_(false),
00198     shut_down_(false)
00199 {
00200   initialize();
00201 }
00202 
00203 Service_Participant::~Service_Participant()
00204 {
00205   shutdown();
00206 
00207   if (DCPS_debug_level > 0) {
00208     ACE_DEBUG((LM_DEBUG,
00209                "%T (%P|%t) Service_Participant::~Service_Participant()\n"));
00210   }
00211 }
00212 
00213 Service_Participant*
00214 Service_Participant::instance()
00215 {
00216   // Hide the template instantiation to prevent multiple instances
00217   // from being created.
00218 
00219   return ACE_Singleton<Service_Participant, ACE_SYNCH_MUTEX>::instance();
00220 }
00221 
00222 int
00223 Service_Participant::ReactorTask::svc()
00224 {
00225   Service_Participant* sp = instance();
00226   sp->reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
00227   sp->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00228   this->wait_for_startup();
00229   sp->reactor_->run_reactor_event_loop();
00230   return 0;
00231 }
00232 
00233 ACE_Reactor_Timer_Interface*
00234 Service_Participant::timer() const
00235 {
00236   return reactor_.get();
00237 }
00238 
00239 ACE_Reactor*
00240 Service_Participant::reactor() const
00241 {
00242   return reactor_.get();
00243 }
00244 
00245 ACE_thread_t
00246 Service_Participant::reactor_owner() const
00247 {
00248   return reactor_owner_;
00249 }
00250 
00251 void
00252 Service_Participant::shutdown()
00253 {
00254   // When we are already shutdown just let the shutdown be a noop
00255   if (shut_down_) {
00256     return;
00257   }
00258 
00259   shut_down_ = true;
00260   try {
00261     TransportRegistry::instance()->release();
00262     {
00263       ACE_GUARD(TAO_SYNCH_MUTEX, guard, this->factory_lock_);
00264 
00265       if (dp_factory_servant_)
00266         dp_factory_servant_->cleanup();
00267 
00268       domainRepoMap_.clear();
00269 
00270       if (reactor_) {
00271         reactor_->end_reactor_event_loop();
00272         reactor_task_.wait();
00273       }
00274 
00275       discoveryMap_.clear();
00276 
00277   #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00278       transient_data_cache_.reset();
00279       persistent_data_cache_.reset();
00280   #endif
00281 
00282       discovery_types_.clear();
00283     }
00284     TransportRegistry::close();
00285   } catch (const CORBA::Exception& ex) {
00286     ex._tao_print_exception("ERROR: Service_Participant::shutdown");
00287   }
00288 }
00289 
#ifdef ACE_USES_WCHAR
/**
 * Narrow-character overload, available only when ACE uses wide characters.
 *
 * Converts the arguments with ACE_Argv_Type_Converter and forwards to the
 * ACE_TCHAR overload.
 *
 * @param argc  Argument count.
 * @param argv  Narrow-character argument vector.
 * @return Whatever the ACE_TCHAR overload returns.
 */
DDS::DomainParticipantFactory_ptr
Service_Participant::get_domain_participant_factory(int &argc,
                                                    char *argv[])
{
  ACE_Argv_Type_Converter tchar_args(argc, argv);
  int& tchar_argc = tchar_args.get_argc();
  ACE_TCHAR** const tchar_argv = tchar_args.get_TCHAR_argv();
  return this->get_domain_participant_factory(tchar_argc, tchar_argv);
}
#endif
00300 
00301 DDS::DomainParticipantFactory_ptr
00302 Service_Participant::get_domain_participant_factory(int &argc,
00303                                                     ACE_TCHAR *argv[])
00304 {
00305   if (!dp_factory_servant_) {
00306     ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
00307                      guard,
00308                      this->factory_lock_,
00309                      DDS::DomainParticipantFactory::_nil());
00310 
00311     if (!dp_factory_servant_) {
00312       // This used to be a call to ORB_init().  Since the ORB is now managed
00313       // by InfoRepoDiscovery, just save the -ORB* args for later use.
00314       // The exceptions are -ORBLogFile and -ORBVerboseLogging, which
00315       // are processed by the service participant. This allows log control
00316       // even if an ORB is not being used.
00317 #ifndef OPENDDS_SAFETY_PROFILE
00318       ORB_argv_.add(ACE_TEXT("unused_arg_0"));
00319 #endif
00320       ACE_Arg_Shifter shifter(argc, argv);
00321       while (shifter.is_anything_left()) {
00322         if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBLogFile")) == 0) {
00323           shifter.ignore_arg();
00324         } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBVerboseLogging")) == 0) {
00325           shifter.ignore_arg();
00326         } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORB")) < 0) {
00327           shifter.ignore_arg();
00328         } else {
00329 #ifndef OPENDDS_SAFETY_PROFILE
00330           ORB_argv_.add(shifter.get_current());
00331 #endif
00332           shifter.consume_arg();
00333           if (shifter.is_parameter_next()) {
00334 #ifndef OPENDDS_SAFETY_PROFILE
00335             ORB_argv_.add(shifter.get_current(), true /*quote_arg*/);
00336 #endif
00337             shifter.consume_arg();
00338           }
00339         }
00340       }
00341 
00342       if (parse_args(argc, argv) != 0) {
00343         return DDS::DomainParticipantFactory::_nil();
00344       }
00345 
00346       if (config_fname == ACE_TEXT("")) {
00347         if (DCPS_debug_level) {
00348           ACE_DEBUG((LM_NOTICE,
00349                      ACE_TEXT("(%P|%t) NOTICE: not using file configuration - no configuration ")
00350                      ACE_TEXT("file specified.\n")));
00351         }
00352 
00353       } else {
00354         // Load configuration only if the configuration
00355         // file exists.
00356         FILE* in = ACE_OS::fopen(config_fname.c_str(),
00357                                  ACE_TEXT("r"));
00358 
00359         if (!in) {
00360           ACE_DEBUG((LM_WARNING,
00361                      ACE_TEXT("(%P|%t) WARNING: not using file configuration - ")
00362                      ACE_TEXT("can not open \"%s\" for reading. %p\n"),
00363                      config_fname.c_str(), ACE_TEXT("fopen")));
00364 
00365         } else {
00366           ACE_OS::fclose(in);
00367 
00368           if (this->load_configuration() != 0) {
00369             ACE_ERROR((LM_ERROR,
00370                        ACE_TEXT("(%P|%t) ERROR: Service_Participant::get_domain_participant_factory: ")
00371                        ACE_TEXT("load_configuration() failed.\n")));
00372             return DDS::DomainParticipantFactory::_nil();
00373           }
00374         }
00375       }
00376 
00377 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00378       // For non-FACE tests, configure pool
00379       configure_pool();
00380 #endif
00381 
00382       // Establish the default scheduling mechanism and
00383       // priority here.  Sadly, the ORB is already
00384       // initialized so we have no influence over its
00385       // scheduling or thread priority(ies).
00386 
00387       /// @TODO: Move ORB initialization to after the
00388       ///        configuration file is processed and the
00389       ///        initial scheduling policy and priority are
00390       ///        established.
00391       this->initializeScheduling();
00392 
00393       dp_factory_servant_ = make_rch<DomainParticipantFactoryImpl>();
00394 
00395       if (!reactor_)
00396         reactor_.reset(new ACE_Reactor(new ACE_Select_Reactor, true));
00397 
00398       if (reactor_task_.activate(THR_NEW_LWP | THR_JOINABLE) == -1) {
00399         ACE_ERROR((LM_ERROR,
00400                    ACE_TEXT("ERROR: Service_Participant::get_domain_participant_factory, ")
00401                    ACE_TEXT("Failed to activate the reactor task.\n")));
00402         return DDS::DomainParticipantFactory::_nil();
00403       }
00404 
00405       reactor_task_.wait_for_startup();
00406 
00407       if (this->monitor_enabled_) {
00408 #if !defined(ACE_AS_STATIC_LIBS)
00409         ACE_TString directive = ACE_TEXT("dynamic OpenDDS_Monitor Service_Object * OpenDDS_monitor:_make_MonitorFactoryImpl()");
00410         ACE_Service_Config::process_directive(directive.c_str());
00411 #endif
00412         this->monitor_factory_ =
00413           ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor");
00414 
00415         if (this->monitor_factory_ == 0) {
00416           if (this->monitor_enabled_) {
00417             ACE_ERROR((LM_ERROR,
00418                        ACE_TEXT("ERROR: Service_Participant::get_domain_participant_factory, ")
00419                        ACE_TEXT("Unable to enable monitor factory.\n")));
00420           }
00421         }
00422       }
00423 
00424       if (this->monitor_factory_ == 0) {
00425         // Use the stubbed factory
00426         MonitorFactory::service_initialize();
00427         this->monitor_factory_ =
00428           ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor_Default");
00429       }
00430       if (this->monitor_enabled_) {
00431         this->monitor_factory_->initialize();
00432       }
00433 
00434       this->monitor_.reset(this->monitor_factory_->create_sp_monitor(this));
00435     }
00436   }
00437 
00438   return DDS::DomainParticipantFactory::_duplicate(dp_factory_servant_.in());
00439 }
00440 
00441 int
00442 Service_Participant::parse_args(int &argc, ACE_TCHAR *argv[])
00443 {
00444   ACE_Arg_Shifter arg_shifter(argc, argv);
00445 
00446   while (arg_shifter.is_anything_left()) {
00447     const ACE_TCHAR *currentArg = 0;
00448 
00449     if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDebugLevel"))) != 0) {
00450       set_DCPS_debug_level(ACE_OS::atoi(currentArg));
00451       arg_shifter.consume_arg();
00452       got_debug_level = true;
00453 
00454     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSInfoRepo"))) != 0) {
00455       this->set_repo_ior(currentArg, Discovery::DEFAULT_REPO);
00456       arg_shifter.consume_arg();
00457       got_info = true;
00458 
00459     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSRTISerialization"))) != 0) {
00460       Serializer::set_use_rti_serialization(ACE_OS::atoi(currentArg));
00461       arg_shifter.consume_arg();
00462       got_use_rti_serialization = true;
00463 
00464     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunks"))) != 0) {
00465       n_chunks_ = ACE_OS::atoi(currentArg);
00466       arg_shifter.consume_arg();
00467       got_chunks = true;
00468 
00469     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunkAssociationMultiplier"))) != 0) {
00470       association_chunk_multiplier_ = ACE_OS::atoi(currentArg);
00471       arg_shifter.consume_arg();
00472       got_chunk_association_multiplier = true;
00473 
00474     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSConfigFile"))) != 0) {
00475       config_fname = currentArg;
00476       arg_shifter.consume_arg();
00477 
00478     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSLivelinessFactor"))) != 0) {
00479       liveliness_factor_ = ACE_OS::atoi(currentArg);
00480       arg_shifter.consume_arg();
00481       got_liveliness_factor = true;
00482 
00483     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportPort"))) != 0) {
00484       /// No need to guard this insertion as we are still single
00485       /// threaded here.
00486       this->bit_transport_port_ = ACE_OS::atoi(currentArg);
00487       arg_shifter.consume_arg();
00488       got_bit_transport_port = true;
00489 
00490     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportIPAddress"))) != 0) {
00491       /// No need to guard this insertion as we are still single
00492       /// threaded here.
00493       this->bit_transport_ip_ = currentArg;
00494       arg_shifter.consume_arg();
00495       got_bit_transport_ip = true;
00496 
00497     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitLookupDurationMsec"))) != 0) {
00498       bit_lookup_duration_msec_ = ACE_OS::atoi(currentArg);
00499       arg_shifter.consume_arg();
00500       got_bit_lookup_duration_msec = true;
00501 
00502     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSGlobalTransportConfig"))) != 0) {
00503       global_transport_config_ = currentArg;
00504       arg_shifter.consume_arg();
00505       got_global_transport_config = true;
00506 
00507     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBit"))) != 0) {
00508       bit_enabled_ = ACE_OS::atoi(currentArg);
00509       arg_shifter.consume_arg();
00510       got_bit_flag = true;
00511 
00512     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSTransportDebugLevel"))) != 0) {
00513       OpenDDS::DCPS::Transport_debug_level = ACE_OS::atoi(currentArg);
00514       arg_shifter.consume_arg();
00515       got_transport_debug_level = true;
00516 
00517 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00518     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPersistentDataDir"))) != 0) {
00519       this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00520       arg_shifter.consume_arg();
00521       got_persistent_data_dir = true;
00522 #endif
00523 
00524     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPendingTimeout"))) != 0) {
00525       this->pending_timeout_ = ACE_OS::atoi(currentArg);
00526       arg_shifter.consume_arg();
00527       got_pending_timeout = true;
00528 
00529     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPublisherContentFilter"))) != 0) {
00530       this->publisher_content_filter_ = ACE_OS::atoi(currentArg);
00531       arg_shifter.consume_arg();
00532       got_publisher_content_filter = true;
00533 
00534     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultDiscovery"))) != 0) {
00535       this->defaultDiscovery_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00536       arg_shifter.consume_arg();
00537       got_default_discovery = true;
00538 
00539     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBidirGIOP"))) != 0) {
00540       bidir_giop_ = ACE_OS::atoi(currentArg);
00541       arg_shifter.consume_arg();
00542       got_bidir_giop = true;
00543 
00544     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationRecoveryDuration"))) != 0) {
00545       this->federation_recovery_duration_ = ACE_OS::atoi(currentArg);
00546       arg_shifter.consume_arg();
00547 
00548     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationInitialBackoffSeconds"))) != 0) {
00549       this->federation_initial_backoff_seconds_ = ACE_OS::atoi(currentArg);
00550       arg_shifter.consume_arg();
00551 
00552     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationBackoffMultiplier"))) != 0) {
00553       this->federation_backoff_multiplier_ = ACE_OS::atoi(currentArg);
00554       arg_shifter.consume_arg();
00555 
00556     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationLivelinessDuration"))) != 0) {
00557       this->federation_liveliness_ = ACE_OS::atoi(currentArg);
00558       arg_shifter.consume_arg();
00559 
00560     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBLogFile"))) != 0) {
00561       set_log_file_name(ACE_TEXT_ALWAYS_CHAR(currentArg));
00562       arg_shifter.consume_arg();
00563       got_log_fname = true;
00564 
00565     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBVerboseLogging"))) != 0) {
00566       set_log_verbose(ACE_OS::atoi(currentArg));
00567       arg_shifter.consume_arg();
00568       got_log_verbose = true;
00569 
00570     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultAddress"))) != 0) {
00571       this->default_address_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00572       arg_shifter.consume_arg();
00573       got_default_address = true;
00574 
00575     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSMonitor"))) != 0) {
00576       this->monitor_enabled_ = ACE_OS::atoi(currentArg);
00577       arg_shifter.consume_arg();
00578       got_monitor = true;
00579 
00580 #if defined(OPENDDS_SECURITY)
00581     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSSecurity"))) != 0) {
00582       security_enabled_ = ACE_OS::atoi(currentArg);
00583       arg_shifter.consume_arg();
00584       got_security_flag = true;
00585 #endif
00586 
00587     } else {
00588       arg_shifter.ignore_arg();
00589     }
00590   }
00591 
00592   // Indicates successful parsing of the command line
00593   return 0;
00594 }
00595 
00596 void
00597 Service_Participant::initialize()
00598 {
00599   //NOTE: in the future these initial values may be configurable
00600   //      (to override the Specification's default values
00601   //       hmm - I guess that would be OK since the user
00602   //       is overriding them.)
00603   initial_TransportPriorityQosPolicy_.value = 0;
00604   initial_LifespanQosPolicy_.duration.sec = DDS::DURATION_INFINITE_SEC;
00605   initial_LifespanQosPolicy_.duration.nanosec = DDS::DURATION_INFINITE_NSEC;
00606 
00607   initial_DurabilityQosPolicy_.kind = DDS::VOLATILE_DURABILITY_QOS;
00608 
00609   initial_DurabilityServiceQosPolicy_.service_cleanup_delay.sec =
00610     DDS::DURATION_ZERO_SEC;
00611   initial_DurabilityServiceQosPolicy_.service_cleanup_delay.nanosec =
00612     DDS::DURATION_ZERO_NSEC;
00613   initial_DurabilityServiceQosPolicy_.history_kind =
00614     DDS::KEEP_LAST_HISTORY_QOS;
00615   initial_DurabilityServiceQosPolicy_.history_depth = 1;
00616   initial_DurabilityServiceQosPolicy_.max_samples =
00617     DDS::LENGTH_UNLIMITED;
00618   initial_DurabilityServiceQosPolicy_.max_instances =
00619     DDS::LENGTH_UNLIMITED;
00620   initial_DurabilityServiceQosPolicy_.max_samples_per_instance =
00621     DDS::LENGTH_UNLIMITED;
00622 
00623   initial_PresentationQosPolicy_.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
00624   initial_PresentationQosPolicy_.coherent_access = false;
00625   initial_PresentationQosPolicy_.ordered_access = false;
00626 
00627   initial_DeadlineQosPolicy_.period.sec = DDS::DURATION_INFINITE_SEC;
00628   initial_DeadlineQosPolicy_.period.nanosec = DDS::DURATION_INFINITE_NSEC;
00629 
00630   initial_LatencyBudgetQosPolicy_.duration.sec = DDS::DURATION_ZERO_SEC;
00631   initial_LatencyBudgetQosPolicy_.duration.nanosec = DDS::DURATION_ZERO_NSEC;
00632 
00633   initial_OwnershipQosPolicy_.kind = DDS::SHARED_OWNERSHIP_QOS;
00634 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00635   initial_OwnershipStrengthQosPolicy_.value = 0;
00636 #endif
00637 
00638   initial_LivelinessQosPolicy_.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
00639   initial_LivelinessQosPolicy_.lease_duration.sec = DDS::DURATION_INFINITE_SEC;
00640   initial_LivelinessQosPolicy_.lease_duration.nanosec = DDS::DURATION_INFINITE_NSEC;
00641 
00642   initial_TimeBasedFilterQosPolicy_.minimum_separation.sec = DDS::DURATION_ZERO_SEC;
00643   initial_TimeBasedFilterQosPolicy_.minimum_separation.nanosec = DDS::DURATION_ZERO_NSEC;
00644 
00645   initial_ReliabilityQosPolicy_.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
00646   initial_ReliabilityQosPolicy_.max_blocking_time.sec = DDS::DURATION_INFINITE_SEC;
00647   initial_ReliabilityQosPolicy_.max_blocking_time.nanosec = DDS::DURATION_INFINITE_NSEC;
00648 
00649   initial_DestinationOrderQosPolicy_.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
00650 
00651   initial_HistoryQosPolicy_.kind = DDS::KEEP_LAST_HISTORY_QOS;
00652   initial_HistoryQosPolicy_.depth = 1;
00653 
00654   initial_ResourceLimitsQosPolicy_.max_samples = DDS::LENGTH_UNLIMITED;
00655   initial_ResourceLimitsQosPolicy_.max_instances = DDS::LENGTH_UNLIMITED;
00656   initial_ResourceLimitsQosPolicy_.max_samples_per_instance = DDS::LENGTH_UNLIMITED;
00657 
00658   initial_EntityFactoryQosPolicy_.autoenable_created_entities = true;
00659 
00660   initial_WriterDataLifecycleQosPolicy_.autodispose_unregistered_instances = true;
00661 
00662   initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
00663   initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;
00664   initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
00665   initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;
00666 
00667   initial_DomainParticipantQos_.user_data = initial_UserDataQosPolicy_;
00668   initial_DomainParticipantQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00669   initial_DomainParticipantFactoryQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00670 
00671   initial_TopicQos_.topic_data = initial_TopicDataQosPolicy_;
00672   initial_TopicQos_.durability = initial_DurabilityQosPolicy_;
00673   initial_TopicQos_.durability_service = initial_DurabilityServiceQosPolicy_;
00674   initial_TopicQos_.deadline = initial_DeadlineQosPolicy_;
00675   initial_TopicQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00676   initial_TopicQos_.liveliness = initial_LivelinessQosPolicy_;
00677   initial_TopicQos_.reliability = initial_ReliabilityQosPolicy_;
00678   initial_TopicQos_.destination_order = initial_DestinationOrderQosPolicy_;
00679   initial_TopicQos_.history = initial_HistoryQosPolicy_;
00680   initial_TopicQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00681   initial_TopicQos_.transport_priority = initial_TransportPriorityQosPolicy_;
00682   initial_TopicQos_.lifespan = initial_LifespanQosPolicy_;
00683   initial_TopicQos_.ownership = initial_OwnershipQosPolicy_;
00684 
00685   initial_DataWriterQos_.durability = initial_DurabilityQosPolicy_;
00686   initial_DataWriterQos_.durability_service = initial_DurabilityServiceQosPolicy_;
00687   initial_DataWriterQos_.deadline = initial_DeadlineQosPolicy_;
00688   initial_DataWriterQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00689   initial_DataWriterQos_.liveliness = initial_LivelinessQosPolicy_;
00690   initial_DataWriterQos_.reliability = initial_ReliabilityQosPolicy_;
00691   initial_DataWriterQos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00692   initial_DataWriterQos_.reliability.max_blocking_time.sec = 0;
00693   initial_DataWriterQos_.reliability.max_blocking_time.nanosec = 100000000;
00694   initial_DataWriterQos_.destination_order = initial_DestinationOrderQosPolicy_;
00695   initial_DataWriterQos_.history = initial_HistoryQosPolicy_;
00696   initial_DataWriterQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00697   initial_DataWriterQos_.transport_priority = initial_TransportPriorityQosPolicy_;
00698   initial_DataWriterQos_.lifespan = initial_LifespanQosPolicy_;
00699   initial_DataWriterQos_.user_data = initial_UserDataQosPolicy_;
00700   initial_DataWriterQos_.ownership = initial_OwnershipQosPolicy_;
00701 #ifdef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00702   initial_DataWriterQos_.ownership_strength.value = 0;
00703 #else
00704   initial_DataWriterQos_.ownership_strength = initial_OwnershipStrengthQosPolicy_;
00705 #endif
00706   initial_DataWriterQos_.writer_data_lifecycle = initial_WriterDataLifecycleQosPolicy_;
00707 
00708   initial_PublisherQos_.presentation = initial_PresentationQosPolicy_;
00709   initial_PublisherQos_.partition = initial_PartitionQosPolicy_;
00710   initial_PublisherQos_.group_data = initial_GroupDataQosPolicy_;
00711   initial_PublisherQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00712 
00713   initial_DataReaderQos_.durability = initial_DurabilityQosPolicy_;
00714   initial_DataReaderQos_.deadline = initial_DeadlineQosPolicy_;
00715   initial_DataReaderQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00716   initial_DataReaderQos_.liveliness = initial_LivelinessQosPolicy_;
00717   initial_DataReaderQos_.reliability = initial_ReliabilityQosPolicy_;
00718   initial_DataReaderQos_.destination_order = initial_DestinationOrderQosPolicy_;
00719   initial_DataReaderQos_.history = initial_HistoryQosPolicy_;
00720   initial_DataReaderQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00721   initial_DataReaderQos_.user_data = initial_UserDataQosPolicy_;
00722   initial_DataReaderQos_.time_based_filter = initial_TimeBasedFilterQosPolicy_;
00723   initial_DataReaderQos_.ownership = initial_OwnershipQosPolicy_;
00724   initial_DataReaderQos_.reader_data_lifecycle = initial_ReaderDataLifecycleQosPolicy_;
00725 
00726   initial_SubscriberQos_.presentation = initial_PresentationQosPolicy_;
00727   initial_SubscriberQos_.partition = initial_PartitionQosPolicy_;
00728   initial_SubscriberQos_.group_data = initial_GroupDataQosPolicy_;
00729   initial_SubscriberQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00730 }
00731 
00732 void
00733 Service_Participant::initializeScheduling()
00734 {
00735   //
00736   // Establish the scheduler if specified.
00737   //
00738   if (this->schedulerString_.length() == 0) {
00739     if (DCPS_debug_level > 0) {
00740       ACE_DEBUG((LM_NOTICE,
00741                  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::intializeScheduling() - ")
00742                  ACE_TEXT("no scheduling policy specified, not setting policy.\n")));
00743     }
00744 
00745   } else {
00746     //
00747     // Translate the scheduling policy to a usable value.
00748     //
00749     int ace_scheduler = ACE_SCHED_OTHER;
00750     this->scheduler_  = THR_SCHED_DEFAULT;
00751 
00752     if (this->schedulerString_ == ACE_TEXT("SCHED_RR")) {
00753       this->scheduler_ = THR_SCHED_RR;
00754       ace_scheduler    = ACE_SCHED_RR;
00755 
00756     } else if (this->schedulerString_ == ACE_TEXT("SCHED_FIFO")) {
00757       this->scheduler_ = THR_SCHED_FIFO;
00758       ace_scheduler    = ACE_SCHED_FIFO;
00759 
00760     } else if (this->schedulerString_ == ACE_TEXT("SCHED_OTHER")) {
00761       this->scheduler_ = THR_SCHED_DEFAULT;
00762       ace_scheduler    = ACE_SCHED_OTHER;
00763 
00764     } else {
00765       ACE_DEBUG((LM_WARNING,
00766                  ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00767                  ACE_TEXT("unrecognized scheduling policy: %s, set to SCHED_OTHER.\n"),
00768                  this->schedulerString_.c_str()));
00769     }
00770 
00771     //
00772     // Attempt to set the scheduling policy.
00773     //
00774 #ifdef ACE_WIN32
00775     ACE_DEBUG((LM_NOTICE,
00776                ACE_TEXT("(%P|%t) NOTICE: Service_Participant::initializeScheduling() - ")
00777                ACE_TEXT("scheduling is not implemented on Win32.\n")));
00778     ACE_UNUSED_ARG(ace_scheduler);
00779 #else
00780     ACE_Sched_Params params(
00781       ace_scheduler,
00782       ACE_Sched_Params::priority_min(ace_scheduler),
00783       ACE_SCOPE_THREAD,
00784       this->schedulerQuantum_);
00785 
00786     if (ACE_OS::sched_params(params) != 0) {
00787       if (ACE_OS::last_error() == EPERM) {
00788         ACE_DEBUG((LM_WARNING,
00789                    ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00790                    ACE_TEXT("user is not superuser, requested scheduler not set.\n")));
00791 
00792       } else {
00793         ACE_ERROR((LM_ERROR,
00794                    ACE_TEXT("(%P|%t) ERROR: Service_Participant::initializeScheduling() - ")
00795                    ACE_TEXT("sched_params failed: %m.\n")));
00796       }
00797 
00798       // Reset the scheduler value(s) if we did not succeed.
00799       this->scheduler_ = -1;
00800       ace_scheduler    = ACE_SCHED_OTHER;
00801 
00802     } else if (DCPS_debug_level > 0) {
00803       ACE_DEBUG((LM_DEBUG,
00804                  ACE_TEXT("(%P|%t) Service_Participant::initializeScheduling() - ")
00805                  ACE_TEXT("scheduling policy set to %s(%d).\n"),
00806                  this->schedulerString_.c_str()));
00807     }
00808 
00809     //
00810     // Setup some scheduler specific information for later use.
00811     //
00812     this->priority_min_ = ACE_Sched_Params::priority_min(ace_scheduler, ACE_SCOPE_THREAD);
00813     this->priority_max_ = ACE_Sched_Params::priority_max(ace_scheduler, ACE_SCOPE_THREAD);
00814 #endif // ACE_WIN32
00815   }
00816 }
00817 
00818 #ifdef DDS_HAS_WCHAR
00819 bool
00820 Service_Participant::set_repo_ior(const wchar_t* ior,
00821                                   Discovery::RepoKey key,
00822                                   bool attach_participant)
00823 {
00824   return set_repo_ior(ACE_Wide_To_Ascii(ior).char_rep(), key, attach_participant);
00825 }
00826 #endif
00827 
00828 bool
00829 Service_Participant::set_repo_ior(const char* ior,
00830                                   Discovery::RepoKey key,
00831                                   bool attach_participant)
00832 {
00833   if (DCPS_debug_level > 0) {
00834     ACE_DEBUG((LM_DEBUG,
00835                ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior: Repo[ %C] == %C\n"),
00836                key.c_str(), ior));
00837   }
00838 
00839   // This is a global used for the bizzare commandline/configfile
00840   // processing done for this class.
00841   got_info = true;
00842 
00843   if (key == "-1") {
00844     key = Discovery::DEFAULT_REPO;
00845   }
00846 
00847   const OPENDDS_STRING repo_type = ACE_TEXT_ALWAYS_CHAR(REPO_SECTION_NAME);
00848   if (!discovery_types_.count(repo_type)) {
00849     // Re-use a transport registry function to attempt a dynamic load of the
00850     // library that implements the 'repo_type' (InfoRepoDiscovery)
00851     TheTransportRegistry->load_transport_lib(repo_type);
00852   }
00853 
00854   if (discovery_types_.count(repo_type)) {
00855     ACE_Configuration_Heap cf;
00856     cf.open();
00857     ACE_Configuration_Section_Key sect_key;
00858     ACE_TString section = REPO_SECTION_NAME;
00859     section += ACE_TEXT('\\');
00860     section += ACE_TEXT_CHAR_TO_TCHAR(key.c_str());
00861     cf.open_section(cf.root_section(), section.c_str(), 1 /*create*/, sect_key);
00862     cf.set_string_value(sect_key, ACE_TEXT("RepositoryIor"),
00863                         ACE_TEXT_CHAR_TO_TCHAR(ior));
00864 
00865     discovery_types_[repo_type]->discovery_config(cf);
00866 
00867     this->remap_domains(key, key, attach_participant);
00868     return true;
00869   }
00870 
00871   ACE_ERROR_RETURN((LM_ERROR,
00872                     ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior ")
00873                     ACE_TEXT("ERROR - no discovery type registered for ")
00874                     ACE_TEXT("InfoRepoDiscovery\n")),
00875                    false);
00876 }
00877 
00878 void
00879 Service_Participant::remap_domains(Discovery::RepoKey oldKey,
00880                                    Discovery::RepoKey newKey,
00881                                    bool attach_participant)
00882 {
00883   // Search the mappings for any domains mapped to this repository.
00884   OPENDDS_VECTOR(DDS::DomainId_t) domainList;
00885   {
00886     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00887 
00888     for (DomainRepoMap::const_iterator current = this->domainRepoMap_.begin();
00889          current != this->domainRepoMap_.end();
00890          ++current) {
00891       if (current->second == oldKey) {
00892         domainList.push_back(current->first);
00893       }
00894     }
00895   }
00896 
00897   // Remap the domains that were attached to this repository.
00898   for (unsigned int index = 0; index < domainList.size(); ++index) {
00899     // For mapped domains, attach their participants by setting the
00900     // mapping again.
00901     this->set_repo_domain(domainList[ index], newKey, attach_participant);
00902   }
00903 }
00904 
00905 void
00906 Service_Participant::set_repo_domain(const DDS::DomainId_t domain,
00907                                      Discovery::RepoKey key,
00908                                      bool attach_participant)
00909 {
00910   typedef std::pair<Discovery_rch, RepoId> DiscRepoPair;
00911   OPENDDS_VECTOR(DiscRepoPair) repoList;
00912   {
00913     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00914     DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
00915 
00916     if (key == "-1") {
00917       key = Discovery::DEFAULT_REPO;
00918     }
00919 
00920     if ((where == this->domainRepoMap_.end()) || (where->second != key)) {
00921       // Only assign entries into the map when they change the
00922       // contents.
00923       this->domainRepoMap_[ domain] = key;
00924 
00925       if (DCPS_debug_level > 0) {
00926         ACE_DEBUG((LM_DEBUG,
00927                    ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00928                    ACE_TEXT("Domain[ %d] = Repo[ %C].\n"),
00929                    domain, key.c_str()));
00930       }
00931     }
00932 
00933     //
00934     // Make sure that we mark each DomainParticipant for this domain
00935     // using this repository as attached to this repository.
00936     //
00937     // @TODO: Move this note into user documentation.
00938     // N.B. Calling set_repo() or set_repo_ior() will result in this
00939     //      code executing again with the new repository.  It is best
00940     //      to call those routines first when making changes.
00941     //
00942 
00943     // No servant means no participant.  No worries.
00944     if (this->dp_factory_servant_) {
00945       // Map of domains to sets of participants.
00946       const DomainParticipantFactoryImpl::DPMap& participants
00947       = this->dp_factory_servant_->participants();
00948 
00949       // Extract the set of participants for the current domain.
00950       DomainParticipantFactoryImpl::DPMap::const_iterator
00951       which  = participants.find(domain);
00952 
00953       if (which != participants.end()) {
00954         // Extract the repository to attach this domain to.
00955         RepoKeyDiscoveryMap::const_iterator disc_iter = this->discoveryMap_.find(key);
00956 
00957         if (disc_iter != this->discoveryMap_.end()) {
00958           for (DomainParticipantFactoryImpl::DPSet::const_iterator
00959                current  = which->second.begin();
00960                current != which->second.end();
00961                ++current) {
00962             try {
00963               // Attach each DomainParticipant in this domain to this
00964               // repository.
00965               RepoId id = (*current)->get_id();
00966               repoList.push_back(std::make_pair(disc_iter->second, id));
00967 
00968               if (DCPS_debug_level > 0) {
00969                 GuidConverter converter(id);
00970                 ACE_DEBUG((LM_DEBUG,
00971                            ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00972                            ACE_TEXT("participant %C attached to Repo[ %C].\n"),
00973                            OPENDDS_STRING(converter).c_str(),
00974                            key.c_str()));
00975               }
00976 
00977             } catch (const CORBA::Exception& ex) {
00978               ex._tao_print_exception(
00979                 "ERROR: Service_Participant::set_repo_domain: failed to attach repository - ");
00980               return;
00981             }
00982           }
00983         }
00984       }
00985     }
00986   } // End of GUARD scope.
00987 
00988   // Make all of the remote calls after releasing the lock.
00989   for (unsigned int index = 0; index < repoList.size(); ++index) {
00990     if (DCPS_debug_level > 0) {
00991       GuidConverter converter(repoList[ index].second);
00992       ACE_DEBUG((LM_DEBUG,
00993                  ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00994                  ACE_TEXT("(%d of %d) attaching domain %d participant %C to Repo[ %C].\n"),
00995                  (1+index), repoList.size(), domain,
00996                  OPENDDS_STRING(converter).c_str(),
00997                  key.c_str()));
00998     }
00999 
01000     if (attach_participant)
01001     {
01002       repoList[ index].first->attach_participant(domain, repoList[ index].second);
01003     }
01004   }
01005 }
01006 
01007 void
01008 Service_Participant::repository_lost(Discovery::RepoKey key)
01009 {
01010   // Find the lost repository.
01011   RepoKeyDiscoveryMap::iterator initialLocation = this->discoveryMap_.find(key);
01012   RepoKeyDiscoveryMap::iterator current         = initialLocation;
01013 
01014   if (current == this->discoveryMap_.end()) {
01015     ACE_DEBUG((LM_WARNING,
01016                ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
01017                ACE_TEXT("lost repository %C was not present, ")
01018                ACE_TEXT("finding another anyway.\n"),
01019                key.c_str()));
01020 
01021   } else {
01022     // Start with the repository *after* the lost one.
01023     ++current;
01024   }
01025 
01026   // Calculate the bounding end time for attempts.
01027   ACE_Time_Value recoveryFailedTime
01028   = ACE_OS::gettimeofday()
01029     + ACE_Time_Value(this->federation_recovery_duration(), 0);
01030 
01031   // Backoff delay.
01032   int backoff = this->federation_initial_backoff_seconds();
01033 
01034   // Keep trying until the total recovery time specified is exceeded.
01035   while (recoveryFailedTime > ACE_OS::gettimeofday()) {
01036 
01037     // Wrap to the beginning at the end of the list.
01038     if (current == this->discoveryMap_.end()) {
01039       // Continue to traverse the list.
01040       current = this->discoveryMap_.begin();
01041     }
01042 
01043     // Handle reaching the lost repository by waiting before trying
01044     // again.
01045     if (current == initialLocation) {
01046       if (DCPS_debug_level > 0) {
01047         ACE_DEBUG((LM_DEBUG,
01048                    ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01049                    ACE_TEXT("waiting %d seconds to traverse the ")
01050                    ACE_TEXT("repository list another time ")
01051                    ACE_TEXT("for lost key %C.\n"),
01052                    backoff,
01053                    key.c_str()));
01054       }
01055 
01056       // Wait to traverse the list and try again.
01057       ACE_OS::sleep(backoff);
01058 
01059       // Exponentially backoff delay.
01060       backoff *= this->federation_backoff_multiplier();
01061 
01062       // Don't increment current to allow us to reattach to the
01063       // original repository if it is restarted.
01064     }
01065 
01066     // Check the availability of the current repository.
01067     if (current->second->active()) {
01068 
01069       if (DCPS_debug_level > 0) {
01070         ACE_DEBUG((LM_DEBUG,
01071                    ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01072                    ACE_TEXT("replacing repository %C with %C.\n"),
01073                    key.c_str(),
01074                    current->first.c_str()));
01075       }
01076 
01077       // If we reach here, the validate_connection() call succeeded
01078       // and the repository is reachable.
01079       this->remap_domains(key, current->first);
01080 
01081       // Now we are done.  This is the only non-failure exit from
01082       // this method.
01083       return;
01084 
01085     } else {
01086       ACE_DEBUG((LM_WARNING,
01087                  ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
01088                  ACE_TEXT("repository %C was not available to replace %C, ")
01089                  ACE_TEXT("looking for another.\n"),
01090                  current->first.c_str(),
01091                  key.c_str()));
01092     }
01093 
01094     // Move to the next candidate repository.
01095     ++current;
01096   }
01097 
01098   // If we reach here, we have exceeded the total recovery time
01099   // specified.
01100   ACE_ASSERT(recoveryFailedTime == ACE_Time_Value::zero);
01101 }
01102 
01103 void
01104 Service_Participant::set_default_discovery(const Discovery::RepoKey& key)
01105 {
01106   this->defaultDiscovery_ = key;
01107 }
01108 
01109 Discovery::RepoKey
01110 Service_Participant::get_default_discovery()
01111 {
01112   return this->defaultDiscovery_;
01113 }
01114 
01115 Discovery_rch
01116 Service_Participant::get_discovery(const DDS::DomainId_t domain)
01117 {
01118   // Default to the Default InfoRepo-based discovery unless the user has
01119   // changed defaultDiscovery_ using the API or config file
01120   Discovery::RepoKey repo = defaultDiscovery_;
01121 
01122   // Find if this domain has a repo key (really a discovery key)
01123   // mapped to it.
01124   DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
01125   if (where != this->domainRepoMap_.end()) {
01126     repo = where->second;
01127   }
01128 
01129   RepoKeyDiscoveryMap::const_iterator location = this->discoveryMap_.find(repo);
01130 
01131   if (location == this->discoveryMap_.end()) {
01132     if ((repo == Discovery::DEFAULT_REPO) ||
01133         (repo == "-1")) {
01134       // Set the default repository IOR if it hasn't already happened
01135       // by this point.  This is why this can't be const.
01136       bool ok = this->set_repo_ior(DEFAULT_REPO_IOR, Discovery::DEFAULT_REPO);
01137 
01138       if (!ok) {
01139         if (DCPS_debug_level > 0) {
01140           ACE_DEBUG((LM_DEBUG,
01141                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01142                      ACE_TEXT("failed attempt to set default IOR for domain %d.\n"),
01143                      domain));
01144         }
01145 
01146       } else {
01147         // Found the default!
01148         if (DCPS_debug_level > 4) {
01149           ACE_DEBUG((LM_DEBUG,
01150                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01151                      ACE_TEXT("returning default repository for domain %d.\n"),
01152                      domain));
01153         }
01154 
01155       }
01156       return this->discoveryMap_[Discovery::DEFAULT_REPO];
01157 
01158     } else if (repo == Discovery::DEFAULT_RTPS) {
01159 
01160       ACE_Configuration_Heap cf;
01161       cf.open();
01162       ACE_Configuration_Section_Key k;
01163       cf.open_section(cf.root_section(), RTPS_SECTION_NAME, 1 /*create*/, k);
01164       this->load_discovery_configuration(cf, RTPS_SECTION_NAME);
01165 
01166       // Try to find it again
01167       location = this->discoveryMap_.find(Discovery::DEFAULT_RTPS);
01168 
01169       if (location == this->discoveryMap_.end()) {
01170         // Unable to load DEFAULT_RTPS
01171         if (DCPS_debug_level > 0) {
01172           ACE_DEBUG((LM_DEBUG,
01173                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01174                      ACE_TEXT("failed attempt to set default RTPS discovery for domain %d.\n"),
01175                      domain));
01176         }
01177 
01178         return Discovery_rch();
01179 
01180       } else {
01181         // Found the default!
01182         if (DCPS_debug_level > 4) {
01183           ACE_DEBUG((LM_DEBUG,
01184                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01185                      ACE_TEXT("returning default RTPS discovery for domain %d.\n"),
01186                      domain));
01187         }
01188 
01189         return location->second;
01190       }
01191 
01192     } else {
01193       // Non-default repositories _must_ be loaded by application.
01194       if (DCPS_debug_level > 4) {
01195         ACE_DEBUG((LM_DEBUG,
01196                    ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01197                    ACE_TEXT("repository for domain %d was not set.\n"),
01198                    domain));
01199       }
01200 
01201       return Discovery_rch();
01202     }
01203   }
01204 
01205   if (DCPS_debug_level > 4) {
01206     ACE_DEBUG((LM_DEBUG,
01207                ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01208                ACE_TEXT("returning repository for domain %d, repo %C.\n"),
01209                domain, repo.c_str()));
01210   }
01211 
01212   return location->second;
01213 }
01214 
01215 OPENDDS_STRING
01216 Service_Participant::bit_transport_ip() const
01217 {
01218   return ACE_TEXT_ALWAYS_CHAR(this->bit_transport_ip_.c_str());
01219 }
01220 
01221 int
01222 Service_Participant::bit_transport_port() const
01223 {
01224   return this->bit_transport_port_;
01225 }
01226 
01227 void
01228 Service_Participant::bit_transport_port(int port)
01229 {
01230   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01231   this->bit_transport_port_ = port;
01232   got_bit_transport_port = true;
01233 }
01234 
01235 int
01236 Service_Participant::bit_lookup_duration_msec() const
01237 {
01238   return bit_lookup_duration_msec_;
01239 }
01240 
01241 void
01242 Service_Participant::bit_lookup_duration_msec(int sec)
01243 {
01244   bit_lookup_duration_msec_ = sec;
01245   got_bit_lookup_duration_msec = true;
01246 }
01247 
01248 size_t
01249 Service_Participant::n_chunks() const
01250 {
01251   return n_chunks_;
01252 }
01253 
01254 void
01255 Service_Participant::n_chunks(size_t chunks)
01256 {
01257   n_chunks_ = chunks;
01258   got_chunks = true;
01259 }
01260 
01261 size_t
01262 Service_Participant::association_chunk_multiplier() const
01263 {
01264   return association_chunk_multiplier_;
01265 }
01266 
01267 void
01268 Service_Participant::association_chunk_multiplier(size_t multiplier)
01269 {
01270   association_chunk_multiplier_ = multiplier;
01271   got_chunk_association_multiplier = true;
01272 }
01273 
01274 void
01275 Service_Participant::liveliness_factor(int factor)
01276 {
01277   liveliness_factor_ = factor;
01278   got_liveliness_factor = true;
01279 }
01280 
01281 int
01282 Service_Participant::liveliness_factor() const
01283 {
01284   return liveliness_factor_;
01285 }
01286 
01287 void
01288 Service_Participant::register_discovery_type(const char* section_name,
01289                                              Discovery::Config* cfg)
01290 {
01291   discovery_types_[section_name].reset(cfg);
01292 }
01293 
01294 int
01295 Service_Participant::load_configuration()
01296 {
01297   int status = 0;
01298 
01299   if ((status = this->cf_.open()) != 0)
01300     ACE_ERROR_RETURN((LM_ERROR,
01301                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01302                       ACE_TEXT("open() returned %d\n"),
01303                       status),
01304                      -1);
01305 
01306   ACE_Ini_ImpExp import(this->cf_);
01307   status = import.import_config(config_fname.c_str());
01308 
01309   if (status != 0) {
01310     ACE_ERROR_RETURN((LM_ERROR,
01311                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01312                       ACE_TEXT("import_config () returned %d\n"),
01313                       status),
01314                      -1);
01315   } else {
01316     status = this->load_configuration(this->cf_, config_fname.c_str());
01317   }
01318   return status;
01319 }
01320 
01321 int
01322 Service_Participant::load_configuration(
01323   ACE_Configuration_Heap& config,
01324   const ACE_TCHAR* filename)
01325 {
01326   int status = 0;
01327 
01328   status = this->load_common_configuration(config, filename);
01329 
01330   if (status != 0) {
01331     ACE_ERROR_RETURN((LM_ERROR,
01332                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01333                       ACE_TEXT("load_common_configuration () returned %d\n"),
01334                       status),
01335                      -1);
01336   }
01337 
01338   // Register static discovery.
01339   this->add_discovery(static_rchandle_cast<Discovery>(StaticDiscovery::instance()));
01340 
01341   status = this->load_discovery_configuration(config, RTPS_SECTION_NAME);
01342 
01343   if (status != 0) {
01344     ACE_ERROR_RETURN((LM_ERROR,
01345                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01346                       ACE_TEXT("load_discovery_configuration() returned %d\n"),
01347                       status),
01348                      -1);
01349   }
01350 
01351   status = this->load_discovery_configuration(config, REPO_SECTION_NAME);
01352 
01353   if (status != 0) {
01354     ACE_ERROR_RETURN((LM_ERROR,
01355                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01356                       ACE_TEXT("load_discovery_configuration() returned %d\n"),
01357                       status),
01358                      -1);
01359   }
01360 
01361   status = TransportRegistry::instance()->load_transport_configuration(
01362              ACE_TEXT_ALWAYS_CHAR(filename), config);
01363   if (this->global_transport_config_ != ACE_TEXT("")) {
01364     TransportConfig_rch config = TransportRegistry::instance()->get_config(
01365       ACE_TEXT_ALWAYS_CHAR(this->global_transport_config_.c_str()));
01366     if (!config) {
01367       ACE_ERROR_RETURN((LM_ERROR,
01368                         ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01369                         ACE_TEXT("Unable to locate specified global transport config: %s\n"),
01370                         this->global_transport_config_.c_str()),
01371                        -1);
01372     }
01373     TransportRegistry::instance()->global_config(config);
01374   }
01375 
01376   if (status != 0) {
01377     ACE_ERROR_RETURN((LM_ERROR,
01378                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01379                       ACE_TEXT("load_transport_configuration () returned %d\n"),
01380                       status),
01381                      -1);
01382   }
01383 
01384   // Needs to be loaded after the [rtps_discovery/*] and [repository/*]
01385   // sections to allow error reporting on bad discovery config names.
01386   // Also loaded after the transport configuration so that
01387   // DefaultTransportConfig within [domain/*] can use TransportConfig objects.
01388   status = this->load_domain_configuration(config, filename);
01389 
01390   if (status != 0) {
01391     ACE_ERROR_RETURN((LM_ERROR,
01392                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01393                       ACE_TEXT("load_domain_configuration () returned %d\n"),
01394                       status),
01395                      -1);
01396   }
01397 
01398   // Needs to be loaded after transport configs and instances and domains.
01399   try {
01400     status = StaticDiscovery::instance()->load_configuration(config);
01401 
01402     if (status != 0) {
01403       ACE_ERROR_RETURN((LM_ERROR,
01404         ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01405         ACE_TEXT("load_discovery_configuration() returned %d\n"),
01406         status),
01407         -1);
01408     }
01409   } catch (const CORBA::BAD_PARAM& ex) {
01410     ex._tao_print_exception("Exception caught in Service_Participant::load_configuration: "
01411       "trying to load_discovery_configuration()");
01412     return -1;
01413   }
01414 
01415   return 0;
01416 }
01417 
01418 int
01419 Service_Participant::load_common_configuration(ACE_Configuration_Heap& cf,
01420                                                const ACE_TCHAR* filename)
01421 {
01422   const ACE_Configuration_Section_Key &root = cf.root_section();
01423   ACE_Configuration_Section_Key sect;
01424 
01425   if (cf.open_section(root, COMMON_SECTION_NAME, 0, sect) != 0) {
01426     if (DCPS_debug_level > 0) {
01427       // This is not an error if the configuration file does not have
01428       // a common section. The code default configuration will be used.
01429       ACE_DEBUG((LM_NOTICE,
01430                  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_common_configuration ")
01431                  ACE_TEXT("failed to open section %s\n"),
01432                  COMMON_SECTION_NAME));
01433     }
01434 
01435     return 0;
01436 
01437   } else {
01438     if (got_debug_level) {
01439       ACE_DEBUG((LM_NOTICE,
01440                  ACE_TEXT("(%P|%t) NOTICE: using DCPSDebugLevel value from command option (overrides value if it's in config file).\n")));
01441     } else {
01442       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSDebugLevel"), DCPS_debug_level, int)
01443     }
01444 
01445     if (got_info) {
01446       ACE_DEBUG((LM_NOTICE,
01447                  ACE_TEXT("(%P|%t) NOTICE: using DCPSInfoRepo value from command option (overrides value if it's in config file).\n")));
01448     } else {
01449       ACE_TString value;
01450       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSInfoRepo"), value)
01451       if (!value.empty()) {
01452         this->set_repo_ior(value.c_str(), Discovery::DEFAULT_REPO);
01453       }
01454     }
01455 
01456     if (got_use_rti_serialization) {
01457       ACE_DEBUG((LM_NOTICE,
01458                  ACE_TEXT("(%P|%t) NOTICE: using DCPSRTISerialization value from command option (overrides value if it's in config file).\n")));
01459     } else {
01460       bool should_use = false;
01461       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSRTISerialization"), should_use, bool)
01462       Serializer::set_use_rti_serialization(should_use);
01463     }
01464 
01465     if (got_chunks) {
01466       ACE_DEBUG((LM_NOTICE,
01467                  ACE_TEXT("(%P|%t) NOTICE: using DCPSChunks value from command option (overrides value if it's in config file).\n")));
01468     } else {
01469       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunks"), this->n_chunks_, size_t)
01470     }
01471 
01472     if (got_chunk_association_multiplier) {
01473       ACE_DEBUG((LM_NOTICE,
01474                  ACE_TEXT("(%P|%t) NOTICE: using DCPSChunkAssociationMutltiplier value from command option (overrides value if it's in config file).\n")));
01475     } else {
01476       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunkAssociationMutltiplier"), this->association_chunk_multiplier_, size_t)
01477     }
01478 
01479     if (got_bit_transport_port) {
01480       ACE_DEBUG((LM_NOTICE,
01481                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportPort value from command option (overrides value if it's in config file).\n")));
01482     } else {
01483       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportPort"), this->bit_transport_port_, int)
01484     }
01485 
01486     if (got_bit_transport_ip) {
01487       ACE_DEBUG((LM_DEBUG,
01488                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportIPAddress value from command option (overrides value if it's in config file).\n")));
01489     } else {
01490       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportIPAddress"), this->bit_transport_ip_)
01491     }
01492 
01493     if (got_liveliness_factor) {
01494       ACE_DEBUG((LM_DEBUG,
01495                  ACE_TEXT("(%P|%t) NOTICE: using DCPSLivelinessFactor value from command option (overrides value if it's in config file).\n")));
01496     } else {
01497       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSLivelinessFactor"), this->liveliness_factor_, int)
01498     }
01499 
01500     if (got_bit_lookup_duration_msec) {
01501       ACE_DEBUG((LM_DEBUG,
01502                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBitLookupDurationMsec value from command option (overrides value if it's in config file).\n")));
01503     } else {
01504       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitLookupDurationMsec"), this->bit_lookup_duration_msec_, int)
01505     }
01506 
01507     if (got_global_transport_config) {
01508       ACE_DEBUG((LM_DEBUG,
01509                  ACE_TEXT("(%P|%t) NOTICE: using DCPSGlobalTransportConfig value from command option (overrides value if it's in config file).\n")));
01510     } else {
01511       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSGlobalTransportConfig"), this->global_transport_config_);
01512       if (this->global_transport_config_ == ACE_TEXT("$file")) {
01513         // When the special string of "$file" is used, substitute the file name
01514         this->global_transport_config_ = filename;
01515       }
01516     }
01517 
01518     if (got_bit_flag) {
01519       ACE_DEBUG((LM_DEBUG,
01520                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBit value from command option (overrides value if it's in config file).\n")));
01521     } else {
01522       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBit"), this->bit_enabled_, int)
01523     }
01524 
01525 #if defined(OPENDDS_SECURITY)
01526     if (got_security_flag) {
01527       ACE_DEBUG((LM_DEBUG,
01528                  ACE_TEXT("(%P|%t) NOTICE: using DCPSSecurity value from command option (overrides value if it's in config file).\n")));
01529     } else {
01530       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSSecurity"), this->security_enabled_, int)
01531     }
01532 #endif
01533 
01534     if (got_transport_debug_level) {
01535       ACE_DEBUG((LM_NOTICE,
01536                  ACE_TEXT("(%P|%t) NOTICE: using DCPSTransportDebugLevel value from command option (overrides value if it's in config file).\n")));
01537     } else {
01538       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSTransportDebugLevel"), OpenDDS::DCPS::Transport_debug_level, int)
01539     }
01540 
01541 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01542     if (got_persistent_data_dir) {
01543       ACE_DEBUG((LM_NOTICE,
01544                  ACE_TEXT("(%P|%t) NOTICE: using DCPSPersistentDataDir value from command option (overrides value if it's in config file).\n")));
01545     } else {
01546       ACE_TString value;
01547       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSPersistentDataDir"), value)
01548       this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(value.c_str());
01549     }
01550 #endif
01551 
01552     if (got_pending_timeout) {
01553       ACE_DEBUG((LM_NOTICE,
01554                  ACE_TEXT("(%P|%t) NOTICE: using DCPSPendingTimeout value from command option (overrides value if it's in config file).\n")));
01555     } else {
01556       int timeout = 0;
01557       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPendingTimeout"), timeout, int)
01558       this->pending_timeout_ = timeout;
01559     }
01560 
01561     if (got_publisher_content_filter) {
01562       ACE_DEBUG((LM_DEBUG,
01563                  ACE_TEXT("(%P|%t) NOTICE: using DCPSPublisherContentFilter ")
01564                  ACE_TEXT("value from command option (overrides value if it's ")
01565                  ACE_TEXT("in config file).\n")));
01566     } else {
01567       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPublisherContentFilter"),
01568         this->publisher_content_filter_, bool)
01569     }
01570 
01571     if (got_default_discovery) {
01572       ACE_Configuration::VALUETYPE type;
01573       if (cf.find_value(sect, ACE_TEXT("DCPSDefaultDiscovery"), type) != -1) {
01574         ACE_DEBUG((LM_NOTICE,
01575                    ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultDiscovery value ")
01576                    ACE_TEXT("from command option, overriding config file\n")));
01577       }
01578     } else {
01579       GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultDiscovery"),
01580         this->defaultDiscovery_);
01581     }
01582 
01583     if (got_bidir_giop) {
01584       ACE_Configuration::VALUETYPE type;
01585       if (cf.find_value(sect, ACE_TEXT("DCPSBidirGIOP"), type) != -1) {
01586         ACE_DEBUG((LM_NOTICE,
01587           ACE_TEXT("(%P|%t) NOTICE: using DCPSBidirGIOP value ")
01588           ACE_TEXT("from command option, overriding config file\n")));
01589       }
01590     } else {
01591       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBidirGIOP"), bidir_giop_, bool)
01592     }
01593 
01594     ACE_Configuration::VALUETYPE type;
01595     if (got_log_fname) {
01596       if (cf.find_value(sect, ACE_TEXT("ORBLogFile"), type) != -1) {
01597         ACE_DEBUG((LM_NOTICE,
01598                    ACE_TEXT("(%P|%t) NOTICE: using ORBLogFile value ")
01599                    ACE_TEXT("from command option, overriding config file\n")));
01600       }
01601     } else {
01602       OPENDDS_STRING log_fname;
01603       GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("ORBLogFile"), log_fname);
01604       if (!log_fname.empty()) {
01605         set_log_file_name(log_fname.c_str());
01606       }
01607     }
01608 
01609     if (got_log_verbose) {
01610       if (cf.find_value(sect, ACE_TEXT("ORBVerboseLogging"), type) != -1) {
01611         ACE_DEBUG((LM_NOTICE,
01612                    ACE_TEXT("(%P|%t) NOTICE: using ORBVerboseLogging value ")
01613                    ACE_TEXT("from command option, overriding config file\n")));
01614       }
01615     } else {
01616       unsigned long verbose_logging = 0;
01617       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("ORBVerboseLogging"), verbose_logging, unsigned long);
01618       set_log_verbose(verbose_logging);
01619     }
01620 
01621     if (got_default_address) {
01622       ACE_DEBUG((LM_DEBUG,
01623                  ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultAddress value from command option (overrides value if it's in config file).\n")));
01624     } else {
01625       GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultAddress"), this->default_address_)
01626     }
01627 
01628     if (got_monitor) {
01629       ACE_DEBUG((LM_DEBUG,
01630                  ACE_TEXT("(%P|%t) NOTICE: using DCPSMonitor value from command option (overrides value if it's in config file).\n")));
01631     } else {
01632       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSMonitor"), monitor_enabled_, bool)
01633     }
01634 
01635     // These are not handled on the command line.
01636     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationRecoveryDuration"), this->federation_recovery_duration_, int)
01637     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationInitialBackoffSeconds"), this->federation_initial_backoff_seconds_, int)
01638     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationBackoffMultiplier"), this->federation_backoff_multiplier_, int)
01639     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationLivelinessDuration"), this->federation_liveliness_, int)
01640 
01641 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
01642     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_size"), pool_size_, size_t)
01643     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_granularity"), pool_granularity_, size_t)
01644 #endif
01645 
01646     //
01647     // Establish the scheduler if specified.
01648     //
01649     GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("scheduler"), this->schedulerString_)
01650 
01651     suseconds_t usec(0);
01652 
01653     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("scheduler_slice"), usec, suseconds_t)
01654 
01655     if (usec > 0)
01656       this->schedulerQuantum_.usec(usec);
01657   }
01658 
01659   return 0;
01660 }
01661 
01662 int
01663 Service_Participant::load_domain_configuration(ACE_Configuration_Heap& cf,
01664                                                const ACE_TCHAR* filename)
01665 {
01666   const ACE_Configuration_Section_Key& root = cf.root_section();
01667   ACE_Configuration_Section_Key domain_sect;
01668 
01669   if (cf.open_section(root, DOMAIN_SECTION_NAME, 0, domain_sect) != 0) {
01670     if (DCPS_debug_level > 0) {
01671       // This is not an error if the configuration file does not have
01672       // any domain (sub)section. The code default configuration will be used.
01673       ACE_DEBUG((LM_NOTICE,
01674                  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_domain_configuration ")
01675                  ACE_TEXT("failed to open [%s] section - using code default.\n"),
01676                  DOMAIN_SECTION_NAME));
01677     }
01678 
01679     return 0;
01680 
01681   } else {
01682     // Ensure there are no properties in this section
01683     ValueMap vm;
01684     if (pullValues(cf, domain_sect, vm) > 0) {
01685       // There are values inside [domain]
01686       ACE_ERROR_RETURN((LM_ERROR,
01687                         ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01688                         ACE_TEXT("domain sections must have a subsection name\n")),
01689                        -1);
01690     }
01691     // Process the subsections of this section (the individual domains)
01692     KeyList keys;
01693     if (processSections(cf, domain_sect, keys) != 0) {
01694       ACE_ERROR_RETURN((LM_ERROR,
01695                         ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01696                         ACE_TEXT("too many nesting layers in the [domain] section.\n")),
01697                        -1);
01698     }
01699 
01700     // Loop through the [domain/*] sections
01701     for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01702       OPENDDS_STRING domain_name = it->first;
01703 
01704       ValueMap values;
01705       pullValues(cf, it->second, values);
01706       DDS::DomainId_t domainId = -1;
01707       Discovery::RepoKey repoKey;
01708       OPENDDS_STRING perDomainDefaultTportConfig;
01709       for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01710         OPENDDS_STRING name = it->first;
01711         if (name == "DomainId") {
01712           OPENDDS_STRING value = it->second;
01713           if (!convertToInteger(value, domainId)) {
01714             ACE_ERROR_RETURN((LM_ERROR,
01715                               ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01716                               ACE_TEXT("Illegal integer value for DomainId (%C) in [domain/%C] section.\n"),
01717                               value.c_str(), domain_name.c_str()),
01718                              -1);
01719           }
01720           if (DCPS_debug_level > 0) {
01721             ACE_DEBUG((LM_DEBUG,
01722                        ACE_TEXT("(%P|%t) [domain/%C]: DomainId == %d\n"),
01723                        domain_name.c_str(), domainId));
01724           }
01725         } else if (name == "DomainRepoKey") {
01726           // We will still process this for backward compatibility, but
01727           // it can now be replaced by "DiscoveryConfig=REPO:<key>"
01728           repoKey = it->second;
01729           if (repoKey == "-1") {
01730             repoKey = Discovery::DEFAULT_REPO;
01731           }
01732 
01733           if (DCPS_debug_level > 0) {
01734             ACE_DEBUG((LM_DEBUG,
01735                        ACE_TEXT("(%P|%t) [domain/%C]: DomainRepoKey == %C\n"),
01736                        domain_name.c_str(), repoKey.c_str()));
01737           }
01738         } else if (name == "DiscoveryConfig") {
01739           repoKey = it->second;
01740 
01741         } else if (name == "DefaultTransportConfig") {
01742           if (it->second == "$file") {
01743             // When the special string of "$file" is used, substitute the file name
01744             perDomainDefaultTportConfig = ACE_TEXT_ALWAYS_CHAR(filename);
01745 
01746           } else {
01747             perDomainDefaultTportConfig = it->second;
01748           }
01749 
01750         } else {
01751           ACE_ERROR_RETURN((LM_ERROR,
01752                             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01753                             ACE_TEXT("Unexpected entry (%C) in [domain/%C] section.\n"),
01754                             name.c_str(), domain_name.c_str()),
01755                            -1);
01756         }
01757       }
01758 
01759       if (domainId == -1) {
01760         // DomainId parameter is not set, try using the domain name as an ID
01761         if (!convertToInteger(domain_name, domainId)) {
01762           ACE_ERROR_RETURN((LM_ERROR,
01763                             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01764                             ACE_TEXT("Missing DomainId value in [domain/%C] section.\n"),
01765                             domain_name.c_str()),
01766                            -1);
01767         }
01768       }
01769 
01770       if (!perDomainDefaultTportConfig.empty()) {
01771         TransportRegistry* const reg = TransportRegistry::instance();
01772         TransportConfig_rch tc = reg->get_config(perDomainDefaultTportConfig);
01773         if (tc.is_nil()) {
01774           ACE_ERROR_RETURN((LM_ERROR,
01775             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01776             ACE_TEXT("Unknown transport config %C in [domain/%C] section.\n"),
01777             perDomainDefaultTportConfig.c_str(), domain_name.c_str()), -1);
01778         } else {
01779           reg->domain_default_config(domainId, tc);
01780         }
01781       }
01782 
01783       // Check to see if the specified discovery configuration has been defined
01784       if (!repoKey.empty()) {
01785         if ((repoKey != Discovery::DEFAULT_REPO) &&
01786             (repoKey != Discovery::DEFAULT_RTPS) &&
01787             (repoKey != Discovery::DEFAULT_STATIC) &&
01788             (this->discoveryMap_.find(repoKey) == this->discoveryMap_.end())) {
01789           ACE_ERROR_RETURN((LM_ERROR,
01790                             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01791                             ACE_TEXT("Specified configuration (%C) not found.  Referenced in [domain/%C] section.\n"),
01792                             repoKey.c_str(), domain_name.c_str()),
01793                            -1);
01794         }
01795         this->set_repo_domain(domainId, repoKey);
01796       }
01797     }
01798   }
01799 
01800   return 0;
01801 }
01802 
01803 int
01804 Service_Participant::load_discovery_configuration(ACE_Configuration_Heap& cf,
01805                                                   const ACE_TCHAR* section_name)
01806 {
01807   const ACE_Configuration_Section_Key &root = cf.root_section();
01808   ACE_Configuration_Section_Key sect;
01809   if (cf.open_section(root, section_name, 0, sect) == 0) {
01810 
01811     const OPENDDS_STRING sect_name = ACE_TEXT_ALWAYS_CHAR(section_name);
01812     DiscoveryTypes::iterator iter =
01813       this->discovery_types_.find(sect_name);
01814 
01815     if (iter == this->discovery_types_.end()) {
01816       // See if we can dynamically load the required libraries
01817       TheTransportRegistry->load_transport_lib(sect_name);
01818       iter = this->discovery_types_.find(sect_name);
01819     }
01820 
01821     if (iter != this->discovery_types_.end()) {
01822       // discovery code is loaded, process options
01823       return iter->second->discovery_config(cf);
01824     } else {
01825       // No discovery code can be loaded, report an error
01826       ACE_ERROR_RETURN((LM_ERROR,
01827                         ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
01828                         ACE_TEXT("load_discovery_configuration ")
01829                         ACE_TEXT("Unable to load libraries for %s\n"),
01830                         section_name),
01831                        -1);
01832     }
01833   }
01834   return 0;
01835 }
01836 
#if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
/**
 * Configure and install the safety-profile pool using pool_size_ and
 * pool_granularity_.  Does nothing when pool_size_ is zero.
 */
void
Service_Participant::configure_pool()
{
  if (!pool_size_) {
    return;
  }

  SafetyProfilePool::instance()->configure_pool(pool_size_, pool_granularity_);
  SafetyProfilePool::instance()->install();
}
#endif
01847 
01848 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01849 DataDurabilityCache *
01850 Service_Participant::get_data_durability_cache(
01851   DDS::DurabilityQosPolicy const & durability)
01852 {
01853   DDS::DurabilityQosPolicyKind const kind =
01854     durability.kind;
01855 
01856   DataDurabilityCache * cache = 0;
01857 
01858   if (kind == DDS::TRANSIENT_DURABILITY_QOS) {
01859     {
01860       ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01861                        guard,
01862                        this->factory_lock_,
01863                        0);
01864 
01865       if (!this->transient_data_cache_) {
01866         this->transient_data_cache_.reset(new DataDurabilityCache(kind));
01867       }
01868     }
01869 
01870     cache = this->transient_data_cache_.get();
01871 
01872   } else if (kind == DDS::PERSISTENT_DURABILITY_QOS) {
01873     {
01874       ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01875                        guard,
01876                        this->factory_lock_,
01877                        0);
01878 
01879       try {
01880         if (!this->persistent_data_cache_) {
01881           this->persistent_data_cache_.reset(new DataDurabilityCache(kind,
01882                                                                      this->persistent_data_dir_));
01883         }
01884 
01885       } catch (const std::exception& ex) {
01886         if (DCPS_debug_level > 0) {
01887           ACE_ERROR((LM_WARNING,
01888                      ACE_TEXT("(%P|%t) WARNING: Service_Participant::get_data_durability_cache ")
01889                      ACE_TEXT("failed to create PERSISTENT cache, falling back on ")
01890                      ACE_TEXT("TRANSIENT behavior: %C\n"), ex.what()));
01891         }
01892 
01893         this->persistent_data_cache_.reset(new DataDurabilityCache(DDS::TRANSIENT_DURABILITY_QOS));
01894       }
01895     }
01896 
01897     cache = this->persistent_data_cache_.get();
01898   }
01899 
01900   return cache;
01901 }
01902 #endif
01903 
01904 void
01905 Service_Participant::add_discovery(Discovery_rch discovery)
01906 {
01907   if (discovery) {
01908     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01909     this->discoveryMap_[discovery->key()] = discovery;
01910   }
01911 }
01912 
01913 const Service_Participant::RepoKeyDiscoveryMap&
01914 Service_Participant::discoveryMap() const
01915 {
01916   return this->discoveryMap_;
01917 }
01918 
01919 const Service_Participant::DomainRepoMap&
01920 Service_Participant::domainRepoMap() const
01921 {
01922   return this->domainRepoMap_;
01923 }
01924 
01925 Recorder_ptr
01926 Service_Participant::create_recorder(DDS::DomainParticipant_ptr participant,
01927                                      DDS::Topic_ptr a_topic,
01928                                      const DDS::SubscriberQos& subscriber_qos,
01929                                      const DDS::DataReaderQos& datareader_qos,
01930                                      const RecorderListener_rch& a_listener)
01931 {
01932   DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01933   if (participant_servant)
01934     return participant_servant->create_recorder(a_topic, subscriber_qos, datareader_qos, a_listener, 0);
01935   return 0;
01936 }
01937 
01938 DDS::ReturnCode_t
01939 Service_Participant::delete_recorder(Recorder_ptr recorder)
01940 {
01941   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01942   RecorderImpl* impl = dynamic_cast<RecorderImpl*>(recorder);
01943   if (impl){
01944     ret = impl->cleanup();
01945     impl->participant()->delete_recorder(recorder);
01946   }
01947   return ret;
01948 }
01949 
01950 Replayer_ptr
01951 Service_Participant::create_replayer(DDS::DomainParticipant_ptr participant,
01952                                      DDS::Topic_ptr a_topic,
01953                                      const DDS::PublisherQos& publisher_qos,
01954                                      const DDS::DataWriterQos& datawriter_qos,
01955                                      const ReplayerListener_rch& a_listener)
01956 {
01957   ACE_DEBUG((LM_DEBUG, "Service_Participant::create_replayer\n"));
01958   DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01959   if (participant_servant)
01960     return participant_servant->create_replayer(a_topic, publisher_qos, datawriter_qos, a_listener, 0);
01961   return 0;
01962 }
01963 
01964 DDS::ReturnCode_t
01965 Service_Participant::delete_replayer(Replayer_ptr replayer)
01966 {
01967   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01968   ReplayerImpl* impl = static_cast<ReplayerImpl*>(replayer);
01969   if (impl) {
01970     ret = impl->cleanup();
01971     impl->participant()->delete_replayer(replayer);
01972   }
01973   return ret;
01974 }
01975 
01976 DDS::Topic_ptr
01977 Service_Participant::create_typeless_topic(DDS::DomainParticipant_ptr participant,
01978                                      const char * topic_name,
01979                                      const char * type_name,
01980                                      bool type_has_keys,
01981                                      const DDS::TopicQos & qos,
01982                                      DDS::TopicListener_ptr a_listener,
01983                                      DDS::StatusMask mask)
01984 {
01985   DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01986   if (! participant_servant) {
01987     return 0;
01988   }
01989   return participant_servant->create_typeless_topic(topic_name, type_name, type_has_keys, qos, a_listener, mask);
01990 }
01991 
01992 } // namespace DCPS
01993 } // namespace OpenDDS
01994 
01995 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1