Service_Participant.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "debug.h"
00010 #include "Service_Participant.h"
00011 #include "BuiltInTopicUtils.h"
00012 #include "DataDurabilityCache.h"
00013 #include "GuidConverter.h"
00014 #include "MonitorFactory.h"
00015 #include "ConfigUtils.h"
00016 
00017 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00018 
00019 #include "tao/ORB_Core.h"
00020 
00021 #include "ace/Singleton.h"
00022 #include "ace/Arg_Shifter.h"
00023 #include "ace/Reactor.h"
00024 #include "ace/Select_Reactor.h"
00025 #include "ace/Configuration_Import_Export.h"
00026 #include "ace/Service_Config.h"
00027 #include "ace/Argv_Type_Converter.h"
00028 #include "ace/Auto_Ptr.h"
00029 #include "ace/Sched_Params.h"
00030 #include "ace/Malloc_Allocator.h"
00031 
00032 #include "RecorderImpl.h"
00033 #include "ReplayerImpl.h"
00034 #include "StaticDiscovery.h"
00035 
00036 #ifdef OPENDDS_SAFETY_PROFILE
00037 #include <stdio.h> // <cstdio> after FaceCTS bug 623 is fixed
00038 #else
00039 #include <fstream>
00040 #endif
00041 
00042 #if !defined (__ACE_INLINE__)
00043 #include "Service_Participant.inl"
00044 #endif /* __ACE_INLINE__ */
00045 
00046 namespace {
00047 
00048 void set_log_file_name(const char* fname)
00049 {
00050 #ifdef OPENDDS_SAFETY_PROFILE
00051   ACE_LOG_MSG->msg_ostream(fopen(fname, "a"), true);
00052 #else
00053   std::ofstream* output_stream = new std::ofstream(fname, ios::app);
00054   if (output_stream->bad()) {
00055     delete output_stream;
00056   } else {
00057     ACE_LOG_MSG->msg_ostream(output_stream, true);
00058   }
00059 #endif
00060   ACE_LOG_MSG->clr_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::LOGGER);
00061   ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
00062 }
00063 
00064 
00065 void set_log_verbose(unsigned long verbose_logging)
00066 {
00067   // Code copied from TAO_ORB_Core::init() in
00068   // TAO version 1.6a_p13.
00069 
00070   typedef void (ACE_Log_Msg::*PTMF)(u_long);
00071   PTMF flagop = &ACE_Log_Msg::set_flags;
00072   u_long value;
00073 
00074   switch (verbose_logging)
00075     {
00076     case 0:
00077       flagop = &ACE_Log_Msg::clr_flags;
00078       value = ACE_Log_Msg::VERBOSE | ACE_Log_Msg::VERBOSE_LITE;
00079       break;
00080     case 1:
00081       value = ACE_Log_Msg::VERBOSE_LITE; break;
00082     default:
00083       value = ACE_Log_Msg::VERBOSE; break;
00084     }
00085 
00086   (ACE_LOG_MSG->*flagop)(value);
00087 
00088 }
00089 
00090 
00091 }
00092 
00093 namespace OpenDDS {
00094 namespace DCPS {
00095 
// Definition of the static member Service_Participant::zero_argc.
int Service_Participant::zero_argc = 0;

// Initial value of n_chunks_ (overridden by -DCPSChunks).
const size_t DEFAULT_NUM_CHUNKS = 20;

// Initial value of association_chunk_multiplier_
// (overridden by -DCPSChunkAssociationMultiplier).
const size_t DEFAULT_CHUNK_MULTIPLIER = 10;

// Initial values of the federation_* members set in the constructor.
const int DEFAULT_FEDERATION_RECOVERY_DURATION       = 900; // 15 minutes in seconds.
const int DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS = 1;   // Wait only 1 second.
const int DEFAULT_FEDERATION_BACKOFF_MULTIPLIER      = 2;   // Exponential backoff.
const int DEFAULT_FEDERATION_LIVELINESS              = 60;  // 1 minute heartbeat.

// Initial value of bit_lookup_duration_msec_
// (overridden by -DCPSBitLookupDurationMsec).
const int BIT_LOOKUP_DURATION_MSEC = 2000;

// Configuration file name; set from -DCPSConfigFile in parse_args().
// An empty name means no configuration file is loaded.
static ACE_TString config_fname(ACE_TEXT(""));

// NOTE(review): presumably the repository IOR used when none is
// configured - its use is not visible in this part of the file.
static const ACE_TCHAR DEFAULT_REPO_IOR[] = ACE_TEXT("file://repo.ior");

// Initial value of persistent_data_dir_
// (overridden by -DCPSPersistentDataDir).
static const ACE_CString DEFAULT_PERSISTENT_DATA_DIR = "OpenDDS-durable-data-dir";

// NOTE(review): presumably section names of the configuration file - they
// are not referenced in this part of the file; confirm against
// load_configuration().
static const ACE_TCHAR COMMON_SECTION_NAME[] = ACE_TEXT("common");
static const ACE_TCHAR DOMAIN_SECTION_NAME[] = ACE_TEXT("domain");
static const ACE_TCHAR REPO_SECTION_NAME[]   = ACE_TEXT("repository");
static const ACE_TCHAR RTPS_SECTION_NAME[]   = ACE_TEXT("rtps_discovery");

// The got_* flags below are set to true by parse_args() when the
// corresponding option is found on the command line.
static bool got_debug_level = false;
static bool got_use_rti_serialization = false;
static bool got_info = false;
static bool got_chunks = false;
static bool got_chunk_association_multiplier = false;
static bool got_liveliness_factor = false;
static bool got_bit_transport_port = false;
static bool got_bit_transport_ip = false;
static bool got_bit_lookup_duration_msec = false;
static bool got_global_transport_config = false;
static bool got_bit_flag = false;
static bool got_publisher_content_filter = false;
static bool got_transport_debug_level = false;
static bool got_pending_timeout = false;
#ifndef OPENDDS_NO_PERSISTENCE_PROFILE
static bool got_persistent_data_dir = false;
#endif
static bool got_default_discovery = false;
// Unless the build already defines it, the default discovery method is
// RTPS for safety profile builds and the repository otherwise.
#ifndef DDS_DEFAULT_DISCOVERY_METHOD
# ifdef OPENDDS_SAFETY_PROFILE
#  define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_RTPS
# else
#  define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_REPO
# endif
#endif
static bool got_log_fname = false;
static bool got_log_verbose = false;
static bool got_default_address = false;
00148 
/// Construct the service participant with its built-in default settings
/// and populate the initial (default) QoS policy values via initialize().
/// No reactor, participant factory or monitor is created here; those
/// pointers start out null.
Service_Participant::Service_Participant()
  :
#ifndef OPENDDS_SAFETY_PROFILE
    ORB_argv_(false /*substitute_env_args*/),
#endif
    reactor_(0),
    dp_factory_servant_(0),
    defaultDiscovery_(DDS_DEFAULT_DISCOVERY_METHOD),
    n_chunks_(DEFAULT_NUM_CHUNKS),
    association_chunk_multiplier_(DEFAULT_CHUNK_MULTIPLIER),
    liveliness_factor_(80),
    bit_transport_port_(0),
    // Built-in topic support starts disabled only when the build defines
    // DDS_HAS_MINIMUM_BIT.
    bit_enabled_(
#ifdef DDS_HAS_MINIMUM_BIT
      false
#else
      true
#endif
    ),
    bit_lookup_duration_msec_(BIT_LOOKUP_DURATION_MSEC),
    global_transport_config_(ACE_TEXT("")),
    monitor_factory_(0),
    monitor_(0),
    federation_recovery_duration_(DEFAULT_FEDERATION_RECOVERY_DURATION),
    federation_initial_backoff_seconds_(DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS),
    federation_backoff_multiplier_(DEFAULT_FEDERATION_BACKOFF_MULTIPLIER),
    federation_liveliness_(DEFAULT_FEDERATION_LIVELINESS),
    schedulerQuantum_(ACE_Time_Value::zero),
#if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
    pool_size_(1024*1024*16),
    pool_granularity_(8),
#endif
    // -1 is also the value initializeScheduling() restores when the
    // requested scheduling policy could not be applied.
    scheduler_(-1),
    priority_min_(0),
    priority_max_(0),
    publisher_content_filter_(true),
#ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    transient_data_cache_(),
    persistent_data_cache_(),
    persistent_data_dir_(DEFAULT_PERSISTENT_DATA_DIR),
#endif
    pending_timeout_(ACE_Time_Value::zero),
    shut_down_(false)
{
  initialize();
}
00195 
00196 Service_Participant::~Service_Participant()
00197 {
00198   shutdown();
00199   ACE_GUARD(TAO_SYNCH_MUTEX, guard, this->factory_lock_);
00200   typedef OPENDDS_MAP(OPENDDS_STRING, Discovery::Config*)::iterator iter_t;
00201   for (iter_t it = discovery_types_.begin(); it != discovery_types_.end(); ++it) {
00202     delete it->second;
00203   }
00204   delete monitor_;
00205   delete reactor_;
00206 
00207   if (DCPS_debug_level > 0) {
00208     ACE_DEBUG((LM_DEBUG,
00209                "%T (%P|%t) Service_Participant::~Service_Participant()\n"));
00210   }
00211 }
00212 
00213 Service_Participant*
00214 Service_Participant::instance()
00215 {
00216   // Hide the template instantiation to prevent multiple instances
00217   // from being created.
00218 
00219   return ACE_Singleton<Service_Participant, ACE_SYNCH_MUTEX>::instance();
00220 }
00221 
00222 int
00223 Service_Participant::ReactorTask::svc()
00224 {
00225   Service_Participant* sp = instance();
00226   sp->reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
00227   sp->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00228   this->wait_for_startup();
00229   sp->reactor_->run_reactor_event_loop();
00230   return 0;
00231 }
00232 
00233 ACE_Reactor_Timer_Interface*
00234 Service_Participant::timer() const
00235 {
00236   return reactor_;
00237 }
00238 
00239 ACE_Reactor*
00240 Service_Participant::reactor() const
00241 {
00242   return reactor_;
00243 }
00244 
00245 ACE_thread_t
00246 Service_Participant::reactor_owner() const
00247 {
00248   return reactor_owner_;
00249 }
00250 
00251 void
00252 Service_Participant::shutdown()
00253 {
00254   shut_down_ = true;
00255   try {
00256     TransportRegistry::instance()->release();
00257     ACE_GUARD(TAO_SYNCH_MUTEX, guard, this->factory_lock_);
00258 
00259     domainRepoMap_.clear();
00260     discoveryMap_.clear();
00261 
00262     if (0 != reactor_) {
00263       reactor_->end_reactor_event_loop();
00264       reactor_task_.wait();
00265     }
00266 
00267     dp_factory_ = DDS::DomainParticipantFactory::_nil();
00268 
00269 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00270     transient_data_cache_.reset();
00271     persistent_data_cache_.reset();
00272 #endif
00273 
00274     typedef OPENDDS_MAP(OPENDDS_STRING, Discovery::Config*)::iterator iter;
00275     for (iter i = discovery_types_.begin(); i != discovery_types_.end(); ++i) {
00276       delete i->second;
00277     }
00278     discovery_types_.clear();
00279   } catch (const CORBA::Exception& ex) {
00280     ex._tao_print_exception("ERROR: Service_Participant::shutdown");
00281   }
00282 }
00283 
#ifdef ACE_USES_WCHAR
/// Narrow-character overload, available only in wide-character builds:
/// converts the argument vector to ACE_TCHAR and forwards to the
/// ACE_TCHAR overload.
DDS::DomainParticipantFactory_ptr
Service_Participant::get_domain_participant_factory(int &argc,
                                                    char *argv[])
{
  ACE_Argv_Type_Converter converter(argc, argv);
  int& converted_argc = converter.get_argc();
  ACE_TCHAR** const converted_argv = converter.get_TCHAR_argv();
  return this->get_domain_participant_factory(converted_argc, converted_argv);
}
#endif
00294 
00295 DDS::DomainParticipantFactory_ptr
00296 Service_Participant::get_domain_participant_factory(int &argc,
00297                                                     ACE_TCHAR *argv[])
00298 {
00299   if (CORBA::is_nil(dp_factory_.in())) {
00300     ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
00301                      guard,
00302                      this->factory_lock_,
00303                      DDS::DomainParticipantFactory::_nil());
00304 
00305     if (CORBA::is_nil(dp_factory_.in())) {
00306       // This used to be a call to ORB_init().  Since the ORB is now managed
00307       // by InfoRepoDiscovery, just save the -ORB* args for later use.
00308       // The exceptions are -ORBLogFile and -ORBVerboseLogging, which
00309       // are processed by the service participant. This allows log control
00310       // even if an ORB is not being used.
00311 #ifndef OPENDDS_SAFETY_PROFILE
00312       ORB_argv_.add(ACE_TEXT("unused_arg_0"));
00313 #endif
00314       ACE_Arg_Shifter shifter(argc, argv);
00315       while (shifter.is_anything_left()) {
00316         if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBLogFile")) == 0) {
00317           shifter.ignore_arg();
00318         } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBVerboseLogging")) == 0) {
00319           shifter.ignore_arg();
00320         } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORB")) < 0) {
00321           shifter.ignore_arg();
00322         } else {
00323 #ifndef OPENDDS_SAFETY_PROFILE
00324           ORB_argv_.add(shifter.get_current());
00325 #endif
00326           shifter.consume_arg();
00327           if (shifter.is_parameter_next()) {
00328 #ifndef OPENDDS_SAFETY_PROFILE
00329             ORB_argv_.add(shifter.get_current(), true /*quote_arg*/);
00330 #endif
00331             shifter.consume_arg();
00332           }
00333         }
00334       }
00335 
00336       if (parse_args(argc, argv) != 0) {
00337         return DDS::DomainParticipantFactory::_nil();
00338       }
00339 
00340       if (config_fname == ACE_TEXT("")) {
00341         if (DCPS_debug_level) {
00342           ACE_DEBUG((LM_NOTICE,
00343                      ACE_TEXT("(%P|%t) NOTICE: not using file configuration - no configuration ")
00344                      ACE_TEXT("file specified.\n")));
00345         }
00346 
00347       } else {
00348         // Load configuration only if the configuration
00349         // file exists.
00350         FILE* in = ACE_OS::fopen(config_fname.c_str(),
00351                                  ACE_TEXT("r"));
00352 
00353         if (!in) {
00354           ACE_DEBUG((LM_WARNING,
00355                      ACE_TEXT("(%P|%t) WARNING: not using file configuration - ")
00356                      ACE_TEXT("can not open \"%s\" for reading. %p\n"),
00357                      config_fname.c_str(), ACE_TEXT("fopen")));
00358 
00359         } else {
00360           ACE_OS::fclose(in);
00361 
00362           if (this->load_configuration() != 0) {
00363             ACE_ERROR((LM_ERROR,
00364                        ACE_TEXT("(%P|%t) ERROR: Service_Participant::get_domain_participant_factory: ")
00365                        ACE_TEXT("load_configuration() failed.\n")));
00366             return DDS::DomainParticipantFactory::_nil();
00367           }
00368         }
00369       }
00370 
00371 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00372       // For non-FACE tests, configure pool
00373       configure_pool();
00374 #endif
00375 
00376       // Establish the default scheduling mechanism and
00377       // priority here.  Sadly, the ORB is already
00378       // initialized so we have no influence over its
00379       // scheduling or thread priority(ies).
00380 
00381       /// @TODO: Move ORB intitialization to after the
00382       ///        configuration file is processed and the
00383       ///        initial scheduling policy and priority are
00384       ///        established.
00385       this->initializeScheduling();
00386 
00387       ACE_NEW_RETURN(dp_factory_servant_,
00388                      DomainParticipantFactoryImpl(),
00389                      DDS::DomainParticipantFactory::_nil());
00390 
00391       dp_factory_ = dp_factory_servant_;
00392 
00393       if (CORBA::is_nil(dp_factory_.in())) {
00394         ACE_ERROR((LM_ERROR,
00395                    ACE_TEXT("(%P|%t) ERROR: ")
00396                    ACE_TEXT("Service_Participant::get_domain_participant_factory, ")
00397                    ACE_TEXT("nil DomainParticipantFactory. \n")));
00398         return DDS::DomainParticipantFactory::_nil();
00399       }
00400 
00401       if (!reactor_)
00402         reactor_ = new ACE_Reactor(new ACE_Select_Reactor, true);
00403 
00404       if (reactor_task_.activate(THR_NEW_LWP | THR_JOINABLE) == -1) {
00405         ACE_ERROR((LM_ERROR,
00406                    ACE_TEXT("ERROR: Service_Participant::get_domain_participant_factory, ")
00407                    ACE_TEXT("Failed to activate the reactor task.")));
00408         return DDS::DomainParticipantFactory::_nil();
00409       }
00410 
00411       reactor_task_.wait_for_startup();
00412 
00413       this->monitor_factory_ =
00414         ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor");
00415       if (this->monitor_factory_ == 0) {
00416         // Use the stubbed factory
00417         this->monitor_factory_ =
00418           ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor_Default");
00419       }
00420       this->monitor_ = this->monitor_factory_->create_sp_monitor(this);
00421     }
00422   }
00423 
00424   return DDS::DomainParticipantFactory::_duplicate(dp_factory_.in());
00425 }
00426 
00427 int
00428 Service_Participant::parse_args(int &argc, ACE_TCHAR *argv[])
00429 {
00430   ACE_Arg_Shifter arg_shifter(argc, argv);
00431 
00432   while (arg_shifter.is_anything_left()) {
00433     const ACE_TCHAR *currentArg = 0;
00434 
00435     if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDebugLevel"))) != 0) {
00436       set_DCPS_debug_level(ACE_OS::atoi(currentArg));
00437       arg_shifter.consume_arg();
00438       got_debug_level = true;
00439 
00440     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSInfoRepo"))) != 0) {
00441       this->set_repo_ior(currentArg, Discovery::DEFAULT_REPO);
00442       arg_shifter.consume_arg();
00443       got_info = true;
00444 
00445     } else if (!arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-DCPSRTISerialization"))) {
00446       Serializer::set_use_rti_serialization(true);
00447       arg_shifter.consume_arg();
00448       got_use_rti_serialization = true;
00449 
00450     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunks"))) != 0) {
00451       n_chunks_ = ACE_OS::atoi(currentArg);
00452       arg_shifter.consume_arg();
00453       got_chunks = true;
00454 
00455     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunkAssociationMultiplier"))) != 0) {
00456       association_chunk_multiplier_ = ACE_OS::atoi(currentArg);
00457       arg_shifter.consume_arg();
00458       got_chunk_association_multiplier = true;
00459 
00460     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSConfigFile"))) != 0) {
00461       config_fname = currentArg;
00462       arg_shifter.consume_arg();
00463 
00464     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSLivelinessFactor"))) != 0) {
00465       liveliness_factor_ = ACE_OS::atoi(currentArg);
00466       arg_shifter.consume_arg();
00467       got_liveliness_factor = true;
00468 
00469     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportPort"))) != 0) {
00470       /// No need to guard this insertion as we are still single
00471       /// threaded here.
00472       this->bit_transport_port_ = ACE_OS::atoi(currentArg);
00473       arg_shifter.consume_arg();
00474       got_bit_transport_port = true;
00475 
00476     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportIPAddress"))) != 0) {
00477       /// No need to guard this insertion as we are still single
00478       /// threaded here.
00479       this->bit_transport_ip_ = currentArg;
00480       arg_shifter.consume_arg();
00481       got_bit_transport_ip = true;
00482 
00483     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitLookupDurationMsec"))) != 0) {
00484       bit_lookup_duration_msec_ = ACE_OS::atoi(currentArg);
00485       arg_shifter.consume_arg();
00486       got_bit_lookup_duration_msec = true;
00487 
00488     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSGlobalTransportConfig"))) != 0) {
00489       global_transport_config_ = currentArg;
00490       arg_shifter.consume_arg();
00491       got_global_transport_config = true;
00492 
00493     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBit"))) != 0) {
00494       bit_enabled_ = ACE_OS::atoi(currentArg);
00495       arg_shifter.consume_arg();
00496       got_bit_flag = true;
00497 
00498     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSTransportDebugLevel"))) != 0) {
00499       OpenDDS::DCPS::Transport_debug_level = ACE_OS::atoi(currentArg);
00500       arg_shifter.consume_arg();
00501       got_transport_debug_level = true;
00502 
00503 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00504     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPersistentDataDir"))) != 0) {
00505       this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00506       arg_shifter.consume_arg();
00507       got_persistent_data_dir = true;
00508 #endif
00509 
00510     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPendingTimeout"))) != 0) {
00511       this->pending_timeout_ = ACE_OS::atoi(currentArg);
00512       arg_shifter.consume_arg();
00513       got_pending_timeout = true;
00514 
00515     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPublisherContentFilter"))) != 0) {
00516       this->publisher_content_filter_ = ACE_OS::atoi(currentArg);
00517       arg_shifter.consume_arg();
00518       got_publisher_content_filter = true;
00519 
00520     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultDiscovery"))) != 0) {
00521       this->defaultDiscovery_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00522       arg_shifter.consume_arg();
00523       got_default_discovery = true;
00524 
00525     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationRecoveryDuration"))) != 0) {
00526       this->federation_recovery_duration_ = ACE_OS::atoi(currentArg);
00527       arg_shifter.consume_arg();
00528 
00529     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationInitialBackoffSeconds"))) != 0) {
00530       this->federation_initial_backoff_seconds_ = ACE_OS::atoi(currentArg);
00531       arg_shifter.consume_arg();
00532 
00533     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationBackoffMultiplier"))) != 0) {
00534       this->federation_backoff_multiplier_ = ACE_OS::atoi(currentArg);
00535       arg_shifter.consume_arg();
00536 
00537     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationLivelinessDuration"))) != 0) {
00538       this->federation_liveliness_ = ACE_OS::atoi(currentArg);
00539       arg_shifter.consume_arg();
00540 
00541     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBLogFile"))) != 0) {
00542       set_log_file_name(ACE_TEXT_ALWAYS_CHAR(currentArg));
00543       arg_shifter.consume_arg();
00544       got_log_fname = true;
00545 
00546     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBVerboseLogging"))) != 0) {
00547       set_log_verbose(ACE_OS::atoi(currentArg));
00548       arg_shifter.consume_arg();
00549       got_log_verbose = true;
00550 
00551     } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultAddress"))) != 0) {
00552       this->default_address_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00553       arg_shifter.consume_arg();
00554       got_default_address = true;
00555 
00556     } else {
00557       arg_shifter.ignore_arg();
00558     }
00559   }
00560 
00561   // Indicates sucessful parsing of the command line
00562   return 0;
00563 }
00564 
/// Fill in the initial (default) value of every individual QoS policy and
/// then assemble the initial QoS of each entity kind (participant,
/// participant factory, topic, data writer, publisher, data reader and
/// subscriber) from those policies.  Called from the constructor.
void
Service_Participant::initialize()
{
  //NOTE: in the future these initial values may be configurable
  //      (to override the Specification's default values
  //       hmm - I guess that would be OK since the user
  //       is overriding them.)

  // --- Individual policies -------------------------------------------
  initial_TransportPriorityQosPolicy_.value = 0;
  initial_LifespanQosPolicy_.duration.sec = DDS::DURATION_INFINITE_SEC;
  initial_LifespanQosPolicy_.duration.nanosec = DDS::DURATION_INFINITE_NSEC;

  initial_DurabilityQosPolicy_.kind = DDS::VOLATILE_DURABILITY_QOS;

  initial_DurabilityServiceQosPolicy_.service_cleanup_delay.sec =
    DDS::DURATION_ZERO_SEC;
  initial_DurabilityServiceQosPolicy_.service_cleanup_delay.nanosec =
    DDS::DURATION_ZERO_NSEC;
  initial_DurabilityServiceQosPolicy_.history_kind =
    DDS::KEEP_LAST_HISTORY_QOS;
  initial_DurabilityServiceQosPolicy_.history_depth = 1;
  initial_DurabilityServiceQosPolicy_.max_samples =
    DDS::LENGTH_UNLIMITED;
  initial_DurabilityServiceQosPolicy_.max_instances =
    DDS::LENGTH_UNLIMITED;
  initial_DurabilityServiceQosPolicy_.max_samples_per_instance =
    DDS::LENGTH_UNLIMITED;

  initial_PresentationQosPolicy_.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
  initial_PresentationQosPolicy_.coherent_access = false;
  initial_PresentationQosPolicy_.ordered_access = false;

  initial_DeadlineQosPolicy_.period.sec = DDS::DURATION_INFINITE_SEC;
  initial_DeadlineQosPolicy_.period.nanosec = DDS::DURATION_INFINITE_NSEC;

  initial_LatencyBudgetQosPolicy_.duration.sec = DDS::DURATION_ZERO_SEC;
  initial_LatencyBudgetQosPolicy_.duration.nanosec = DDS::DURATION_ZERO_NSEC;

  initial_OwnershipQosPolicy_.kind = DDS::SHARED_OWNERSHIP_QOS;
#ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
  initial_OwnershipStrengthQosPolicy_.value = 0;
#endif

  initial_LivelinessQosPolicy_.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
  initial_LivelinessQosPolicy_.lease_duration.sec = DDS::DURATION_INFINITE_SEC;
  initial_LivelinessQosPolicy_.lease_duration.nanosec = DDS::DURATION_INFINITE_NSEC;

  initial_TimeBasedFilterQosPolicy_.minimum_separation.sec = DDS::DURATION_ZERO_SEC;
  initial_TimeBasedFilterQosPolicy_.minimum_separation.nanosec = DDS::DURATION_ZERO_NSEC;

  // The data writer QoS below overrides these reliability settings.
  initial_ReliabilityQosPolicy_.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
  initial_ReliabilityQosPolicy_.max_blocking_time.sec = DDS::DURATION_INFINITE_SEC;
  initial_ReliabilityQosPolicy_.max_blocking_time.nanosec = DDS::DURATION_INFINITE_NSEC;

  initial_DestinationOrderQosPolicy_.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;

  initial_HistoryQosPolicy_.kind = DDS::KEEP_LAST_HISTORY_QOS;
  initial_HistoryQosPolicy_.depth = 1;

  initial_ResourceLimitsQosPolicy_.max_samples = DDS::LENGTH_UNLIMITED;
  initial_ResourceLimitsQosPolicy_.max_instances = DDS::LENGTH_UNLIMITED;
  initial_ResourceLimitsQosPolicy_.max_samples_per_instance = DDS::LENGTH_UNLIMITED;

  initial_EntityFactoryQosPolicy_.autoenable_created_entities = true;

  initial_WriterDataLifecycleQosPolicy_.autodispose_unregistered_instances = true;

  initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
  initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;
  initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
  initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;

  // --- Domain participant and participant factory QoS -----------------
  initial_DomainParticipantQos_.user_data = initial_UserDataQosPolicy_;
  initial_DomainParticipantQos_.entity_factory = initial_EntityFactoryQosPolicy_;
  initial_DomainParticipantFactoryQos_.entity_factory = initial_EntityFactoryQosPolicy_;

  // --- Topic QoS ------------------------------------------------------
  initial_TopicQos_.topic_data = initial_TopicDataQosPolicy_;
  initial_TopicQos_.durability = initial_DurabilityQosPolicy_;
  initial_TopicQos_.durability_service = initial_DurabilityServiceQosPolicy_;
  initial_TopicQos_.deadline = initial_DeadlineQosPolicy_;
  initial_TopicQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
  initial_TopicQos_.liveliness = initial_LivelinessQosPolicy_;
  initial_TopicQos_.reliability = initial_ReliabilityQosPolicy_;
  initial_TopicQos_.destination_order = initial_DestinationOrderQosPolicy_;
  initial_TopicQos_.history = initial_HistoryQosPolicy_;
  initial_TopicQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
  initial_TopicQos_.transport_priority = initial_TransportPriorityQosPolicy_;
  initial_TopicQos_.lifespan = initial_LifespanQosPolicy_;
  initial_TopicQos_.ownership = initial_OwnershipQosPolicy_;

  // --- Data writer QoS ------------------------------------------------
  initial_DataWriterQos_.durability = initial_DurabilityQosPolicy_;
  initial_DataWriterQos_.durability_service = initial_DurabilityServiceQosPolicy_;
  initial_DataWriterQos_.deadline = initial_DeadlineQosPolicy_;
  initial_DataWriterQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
  initial_DataWriterQos_.liveliness = initial_LivelinessQosPolicy_;
  initial_DataWriterQos_.reliability = initial_ReliabilityQosPolicy_;
  // Writers differ from the shared reliability policy: they are reliable
  // with a max blocking time of 0 s + 100000000 ns (100 ms).
  initial_DataWriterQos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
  initial_DataWriterQos_.reliability.max_blocking_time.sec = 0;
  initial_DataWriterQos_.reliability.max_blocking_time.nanosec = 100000000;
  initial_DataWriterQos_.destination_order = initial_DestinationOrderQosPolicy_;
  initial_DataWriterQos_.history = initial_HistoryQosPolicy_;
  initial_DataWriterQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
  initial_DataWriterQos_.transport_priority = initial_TransportPriorityQosPolicy_;
  initial_DataWriterQos_.lifespan = initial_LifespanQosPolicy_;
  initial_DataWriterQos_.user_data = initial_UserDataQosPolicy_;
  initial_DataWriterQos_.ownership = initial_OwnershipQosPolicy_;
#ifdef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
  initial_DataWriterQos_.ownership_strength.value = 0;
#else
  initial_DataWriterQos_.ownership_strength = initial_OwnershipStrengthQosPolicy_;
#endif
  initial_DataWriterQos_.writer_data_lifecycle = initial_WriterDataLifecycleQosPolicy_;

  // --- Publisher QoS --------------------------------------------------
  initial_PublisherQos_.presentation = initial_PresentationQosPolicy_;
  initial_PublisherQos_.partition = initial_PartitionQosPolicy_;
  initial_PublisherQos_.group_data = initial_GroupDataQosPolicy_;
  initial_PublisherQos_.entity_factory = initial_EntityFactoryQosPolicy_;

  // --- Data reader QoS ------------------------------------------------
  initial_DataReaderQos_.durability = initial_DurabilityQosPolicy_;
  initial_DataReaderQos_.deadline = initial_DeadlineQosPolicy_;
  initial_DataReaderQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
  initial_DataReaderQos_.liveliness = initial_LivelinessQosPolicy_;
  initial_DataReaderQos_.reliability = initial_ReliabilityQosPolicy_;
  initial_DataReaderQos_.destination_order = initial_DestinationOrderQosPolicy_;
  initial_DataReaderQos_.history = initial_HistoryQosPolicy_;
  initial_DataReaderQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
  initial_DataReaderQos_.user_data = initial_UserDataQosPolicy_;
  initial_DataReaderQos_.time_based_filter = initial_TimeBasedFilterQosPolicy_;
  initial_DataReaderQos_.ownership = initial_OwnershipQosPolicy_;
  initial_DataReaderQos_.reader_data_lifecycle = initial_ReaderDataLifecycleQosPolicy_;

  // --- Subscriber QoS -------------------------------------------------
  initial_SubscriberQos_.presentation = initial_PresentationQosPolicy_;
  initial_SubscriberQos_.partition = initial_PartitionQosPolicy_;
  initial_SubscriberQos_.group_data = initial_GroupDataQosPolicy_;
  initial_SubscriberQos_.entity_factory = initial_EntityFactoryQosPolicy_;
}
00700 
00701 void
00702 Service_Participant::initializeScheduling()
00703 {
00704   //
00705   // Establish the scheduler if specified.
00706   //
00707   if (this->schedulerString_.length() == 0) {
00708     if (DCPS_debug_level > 0) {
00709       ACE_DEBUG((LM_NOTICE,
00710                  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::intializeScheduling() - ")
00711                  ACE_TEXT("no scheduling policy specified, not setting policy.\n")));
00712     }
00713 
00714   } else {
00715     //
00716     // Translate the scheduling policy to a usable value.
00717     //
00718     int ace_scheduler = ACE_SCHED_OTHER;
00719     this->scheduler_  = THR_SCHED_DEFAULT;
00720 
00721     if (this->schedulerString_ == ACE_TEXT("SCHED_RR")) {
00722       this->scheduler_ = THR_SCHED_RR;
00723       ace_scheduler    = ACE_SCHED_RR;
00724 
00725     } else if (this->schedulerString_ == ACE_TEXT("SCHED_FIFO")) {
00726       this->scheduler_ = THR_SCHED_FIFO;
00727       ace_scheduler    = ACE_SCHED_FIFO;
00728 
00729     } else if (this->schedulerString_ == ACE_TEXT("SCHED_OTHER")) {
00730       this->scheduler_ = THR_SCHED_DEFAULT;
00731       ace_scheduler    = ACE_SCHED_OTHER;
00732 
00733     } else {
00734       ACE_DEBUG((LM_WARNING,
00735                  ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00736                  ACE_TEXT("unrecognized scheduling policy: %s, set to SCHED_OTHER.\n"),
00737                  this->schedulerString_.c_str()));
00738     }
00739 
00740     //
00741     // Attempt to set the scheduling policy.
00742     //
00743 #ifdef ACE_WIN32
00744     ACE_DEBUG((LM_NOTICE,
00745                ACE_TEXT("(%P|%t) NOTICE: Service_Participant::initializeScheduling() - ")
00746                ACE_TEXT("scheduling is not implemented on Win32.\n")));
00747     ACE_UNUSED_ARG(ace_scheduler);
00748 #else
00749     ACE_Sched_Params params(
00750       ace_scheduler,
00751       ACE_Sched_Params::priority_min(ace_scheduler),
00752       ACE_SCOPE_THREAD,
00753       this->schedulerQuantum_);
00754 
00755     if (ACE_OS::sched_params(params) != 0) {
00756       if (ACE_OS::last_error() == EPERM) {
00757         ACE_DEBUG((LM_WARNING,
00758                    ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00759                    ACE_TEXT("user is not superuser, requested scheduler not set.\n")));
00760 
00761       } else {
00762         ACE_ERROR((LM_ERROR,
00763                    ACE_TEXT("(%P|%t) ERROR: Service_Participant::initializeScheduling() - ")
00764                    ACE_TEXT("sched_params failed: %m.\n")));
00765       }
00766 
00767       // Reset the scheduler value(s) if we did not succeed.
00768       this->scheduler_ = -1;
00769       ace_scheduler    = ACE_SCHED_OTHER;
00770 
00771     } else if (DCPS_debug_level > 0) {
00772       ACE_DEBUG((LM_DEBUG,
00773                  ACE_TEXT("(%P|%t) Service_Participant::initializeScheduling() - ")
00774                  ACE_TEXT("scheduling policy set to %s(%d).\n"),
00775                  this->schedulerString_.c_str()));
00776     }
00777 
00778     //
00779     // Setup some scheduler specific information for later use.
00780     //
00781     this->priority_min_ = ACE_Sched_Params::priority_min(ace_scheduler, ACE_SCOPE_THREAD);
00782     this->priority_max_ = ACE_Sched_Params::priority_max(ace_scheduler, ACE_SCOPE_THREAD);
00783 #endif // ACE_WIN32
00784   }
00785 }
00786 
00787 #ifdef DDS_HAS_WCHAR
00788 bool
00789 Service_Participant::set_repo_ior(const wchar_t* ior,
00790                                   Discovery::RepoKey key,
00791                                   bool attach_participant)
00792 {
00793   return set_repo_ior(ACE_Wide_To_Ascii(ior).char_rep(), key, attach_participant);
00794 }
00795 #endif
00796 
00797 bool
00798 Service_Participant::set_repo_ior(const char* ior,
00799                                   Discovery::RepoKey key,
00800                                   bool attach_participant)
00801 {
00802   if (DCPS_debug_level > 0) {
00803     ACE_DEBUG((LM_DEBUG,
00804                ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior: Repo[ %C] == %C\n"),
00805                key.c_str(), ior));
00806   }
00807 
00808   // This is a global used for the bizzare commandline/configfile
00809   // processing done for this class.
00810   got_info = true;
00811 
00812   if (key == "-1") {
00813     key = Discovery::DEFAULT_REPO;
00814   }
00815 
00816   const OPENDDS_STRING repo_type = ACE_TEXT_ALWAYS_CHAR(REPO_SECTION_NAME);
00817   if (!discovery_types_.count(repo_type)) {
00818     // Re-use a transport registry function to attempt a dynamic load of the
00819     // library that implements the 'repo_type' (InfoRepoDiscovery)
00820     TheTransportRegistry->load_transport_lib(repo_type);
00821   }
00822 
00823   if (discovery_types_.count(repo_type)) {
00824     ACE_Configuration_Heap cf;
00825     cf.open();
00826     ACE_Configuration_Section_Key sect_key;
00827     ACE_TString section = REPO_SECTION_NAME;
00828     section += ACE_TEXT('\\');
00829     section += ACE_TEXT_CHAR_TO_TCHAR(key.c_str());
00830     cf.open_section(cf.root_section(), section.c_str(), 1 /*create*/, sect_key);
00831     cf.set_string_value(sect_key, ACE_TEXT("RepositoryIor"),
00832                         ACE_TEXT_CHAR_TO_TCHAR(ior));
00833 
00834     discovery_types_[repo_type]->discovery_config(cf);
00835 
00836     this->remap_domains(key, key, attach_participant);
00837     return true;
00838   }
00839 
00840   ACE_ERROR_RETURN((LM_ERROR,
00841                     ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior ")
00842                     ACE_TEXT("ERROR - no discovery type registered for ")
00843                     ACE_TEXT("InfoRepoDiscovery\n")),
00844                    false);
00845 }
00846 
00847 void
00848 Service_Participant::remap_domains(Discovery::RepoKey oldKey,
00849                                    Discovery::RepoKey newKey,
00850                                    bool attach_participant)
00851 {
00852   // Search the mappings for any domains mapped to this repository.
00853   OPENDDS_VECTOR(DDS::DomainId_t) domainList;
00854   {
00855     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00856 
00857     for (DomainRepoMap::const_iterator current = this->domainRepoMap_.begin();
00858          current != this->domainRepoMap_.end();
00859          ++current) {
00860       if (current->second == oldKey) {
00861         domainList.push_back(current->first);
00862       }
00863     }
00864   }
00865 
00866   // Remap the domains that were attached to this repository.
00867   for (unsigned int index = 0; index < domainList.size(); ++index) {
00868     // For mapped domains, attach their participants by setting the
00869     // mapping again.
00870     this->set_repo_domain(domainList[ index], newKey, attach_participant);
00871   }
00872 }
00873 
00874 void
00875 Service_Participant::set_repo_domain(const DDS::DomainId_t domain,
00876                                      Discovery::RepoKey key,
00877                                      bool attach_participant)
00878 {
00879   typedef std::pair<Discovery_rch, RepoId> DiscRepoPair;
00880   OPENDDS_VECTOR(DiscRepoPair) repoList;
00881   {
00882     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00883     DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
00884 
00885     if (key == "-1") {
00886       key = Discovery::DEFAULT_REPO;
00887     }
00888 
00889     if ((where == this->domainRepoMap_.end()) || (where->second != key)) {
00890       // Only assign entries into the map when they change the
00891       // contents.
00892       this->domainRepoMap_[ domain] = key;
00893 
00894       if (DCPS_debug_level > 0) {
00895         ACE_DEBUG((LM_DEBUG,
00896                    ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00897                    ACE_TEXT("Domain[ %d] = Repo[ %C].\n"),
00898                    domain, key.c_str()));
00899       }
00900     }
00901 
00902     //
00903     // Make sure that we mark each DomainParticipant for this domain
00904     // using this repository as attached to this repository.
00905     //
00906     // @TODO: Move this note into user documentation.
00907     // N.B. Calling set_repo() or set_repo_ior() will result in this
00908     //      code executing again with the new repository.  It is best
00909     //      to call those routines first when making changes.
00910     //
00911 
00912     // No servant means no participant.  No worries.
00913     if (0 != this->dp_factory_servant_) {
00914       // Map of domains to sets of participants.
00915       const DomainParticipantFactoryImpl::DPMap& participants
00916       = this->dp_factory_servant_->participants();
00917 
00918       // Extract the set of participants for the current domain.
00919       DomainParticipantFactoryImpl::DPMap::const_iterator
00920       which  = participants.find(domain);
00921 
00922       if (which != participants.end()) {
00923         // Extract the repository to attach this domain to.
00924         RepoKeyDiscoveryMap::const_iterator disc_iter = this->discoveryMap_.find(key);
00925 
00926         if (disc_iter != this->discoveryMap_.end()) {
00927           for (DomainParticipantFactoryImpl::DPSet::const_iterator
00928                current  = which->second.begin();
00929                current != which->second.end();
00930                ++current) {
00931             try {
00932               // Attach each DomainParticipant in this domain to this
00933               // repository.
00934               RepoId id = current->svt_->get_id();
00935               repoList.push_back(std::make_pair(disc_iter->second, id));
00936 
00937               if (DCPS_debug_level > 0) {
00938                 GuidConverter converter(id);
00939                 ACE_DEBUG((LM_DEBUG,
00940                            ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00941                            ACE_TEXT("participant %C attached to Repo[ %C].\n"),
00942                            OPENDDS_STRING(converter).c_str(),
00943                            key.c_str()));
00944               }
00945 
00946             } catch (const CORBA::Exception& ex) {
00947               ex._tao_print_exception(
00948                 "ERROR: Service_Participant::set_repo_domain: failed to attach repository - ");
00949               return;
00950             }
00951           }
00952         }
00953       }
00954     }
00955   } // End of GUARD scope.
00956 
00957   // Make all of the remote calls after releasing the lock.
00958   for (unsigned int index = 0; index < repoList.size(); ++index) {
00959     if (DCPS_debug_level > 0) {
00960       GuidConverter converter(repoList[ index].second);
00961       ACE_DEBUG((LM_DEBUG,
00962                  ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00963                  ACE_TEXT("(%d of %d) attaching domain %d participant %C to Repo[ %C].\n"),
00964                  (1+index), repoList.size(), domain,
00965                  OPENDDS_STRING(converter).c_str(),
00966                  key.c_str()));
00967     }
00968 
00969     if (attach_participant)
00970     {
00971       repoList[ index].first->attach_participant(domain, repoList[ index].second);
00972     }
00973   }
00974 }
00975 
00976 void
00977 Service_Participant::repository_lost(Discovery::RepoKey key)
00978 {
00979   // Find the lost repository.
00980   RepoKeyDiscoveryMap::iterator initialLocation = this->discoveryMap_.find(key);
00981   RepoKeyDiscoveryMap::iterator current         = initialLocation;
00982 
00983   if (current == this->discoveryMap_.end()) {
00984     ACE_DEBUG((LM_WARNING,
00985                ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
00986                ACE_TEXT("lost repository %C was not present, ")
00987                ACE_TEXT("finding another anyway.\n"),
00988                key.c_str()));
00989 
00990   } else {
00991     // Start with the repository *after* the lost one.
00992     ++current;
00993   }
00994 
00995   // Calculate the bounding end time for attempts.
00996   ACE_Time_Value recoveryFailedTime
00997   = ACE_OS::gettimeofday()
00998     + ACE_Time_Value(this->federation_recovery_duration(), 0);
00999 
01000   // Backoff delay.
01001   int backoff = this->federation_initial_backoff_seconds();
01002 
01003   // Keep trying until the total recovery time specified is exceeded.
01004   while (recoveryFailedTime > ACE_OS::gettimeofday()) {
01005 
01006     // Wrap to the beginning at the end of the list.
01007     if (current == this->discoveryMap_.end()) {
01008       // Continue to traverse the list.
01009       current = this->discoveryMap_.begin();
01010     }
01011 
01012     // Handle reaching the lost repository by waiting before trying
01013     // again.
01014     if (current == initialLocation) {
01015       if (DCPS_debug_level > 0) {
01016         ACE_DEBUG((LM_DEBUG,
01017                    ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01018                    ACE_TEXT("waiting %d seconds to traverse the ")
01019                    ACE_TEXT("repository list another time ")
01020                    ACE_TEXT("for lost key %C.\n"),
01021                    backoff,
01022                    key.c_str()));
01023       }
01024 
01025       // Wait to traverse the list and try again.
01026       ACE_OS::sleep(backoff);
01027 
01028       // Exponentially backoff delay.
01029       backoff *= this->federation_backoff_multiplier();
01030 
01031       // Don't increment current to allow us to reattach to the
01032       // original repository if it is restarted.
01033     }
01034 
01035     // Check the availability of the current repository.
01036     if (current->second->active()) {
01037 
01038       if (DCPS_debug_level > 0) {
01039         ACE_DEBUG((LM_DEBUG,
01040                    ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01041                    ACE_TEXT("replacing repository %C with %C.\n"),
01042                    key.c_str(),
01043                    current->first.c_str()));
01044       }
01045 
01046       // If we reach here, the validate_connection() call succeeded
01047       // and the repository is reachable.
01048       this->remap_domains(key, current->first);
01049 
01050       // Now we are done.  This is the only non-failure exit from
01051       // this method.
01052       return;
01053 
01054     } else {
01055       ACE_DEBUG((LM_WARNING,
01056                  ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
01057                  ACE_TEXT("repository %C was not available to replace %C, ")
01058                  ACE_TEXT("looking for another.\n"),
01059                  current->first.c_str(),
01060                  key.c_str()));
01061     }
01062 
01063     // Move to the next candidate repository.
01064     ++current;
01065   }
01066 
01067   // If we reach here, we have exceeded the total recovery time
01068   // specified.
01069   ACE_ASSERT(recoveryFailedTime == ACE_Time_Value::zero);
01070 }
01071 
01072 void
01073 Service_Participant::set_default_discovery(const Discovery::RepoKey& key)
01074 {
01075   this->defaultDiscovery_ = key;
01076 }
01077 
01078 Discovery::RepoKey
01079 Service_Participant::get_default_discovery()
01080 {
01081   return this->defaultDiscovery_;
01082 }
01083 
01084 Discovery_rch
01085 Service_Participant::get_discovery(const DDS::DomainId_t domain)
01086 {
01087   // Default to the Default InfoRepo-based discovery unless the user has
01088   // changed defaultDiscovery_ using the API or config file
01089   Discovery::RepoKey repo = defaultDiscovery_;
01090 
01091   // Find if this domain has a repo key (really a discovery key)
01092   // mapped to it.
01093   DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
01094   if (where != this->domainRepoMap_.end()) {
01095     repo = where->second;
01096   }
01097 
01098   RepoKeyDiscoveryMap::const_iterator location = this->discoveryMap_.find(repo);
01099 
01100   if (location == this->discoveryMap_.end()) {
01101     if ((repo == Discovery::DEFAULT_REPO) ||
01102         (repo == "-1")) {
01103       // Set the default repository IOR if it hasn't already happened
01104       // by this point.  This is why this can't be const.
01105       bool ok = this->set_repo_ior(DEFAULT_REPO_IOR, Discovery::DEFAULT_REPO);
01106 
01107       if (!ok) {
01108         if (DCPS_debug_level > 0) {
01109           ACE_DEBUG((LM_DEBUG,
01110                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01111                      ACE_TEXT("failed attempt to set default IOR for domain %d.\n"),
01112                      domain));
01113         }
01114 
01115       } else {
01116         // Found the default!
01117         if (DCPS_debug_level > 4) {
01118           ACE_DEBUG((LM_DEBUG,
01119                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01120                      ACE_TEXT("returning default repository for domain %d.\n"),
01121                      domain));
01122         }
01123 
01124       }
01125       return this->discoveryMap_[Discovery::DEFAULT_REPO];
01126 
01127     } else if (repo == Discovery::DEFAULT_RTPS) {
01128 
01129       ACE_Configuration_Heap cf;
01130       cf.open();
01131       ACE_Configuration_Section_Key k;
01132       cf.open_section(cf.root_section(), RTPS_SECTION_NAME, 1 /*create*/, k);
01133       this->load_discovery_configuration(cf, RTPS_SECTION_NAME);
01134 
01135       // Try to find it again
01136       location = this->discoveryMap_.find(Discovery::DEFAULT_RTPS);
01137 
01138       if (location == this->discoveryMap_.end()) {
01139         // Unable to load DEFAULT_RTPS
01140         if (DCPS_debug_level > 0) {
01141           ACE_DEBUG((LM_DEBUG,
01142                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01143                      ACE_TEXT("failed attempt to set default RTPS discovery for domain %d.\n"),
01144                      domain));
01145         }
01146 
01147         return 0;
01148 
01149       } else {
01150         // Found the default!
01151         if (DCPS_debug_level > 4) {
01152           ACE_DEBUG((LM_DEBUG,
01153                      ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01154                      ACE_TEXT("returning default RTPS discovery for domain %d.\n"),
01155                      domain));
01156         }
01157 
01158         return location->second;
01159       }
01160 
01161     } else {
01162       // Non-default repositories _must_ be loaded by application.
01163       if (DCPS_debug_level > 4) {
01164         ACE_DEBUG((LM_DEBUG,
01165                    ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01166                    ACE_TEXT("repository for domain %d was not set.\n"),
01167                    domain));
01168       }
01169 
01170       return 0;
01171     }
01172   }
01173 
01174   if (DCPS_debug_level > 4) {
01175     ACE_DEBUG((LM_DEBUG,
01176                ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01177                ACE_TEXT("returning repository for domain %d, repo %C.\n"),
01178                domain, repo.c_str()));
01179   }
01180 
01181   return location->second;
01182 }
01183 
01184 OPENDDS_STRING
01185 Service_Participant::bit_transport_ip() const
01186 {
01187   return ACE_TEXT_ALWAYS_CHAR(this->bit_transport_ip_.c_str());
01188 }
01189 
01190 int
01191 Service_Participant::bit_transport_port() const
01192 {
01193   return this->bit_transport_port_;
01194 }
01195 
01196 void
01197 Service_Participant::bit_transport_port(int port)
01198 {
01199   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01200   this->bit_transport_port_ = port;
01201   got_bit_transport_port = true;
01202 }
01203 
01204 int
01205 Service_Participant::bit_lookup_duration_msec() const
01206 {
01207   return bit_lookup_duration_msec_;
01208 }
01209 
01210 void
01211 Service_Participant::bit_lookup_duration_msec(int sec)
01212 {
01213   bit_lookup_duration_msec_ = sec;
01214   got_bit_lookup_duration_msec = true;
01215 }
01216 
01217 size_t
01218 Service_Participant::n_chunks() const
01219 {
01220   return n_chunks_;
01221 }
01222 
01223 void
01224 Service_Participant::n_chunks(size_t chunks)
01225 {
01226   n_chunks_ = chunks;
01227   got_chunks = true;
01228 }
01229 
01230 size_t
01231 Service_Participant::association_chunk_multiplier() const
01232 {
01233   return association_chunk_multiplier_;
01234 }
01235 
01236 void
01237 Service_Participant::association_chunk_multiplier(size_t multiplier)
01238 {
01239   association_chunk_multiplier_ = multiplier;
01240   got_chunk_association_multiplier = true;
01241 }
01242 
01243 void
01244 Service_Participant::liveliness_factor(int factor)
01245 {
01246   liveliness_factor_ = factor;
01247   got_liveliness_factor = true;
01248 }
01249 
01250 int
01251 Service_Participant::liveliness_factor() const
01252 {
01253   return liveliness_factor_;
01254 }
01255 
01256 void
01257 Service_Participant::register_discovery_type(const char* section_name,
01258                                              Discovery::Config* cfg)
01259 {
01260   delete discovery_types_[section_name];
01261   discovery_types_[section_name] = cfg;
01262 }
01263 
01264 int
01265 Service_Participant::load_configuration()
01266 {
01267   int status = 0;
01268 
01269   if ((status = this->cf_.open()) != 0)
01270     ACE_ERROR_RETURN((LM_ERROR,
01271                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01272                       ACE_TEXT("open() returned %d\n"),
01273                       status),
01274                      -1);
01275 
01276   ACE_Ini_ImpExp import(this->cf_);
01277   status = import.import_config(config_fname.c_str());
01278 
01279   if (status != 0) {
01280     ACE_ERROR_RETURN((LM_ERROR,
01281                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01282                       ACE_TEXT("import_config () returned %d\n"),
01283                       status),
01284                      -1);
01285   } else {
01286     status = this->load_configuration(this->cf_, config_fname.c_str());
01287   }
01288   return status;
01289 }
01290 
01291 int
01292 Service_Participant::load_configuration(
01293   ACE_Configuration_Heap& config,
01294   const ACE_TCHAR* filename)
01295 {
01296   int status = 0;
01297 
01298   status = this->load_common_configuration(config, filename);
01299 
01300   if (status != 0) {
01301     ACE_ERROR_RETURN((LM_ERROR,
01302                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01303                       ACE_TEXT("load_common_configuration () returned %d\n"),
01304                       status),
01305                      -1);
01306   }
01307 
01308   // Register static discovery.
01309   this->add_discovery(static_rchandle_cast<Discovery>(StaticDiscovery::instance()));
01310 
01311   status = this->load_discovery_configuration(config, RTPS_SECTION_NAME);
01312 
01313   if (status != 0) {
01314     ACE_ERROR_RETURN((LM_ERROR,
01315                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01316                       ACE_TEXT("load_discovery_configuration() returned %d\n"),
01317                       status),
01318                      -1);
01319   }
01320 
01321   status = this->load_discovery_configuration(config, REPO_SECTION_NAME);
01322 
01323   if (status != 0) {
01324     ACE_ERROR_RETURN((LM_ERROR,
01325                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01326                       ACE_TEXT("load_discovery_configuration() returned %d\n"),
01327                       status),
01328                      -1);
01329   }
01330 
01331   status = TransportRegistry::instance()->load_transport_configuration(
01332              ACE_TEXT_ALWAYS_CHAR(filename), config);
01333   if (this->global_transport_config_ != ACE_TEXT("")) {
01334     TransportConfig_rch config = TransportRegistry::instance()->get_config(
01335       ACE_TEXT_ALWAYS_CHAR(this->global_transport_config_.c_str()));
01336     if (config == 0) {
01337       ACE_ERROR_RETURN((LM_ERROR,
01338                         ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01339                         ACE_TEXT("Unable to locate specified global transport config: %s\n"),
01340                         this->global_transport_config_.c_str()),
01341                        -1);
01342     }
01343     TransportRegistry::instance()->global_config(config);
01344   }
01345 
01346   if (status != 0) {
01347     ACE_ERROR_RETURN((LM_ERROR,
01348                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01349                       ACE_TEXT("load_transport_configuration () returned %d\n"),
01350                       status),
01351                      -1);
01352   }
01353 
01354   // Needs to be loaded after the [rtps_discovery/*] and [repository/*]
01355   // sections to allow error reporting on bad discovery config names.
01356   // Also loaded after the transport configuration so that
01357   // DefaultTransportConfig within [domain/*] can use TransportConfig objects.
01358   status = this->load_domain_configuration(config, filename);
01359 
01360   if (status != 0) {
01361     ACE_ERROR_RETURN((LM_ERROR,
01362                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01363                       ACE_TEXT("load_domain_configuration () returned %d\n"),
01364                       status),
01365                      -1);
01366   }
01367 
01368   // Needs to be loaded after transport configs and instances and domains.
01369   status = StaticDiscovery::instance()->load_configuration(config);
01370 
01371   if (status != 0) {
01372     ACE_ERROR_RETURN((LM_ERROR,
01373                       ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01374                       ACE_TEXT("load_discovery_configuration() returned %d\n"),
01375                       status),
01376                      -1);
01377   }
01378 
01379   return 0;
01380 }
01381 
01382 int
01383 Service_Participant::load_common_configuration(ACE_Configuration_Heap& cf,
01384                                                const ACE_TCHAR* filename)
01385 {
01386   const ACE_Configuration_Section_Key &root = cf.root_section();
01387   ACE_Configuration_Section_Key sect;
01388 
01389   if (cf.open_section(root, COMMON_SECTION_NAME, 0, sect) != 0) {
01390     if (DCPS_debug_level > 0) {
01391       // This is not an error if the configuration file does not have
01392       // a common section. The code default configuration will be used.
01393       ACE_DEBUG((LM_NOTICE,
01394                  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_common_configuration ")
01395                  ACE_TEXT("failed to open section %s\n"),
01396                  COMMON_SECTION_NAME));
01397     }
01398 
01399     return 0;
01400 
01401   } else {
01402     if (got_debug_level) {
01403       ACE_DEBUG((LM_NOTICE,
01404                  ACE_TEXT("(%P|%t) NOTICE: using DCPSDebugLevel value from command option (overrides value if it's in config file).\n")));
01405     } else {
01406       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSDebugLevel"), DCPS_debug_level, int)
01407     }
01408 
01409     if (got_info) {
01410       ACE_DEBUG((LM_NOTICE,
01411                  ACE_TEXT("(%P|%t) NOTICE: using DCPSInfoRepo value from command option (overrides value if it's in config file).\n")));
01412     } else {
01413       ACE_TString value;
01414       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSInfoRepo"), value)
01415       if (!value.empty()) {
01416         this->set_repo_ior(value.c_str(), Discovery::DEFAULT_REPO);
01417       }
01418     }
01419 
01420     if (got_use_rti_serialization) {
01421       ACE_DEBUG((LM_NOTICE,
01422                  ACE_TEXT("(%P|%t) NOTICE: using DCPSRTISerialization value from command option (overrides value if it's in config file).\n")));
01423     } else {
01424       bool should_use = false;
01425       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSRTISerialization"), should_use, bool)
01426       Serializer::set_use_rti_serialization(should_use);
01427     }
01428 
01429     if (got_chunks) {
01430       ACE_DEBUG((LM_NOTICE,
01431                  ACE_TEXT("(%P|%t) NOTICE: using DCPSChunks value from command option (overrides value if it's in config file).\n")));
01432     } else {
01433       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunks"), this->n_chunks_, size_t)
01434     }
01435 
01436     if (got_chunk_association_multiplier) {
01437       ACE_DEBUG((LM_NOTICE,
01438                  ACE_TEXT("(%P|%t) NOTICE: using DCPSChunkAssociationMutltiplier value from command option (overrides value if it's in config file).\n")));
01439     } else {
01440       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunkAssociationMutltiplier"), this->association_chunk_multiplier_, size_t)
01441     }
01442 
01443     if (got_bit_transport_port) {
01444       ACE_DEBUG((LM_NOTICE,
01445                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportPort value from command option (overrides value if it's in config file).\n")));
01446     } else {
01447       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportPort"), this->bit_transport_port_, int)
01448     }
01449 
01450     if (got_bit_transport_ip) {
01451       ACE_DEBUG((LM_DEBUG,
01452                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportIPAddress value from command option (overrides value if it's in config file).\n")));
01453     } else {
01454       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportIPAddress"), this->bit_transport_ip_)
01455     }
01456 
01457     if (got_liveliness_factor) {
01458       ACE_DEBUG((LM_DEBUG,
01459                  ACE_TEXT("(%P|%t) NOTICE: using DCPSLivelinessFactor value from command option (overrides value if it's in config file).\n")));
01460     } else {
01461       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSLivelinessFactor"), this->liveliness_factor_, int)
01462     }
01463 
01464     if (got_bit_lookup_duration_msec) {
01465       ACE_DEBUG((LM_DEBUG,
01466                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBitLookupDurationMsec value from command option (overrides value if it's in config file).\n")));
01467     } else {
01468       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitLookupDurationMsec"), this->bit_lookup_duration_msec_, int)
01469     }
01470 
01471     if (got_global_transport_config) {
01472       ACE_DEBUG((LM_DEBUG,
01473                  ACE_TEXT("(%P|%t) NOTICE: using DCPSGlobalTransportConfig value from command option (overrides value if it's in config file).\n")));
01474     } else {
01475       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSGlobalTransportConfig"), this->global_transport_config_);
01476       if (this->global_transport_config_ == ACE_TEXT("$file")) {
01477         // When the special string of "$file" is used, substitute the file name
01478         this->global_transport_config_ = filename;
01479       }
01480     }
01481 
01482     if (got_bit_flag) {
01483       ACE_DEBUG((LM_DEBUG,
01484                  ACE_TEXT("(%P|%t) NOTICE: using DCPSBit value from command option (overrides value if it's in config file).\n")));
01485     } else {
01486       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBit"), this->bit_enabled_, int)
01487     }
01488 
01489     if (got_transport_debug_level) {
01490       ACE_DEBUG((LM_NOTICE,
01491                  ACE_TEXT("(%P|%t) NOTICE: using DCPSTransportDebugLevel value from command option (overrides value if it's in config file).\n")));
01492     } else {
01493       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSTransportDebugLevel"), OpenDDS::DCPS::Transport_debug_level, int)
01494     }
01495 
01496 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01497     if (got_persistent_data_dir) {
01498       ACE_DEBUG((LM_NOTICE,
01499                  ACE_TEXT("(%P|%t) NOTICE: using DCPSPersistentDataDir value from command option (overrides value if it's in config file).\n")));
01500     } else {
01501       ACE_TString value;
01502       GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSPersistentDataDir"), value)
01503       this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(value.c_str());
01504     }
01505 #endif
01506 
01507     if (got_pending_timeout) {
01508       ACE_DEBUG((LM_NOTICE,
01509                  ACE_TEXT("(%P|%t) NOTICE: using DCPSPendingTimeout value from command option (overrides value if it's in config file).\n")));
01510     } else {
01511       int timeout = 0;
01512       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPendingTimeout"), timeout, int)
01513       this->pending_timeout_ = timeout;
01514     }
01515 
01516     if (got_publisher_content_filter) {
01517       ACE_DEBUG((LM_DEBUG,
01518                  ACE_TEXT("(%P|%t) NOTICE: using DCPSPublisherContentFilter ")
01519                  ACE_TEXT("value from command option (overrides value if it's ")
01520                  ACE_TEXT("in config file).\n")));
01521     } else {
01522       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPublisherContentFilter"),
01523         this->publisher_content_filter_, bool)
01524     }
01525 
01526     if (got_default_discovery) {
01527       ACE_Configuration::VALUETYPE type;
01528       if (cf.find_value(sect, ACE_TEXT("DCPSDefaultDiscovery"), type) != -1) {
01529         ACE_DEBUG((LM_NOTICE,
01530                    ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultDiscovery value ")
01531                    ACE_TEXT("from command option, overriding config file\n")));
01532       }
01533     } else {
01534       GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultDiscovery"),
01535         this->defaultDiscovery_);
01536     }
01537 
01538     ACE_Configuration::VALUETYPE type;
01539     if (got_log_fname) {
01540       if (cf.find_value(sect, ACE_TEXT("ORBLogFile"), type) != -1) {
01541         ACE_DEBUG((LM_NOTICE,
01542                    ACE_TEXT("(%P|%t) NOTICE: using ORBLogFile value ")
01543                    ACE_TEXT("from command option, overriding config file\n")));
01544       }
01545     } else {
01546       OPENDDS_STRING log_fname;
01547       GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("ORBLogFile"), log_fname);
01548       if (!log_fname.empty()) {
01549         set_log_file_name(log_fname.c_str());
01550       }
01551     }
01552 
01553     if (got_log_verbose) {
01554       if (cf.find_value(sect, ACE_TEXT("ORBVerboseLogging"), type) != -1) {
01555         ACE_DEBUG((LM_NOTICE,
01556                    ACE_TEXT("(%P|%t) NOTICE: using ORBVerboseLogging value ")
01557                    ACE_TEXT("from command option, overriding config file\n")));
01558       }
01559     } else {
01560       unsigned long verbose_logging = 0;
01561       GET_CONFIG_VALUE(cf, sect, ACE_TEXT("ORBVerboseLogging"), verbose_logging, unsigned long);
01562       set_log_verbose(verbose_logging);
01563     }
01564 
01565     if (got_default_address) {
01566       ACE_DEBUG((LM_DEBUG,
01567                  ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultAddress value from command option (overrides value if it's in config file).\n")));
01568     } else {
01569       GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultAddress"), this->default_address_)
01570     }
01571 
01572     // These are not handled on the command line.
01573     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationRecoveryDuration"), this->federation_recovery_duration_, int)
01574     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationInitialBackoffSeconds"), this->federation_initial_backoff_seconds_, int)
01575     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationBackoffMultiplier"), this->federation_backoff_multiplier_, int)
01576     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationLivelinessDuration"), this->federation_liveliness_, int)
01577 
01578 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
01579     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_size"), pool_size_, size_t)
01580     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_granularity"), pool_granularity_, size_t)
01581 #endif
01582 
01583     //
01584     // Establish the scheduler if specified.
01585     //
01586     GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("scheduler"), this->schedulerString_)
01587 
01588     suseconds_t usec(0);
01589 
01590     GET_CONFIG_VALUE(cf, sect, ACE_TEXT("scheduler_slice"), usec, suseconds_t)
01591 
01592     if (usec > 0)
01593       this->schedulerQuantum_.usec(usec);
01594   }
01595 
01596   return 0;
01597 }
01598 
01599 int
01600 Service_Participant::load_domain_configuration(ACE_Configuration_Heap& cf,
01601                                                const ACE_TCHAR* filename)
01602 {
01603   const ACE_Configuration_Section_Key& root = cf.root_section();
01604   ACE_Configuration_Section_Key domain_sect;
01605 
01606   if (cf.open_section(root, DOMAIN_SECTION_NAME, 0, domain_sect) != 0) {
01607     if (DCPS_debug_level > 0) {
01608       // This is not an error if the configuration file does not have
01609       // any domain (sub)section. The code default configuration will be used.
01610       ACE_DEBUG((LM_NOTICE,
01611                  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_domain_configuration ")
01612                  ACE_TEXT("failed to open [%s] section - using code default.\n"),
01613                  DOMAIN_SECTION_NAME));
01614     }
01615 
01616     return 0;
01617 
01618   } else {
01619     // Ensure there are no properties in this section
01620     ValueMap vm;
01621     if (pullValues(cf, domain_sect, vm) > 0) {
01622       // There are values inside [domain]
01623       ACE_ERROR_RETURN((LM_ERROR,
01624                         ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01625                         ACE_TEXT("domain sections must have a subsection name\n")),
01626                        -1);
01627     }
01628     // Process the subsections of this section (the individual domains)
01629     KeyList keys;
01630     if (processSections(cf, domain_sect, keys) != 0) {
01631       ACE_ERROR_RETURN((LM_ERROR,
01632                         ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01633                         ACE_TEXT("too many nesting layers in the [domain] section.\n")),
01634                        -1);
01635     }
01636 
01637     // Loop through the [domain/*] sections
01638     for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01639       OPENDDS_STRING domain_name = it->first;
01640 
01641       ValueMap values;
01642       pullValues(cf, it->second, values);
01643       DDS::DomainId_t domainId = -1;
01644       Discovery::RepoKey repoKey;
01645       OPENDDS_STRING perDomainDefaultTportConfig;
01646       for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01647         OPENDDS_STRING name = it->first;
01648         if (name == "DomainId") {
01649           OPENDDS_STRING value = it->second;
01650           if (!convertToInteger(value, domainId)) {
01651             ACE_ERROR_RETURN((LM_ERROR,
01652                               ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01653                               ACE_TEXT("Illegal integer value for DomainId (%C) in [domain/%C] section.\n"),
01654                               value.c_str(), domain_name.c_str()),
01655                              -1);
01656           }
01657           if (DCPS_debug_level > 0) {
01658             ACE_DEBUG((LM_DEBUG,
01659                        ACE_TEXT("(%P|%t) [domain/%C]: DomainId == %d\n"),
01660                        domain_name.c_str(), domainId));
01661           }
01662         } else if (name == "DomainRepoKey") {
01663           // We will still process this for backward compatibility, but
01664           // it can now be replaced by "DiscoveryConfig=REPO:<key>"
01665           repoKey = it->second;
01666           if (repoKey == "-1") {
01667             repoKey = Discovery::DEFAULT_REPO;
01668           }
01669 
01670           if (DCPS_debug_level > 0) {
01671             ACE_DEBUG((LM_DEBUG,
01672                        ACE_TEXT("(%P|%t) [domain/%C]: DomainRepoKey == %C\n"),
01673                        domain_name.c_str(), repoKey.c_str()));
01674           }
01675         } else if (name == "DiscoveryConfig") {
01676           repoKey = it->second;
01677 
01678         } else if (name == "DefaultTransportConfig") {
01679           if (it->second == "$file") {
01680             // When the special string of "$file" is used, substitute the file name
01681             perDomainDefaultTportConfig = ACE_TEXT_ALWAYS_CHAR(filename);
01682 
01683           } else {
01684             perDomainDefaultTportConfig = it->second;
01685           }
01686 
01687         } else {
01688           ACE_ERROR_RETURN((LM_ERROR,
01689                             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01690                             ACE_TEXT("Unexpected entry (%C) in [domain/%C] section.\n"),
01691                             name.c_str(), domain_name.c_str()),
01692                            -1);
01693         }
01694       }
01695 
01696       if (domainId == -1) {
01697         // DomainId parameter is not set, try using the domain name as an ID
01698         if (!convertToInteger(domain_name, domainId)) {
01699           ACE_ERROR_RETURN((LM_ERROR,
01700                             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01701                             ACE_TEXT("Missing DomainId value in [domain/%C] section.\n"),
01702                             domain_name.c_str()),
01703                            -1);
01704         }
01705       }
01706 
01707       if (!perDomainDefaultTportConfig.empty()) {
01708         TransportRegistry* const reg = TransportRegistry::instance();
01709         TransportConfig_rch tc = reg->get_config(perDomainDefaultTportConfig);
01710         if (tc.is_nil()) {
01711           ACE_ERROR_RETURN((LM_ERROR,
01712             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01713             ACE_TEXT("Unknown transport config %C in [domain/%C] section.\n"),
01714             perDomainDefaultTportConfig.c_str(), domain_name.c_str()), -1);
01715         } else {
01716           reg->domain_default_config(domainId, tc);
01717         }
01718       }
01719 
01720       // Check to see if the specified discovery configuration has been defined
01721       if (!repoKey.empty()) {
01722         if ((repoKey != Discovery::DEFAULT_REPO) &&
01723             (repoKey != Discovery::DEFAULT_RTPS) &&
01724             (repoKey != Discovery::DEFAULT_STATIC) &&
01725             (this->discoveryMap_.find(repoKey) == this->discoveryMap_.end())) {
01726           ACE_ERROR_RETURN((LM_ERROR,
01727                             ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01728                             ACE_TEXT("Specified configuration (%C) not found.  Referenced in [domain/%C] section.\n"),
01729                             repoKey.c_str(), domain_name.c_str()),
01730                            -1);
01731         }
01732         this->set_repo_domain(domainId, repoKey);
01733       }
01734     }
01735   }
01736 
01737   return 0;
01738 }
01739 
01740 int
01741 Service_Participant::load_discovery_configuration(ACE_Configuration_Heap& cf,
01742                                                   const ACE_TCHAR* section_name)
01743 {
01744   const ACE_Configuration_Section_Key &root = cf.root_section();
01745   ACE_Configuration_Section_Key sect;
01746   if (cf.open_section(root, section_name, 0, sect) == 0) {
01747 
01748     const OPENDDS_STRING sect_name = ACE_TEXT_ALWAYS_CHAR(section_name);
01749     OPENDDS_MAP(OPENDDS_STRING, Discovery::Config*)::iterator iter =
01750       this->discovery_types_.find(sect_name);
01751 
01752     if (iter == this->discovery_types_.end()) {
01753       // See if we can dynamically load the required libraries
01754       TheTransportRegistry->load_transport_lib(sect_name);
01755       iter = this->discovery_types_.find(sect_name);
01756     }
01757 
01758     if (iter != this->discovery_types_.end()) {
01759       // discovery code is loaded, process options
01760       return iter->second->discovery_config(cf);
01761     } else {
01762       // No discovery code can be loaded, report an error
01763       ACE_ERROR_RETURN((LM_ERROR,
01764                         ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
01765                         ACE_TEXT("load_discovery_configuration ")
01766                         ACE_TEXT("Unable to load libraries for %s\n"),
01767                         section_name),
01768                        -1);
01769     }
01770   }
01771   return 0;
01772 }
01773 
#if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
/// Configure the SafetyProfilePool with the stored size and granularity and
/// install it.  Does nothing while pool_size_ is zero.
void
Service_Participant::configure_pool()
{
  if (pool_size_ == 0) {
    return;
  }

  SafetyProfilePool::instance()->configure_pool(pool_size_, pool_granularity_);
  SafetyProfilePool::instance()->install();
}
#endif
01784 
01785 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01786 DataDurabilityCache *
01787 Service_Participant::get_data_durability_cache(
01788   DDS::DurabilityQosPolicy const & durability)
01789 {
01790   DDS::DurabilityQosPolicyKind const kind =
01791     durability.kind;
01792 
01793   DataDurabilityCache * cache = 0;
01794 
01795   if (kind == DDS::TRANSIENT_DURABILITY_QOS) {
01796     {
01797       ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01798                        guard,
01799                        this->factory_lock_,
01800                        0);
01801 
01802       if (this->transient_data_cache_.get() == 0) {
01803         ACE_auto_ptr_reset(this->transient_data_cache_,
01804                            new DataDurabilityCache(kind));
01805       }
01806     }
01807 
01808     cache = this->transient_data_cache_.get();
01809 
01810   } else if (kind == DDS::PERSISTENT_DURABILITY_QOS) {
01811     {
01812       ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01813                        guard,
01814                        this->factory_lock_,
01815                        0);
01816 
01817       try {
01818         if (this->persistent_data_cache_.get() == 0) {
01819           ACE_auto_ptr_reset(this->persistent_data_cache_,
01820                              new DataDurabilityCache(kind,
01821                                                      this->persistent_data_dir_));
01822         }
01823 
01824       } catch (const std::exception& ex) {
01825         if (DCPS_debug_level > 0) {
01826           ACE_ERROR((LM_WARNING,
01827                      ACE_TEXT("(%P|%t) WARNING: Service_Participant::get_data_durability_cache ")
01828                      ACE_TEXT("failed to create PERSISTENT cache, falling back on ")
01829                      ACE_TEXT("TRANSIENT behavior: %C\n"), ex.what()));
01830         }
01831 
01832         ACE_auto_ptr_reset(this->persistent_data_cache_,
01833                            new DataDurabilityCache(DDS::TRANSIENT_DURABILITY_QOS));
01834       }
01835     }
01836 
01837     cache = this->persistent_data_cache_.get();
01838   }
01839 
01840   return cache;
01841 }
01842 #endif
01843 
01844 void
01845 Service_Participant::add_discovery(Discovery_rch discovery)
01846 {
01847   if (discovery != 0) {
01848     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01849     this->discoveryMap_[discovery->key()] = discovery;
01850   }
01851 }
01852 
01853 const Service_Participant::RepoKeyDiscoveryMap&
01854 Service_Participant::discoveryMap() const
01855 {
01856   return this->discoveryMap_;
01857 }
01858 
01859 const Service_Participant::DomainRepoMap&
01860 Service_Participant::domainRepoMap() const
01861 {
01862   return this->domainRepoMap_;
01863 }
01864 
01865 Recorder_ptr
01866 Service_Participant::create_recorder(DDS::DomainParticipant_ptr participant,
01867                                      DDS::Topic_ptr a_topic,
01868                                      const DDS::SubscriberQos& subscriber_qos,
01869                                      const DDS::DataReaderQos& datareader_qos,
01870                                      const RecorderListener_rch& a_listener)
01871 {
01872   DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01873   if (participant_servant)
01874     return participant_servant->create_recorder(a_topic, subscriber_qos, datareader_qos, a_listener, 0);
01875   return 0;
01876 }
01877 
01878 DDS::ReturnCode_t
01879 Service_Participant::delete_recorder(Recorder_ptr recorder)
01880 {
01881   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01882   RecorderImpl* impl = static_cast<RecorderImpl*>(recorder);
01883   if (impl){
01884     ret = impl->cleanup();
01885     impl->participant()->delete_recorder(recorder);
01886   }
01887   return ret;
01888 }
01889 
01890 Replayer_ptr
01891 Service_Participant::create_replayer(DDS::DomainParticipant_ptr participant,
01892                                      DDS::Topic_ptr a_topic,
01893                                      const DDS::PublisherQos& publisher_qos,
01894                                      const DDS::DataWriterQos& datawriter_qos,
01895                                      const ReplayerListener_rch& a_listener)
01896 {
01897   ACE_DEBUG((LM_DEBUG, "Service_Participant::create_replayer\n"));
01898   DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01899   if (participant_servant)
01900     return participant_servant->create_replayer(a_topic, publisher_qos, datawriter_qos, a_listener, 0);
01901   return 0;
01902 }
01903 
01904 DDS::ReturnCode_t
01905 Service_Participant::delete_replayer(Replayer_ptr replayer)
01906 {
01907   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01908   ReplayerImpl* impl = static_cast<ReplayerImpl*>(replayer);
01909   if (impl) {
01910     ret = impl->cleanup();
01911     impl->participant()->delete_replayer(replayer);
01912   }
01913   return ret;
01914 }
01915 
01916 DDS::Topic_ptr
01917 Service_Participant::create_typeless_topic(DDS::DomainParticipant_ptr participant,
01918                                      const char * topic_name,
01919                                      const char * type_name,
01920                                      bool type_has_keys,
01921                                      const DDS::TopicQos & qos,
01922                                      DDS::TopicListener_ptr a_listener,
01923                                      DDS::StatusMask mask)
01924 {
01925   DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01926   if (! participant_servant) {
01927     return 0;
01928   }
01929   return participant_servant->create_typeless_topic(topic_name, type_name, type_has_keys, qos, a_listener, mask);
01930 }
01931 
01932 } // namespace DCPS
01933 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7