00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "WaitSet.h"
00010 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00011 #include "debug.h"
00012 #include "Service_Participant.h"
00013 #include "BuiltInTopicUtils.h"
00014 #include "DataDurabilityCache.h"
00015 #include "GuidConverter.h"
00016 #include "MonitorFactory.h"
00017 #include "ConfigUtils.h"
00018 #include "RecorderImpl.h"
00019 #include "ReplayerImpl.h"
00020 #include "StaticDiscovery.h"
00021
00022 #include "ace/Singleton.h"
00023 #include "ace/Arg_Shifter.h"
00024 #include "ace/Reactor.h"
00025 #include "ace/Select_Reactor.h"
00026 #include "ace/Configuration_Import_Export.h"
00027 #include "ace/Service_Config.h"
00028 #include "ace/Argv_Type_Converter.h"
00029 #include "ace/Auto_Ptr.h"
00030 #include "ace/Sched_Params.h"
00031 #include "ace/Malloc_Allocator.h"
00032 #include "ace/OS_NS_unistd.h"
00033
00034 #ifdef OPENDDS_SAFETY_PROFILE
00035 #include <stdio.h>
00036 #else
00037 #include <fstream>
00038 #endif
00039
00040 #if !defined (__ACE_INLINE__)
00041 #include "Service_Participant.inl"
00042 #endif
00043
00044 namespace {
00045
00046 void set_log_file_name(const char* fname)
00047 {
00048 #ifdef OPENDDS_SAFETY_PROFILE
00049 ACE_LOG_MSG->msg_ostream(fopen(fname, "a"), true);
00050 #else
00051 std::ofstream* output_stream = new std::ofstream(fname, ios::app);
00052 if (output_stream->bad()) {
00053 delete output_stream;
00054 } else {
00055 ACE_LOG_MSG->msg_ostream(output_stream, true);
00056 }
00057 #endif
00058 ACE_LOG_MSG->clr_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::LOGGER);
00059 ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
00060 }
00061
00062
00063 void set_log_verbose(unsigned long verbose_logging)
00064 {
00065
00066
00067
00068 typedef void (ACE_Log_Msg::*PTMF)(u_long);
00069 PTMF flagop = &ACE_Log_Msg::set_flags;
00070 u_long value;
00071
00072 switch (verbose_logging)
00073 {
00074 case 0:
00075 flagop = &ACE_Log_Msg::clr_flags;
00076 value = ACE_Log_Msg::VERBOSE | ACE_Log_Msg::VERBOSE_LITE;
00077 break;
00078 case 1:
00079 value = ACE_Log_Msg::VERBOSE_LITE; break;
00080 default:
00081 value = ACE_Log_Msg::VERBOSE; break;
00082 }
00083
00084 (ACE_LOG_MSG->*flagop)(value);
00085 }
00086
00087
00088 }
00089
00090 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00091
00092 namespace OpenDDS {
00093 namespace DCPS {
00094
00095 int Service_Participant::zero_argc = 0;
00096
00097 const size_t DEFAULT_NUM_CHUNKS = 20;
00098
00099 const size_t DEFAULT_CHUNK_MULTIPLIER = 10;
00100
00101 const int DEFAULT_FEDERATION_RECOVERY_DURATION = 900;
00102 const int DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS = 1;
00103 const int DEFAULT_FEDERATION_BACKOFF_MULTIPLIER = 2;
00104 const int DEFAULT_FEDERATION_LIVELINESS = 60;
00105
00106 const int BIT_LOOKUP_DURATION_MSEC = 2000;
00107
00108 static ACE_TString config_fname(ACE_TEXT(""));
00109
00110 static const ACE_TCHAR DEFAULT_REPO_IOR[] = ACE_TEXT("file://repo.ior");
00111
00112 static const ACE_CString DEFAULT_PERSISTENT_DATA_DIR = "OpenDDS-durable-data-dir";
00113
00114 static const ACE_TCHAR COMMON_SECTION_NAME[] = ACE_TEXT("common");
00115 static const ACE_TCHAR DOMAIN_SECTION_NAME[] = ACE_TEXT("domain");
00116 static const ACE_TCHAR REPO_SECTION_NAME[] = ACE_TEXT("repository");
00117 static const ACE_TCHAR RTPS_SECTION_NAME[] = ACE_TEXT("rtps_discovery");
00118
00119 static bool got_debug_level = false;
00120 static bool got_use_rti_serialization = false;
00121 static bool got_info = false;
00122 static bool got_chunks = false;
00123 static bool got_chunk_association_multiplier = false;
00124 static bool got_liveliness_factor = false;
00125 static bool got_bit_transport_port = false;
00126 static bool got_bit_transport_ip = false;
00127 static bool got_bit_lookup_duration_msec = false;
00128 static bool got_global_transport_config = false;
00129 static bool got_bit_flag = false;
00130
00131 #if defined(OPENDDS_SECURITY)
00132 static bool got_security_flag = false;
00133 #endif
00134
00135 static bool got_publisher_content_filter = false;
00136 static bool got_transport_debug_level = false;
00137 static bool got_pending_timeout = false;
00138 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00139 static bool got_persistent_data_dir = false;
00140 #endif
00141 static bool got_default_discovery = false;
00142 #ifndef DDS_DEFAULT_DISCOVERY_METHOD
00143 # ifdef OPENDDS_SAFETY_PROFILE
00144 # define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_RTPS
00145 # else
00146 # define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_REPO
00147 # endif
00148 #endif
00149 static bool got_log_fname = false;
00150 static bool got_log_verbose = false;
00151 static bool got_default_address = false;
00152 static bool got_bidir_giop = false;
00153 static bool got_monitor = false;
00154
00155 Service_Participant::Service_Participant()
00156 :
00157 #ifndef OPENDDS_SAFETY_PROFILE
00158 ORB_argv_(false ),
00159 #endif
00160 reactor_owner_(ACE_OS::NULL_thread),
00161 defaultDiscovery_(DDS_DEFAULT_DISCOVERY_METHOD),
00162 n_chunks_(DEFAULT_NUM_CHUNKS),
00163 association_chunk_multiplier_(DEFAULT_CHUNK_MULTIPLIER),
00164 liveliness_factor_(80),
00165 bit_transport_port_(0),
00166 bit_enabled_(
00167 #ifdef DDS_HAS_MINIMUM_BIT
00168 false
00169 #else
00170 true
00171 #endif
00172 ),
00173 #if defined(OPENDDS_SECURITY)
00174 security_enabled_(false),
00175 #endif
00176 bit_lookup_duration_msec_(BIT_LOOKUP_DURATION_MSEC),
00177 global_transport_config_(ACE_TEXT("")),
00178 monitor_factory_(0),
00179 federation_recovery_duration_(DEFAULT_FEDERATION_RECOVERY_DURATION),
00180 federation_initial_backoff_seconds_(DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS),
00181 federation_backoff_multiplier_(DEFAULT_FEDERATION_BACKOFF_MULTIPLIER),
00182 federation_liveliness_(DEFAULT_FEDERATION_LIVELINESS),
00183 schedulerQuantum_(ACE_Time_Value::zero),
00184 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00185 pool_size_(1024*1024*16),
00186 pool_granularity_(8),
00187 #endif
00188 scheduler_(-1),
00189 priority_min_(0),
00190 priority_max_(0),
00191 publisher_content_filter_(true),
00192 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00193 persistent_data_dir_(DEFAULT_PERSISTENT_DATA_DIR),
00194 #endif
00195 pending_timeout_(ACE_Time_Value::zero),
00196 bidir_giop_(true),
00197 monitor_enabled_(false),
00198 shut_down_(false)
00199 {
00200 initialize();
00201 }
00202
00203 Service_Participant::~Service_Participant()
00204 {
00205 shutdown();
00206
00207 if (DCPS_debug_level > 0) {
00208 ACE_DEBUG((LM_DEBUG,
00209 "%T (%P|%t) Service_Participant::~Service_Participant()\n"));
00210 }
00211 }
00212
00213 Service_Participant*
00214 Service_Participant::instance()
00215 {
00216
00217
00218
00219 return ACE_Singleton<Service_Participant, ACE_SYNCH_MUTEX>::instance();
00220 }
00221
00222 int
00223 Service_Participant::ReactorTask::svc()
00224 {
00225 Service_Participant* sp = instance();
00226 sp->reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
00227 sp->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00228 this->wait_for_startup();
00229 sp->reactor_->run_reactor_event_loop();
00230 return 0;
00231 }
00232
00233 ACE_Reactor_Timer_Interface*
00234 Service_Participant::timer() const
00235 {
00236 return reactor_.get();
00237 }
00238
00239 ACE_Reactor*
00240 Service_Participant::reactor() const
00241 {
00242 return reactor_.get();
00243 }
00244
00245 ACE_thread_t
00246 Service_Participant::reactor_owner() const
00247 {
00248 return reactor_owner_;
00249 }
00250
00251 void
00252 Service_Participant::shutdown()
00253 {
00254
00255 if (shut_down_) {
00256 return;
00257 }
00258
00259 shut_down_ = true;
00260 try {
00261 TransportRegistry::instance()->release();
00262 {
00263 ACE_GUARD(TAO_SYNCH_MUTEX, guard, this->factory_lock_);
00264
00265 if (dp_factory_servant_)
00266 dp_factory_servant_->cleanup();
00267
00268 domainRepoMap_.clear();
00269
00270 if (reactor_) {
00271 reactor_->end_reactor_event_loop();
00272 reactor_task_.wait();
00273 }
00274
00275 discoveryMap_.clear();
00276
00277 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00278 transient_data_cache_.reset();
00279 persistent_data_cache_.reset();
00280 #endif
00281
00282 discovery_types_.clear();
00283 }
00284 TransportRegistry::close();
00285 } catch (const CORBA::Exception& ex) {
00286 ex._tao_print_exception("ERROR: Service_Participant::shutdown");
00287 }
00288 }
00289
00290 #ifdef ACE_USES_WCHAR
00291 DDS::DomainParticipantFactory_ptr
00292 Service_Participant::get_domain_participant_factory(int &argc,
00293 char *argv[])
00294 {
00295 ACE_Argv_Type_Converter converter(argc, argv);
00296 return get_domain_participant_factory(converter.get_argc(),
00297 converter.get_TCHAR_argv());
00298 }
00299 #endif
00300
00301 DDS::DomainParticipantFactory_ptr
00302 Service_Participant::get_domain_participant_factory(int &argc,
00303 ACE_TCHAR *argv[])
00304 {
00305 if (!dp_factory_servant_) {
00306 ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
00307 guard,
00308 this->factory_lock_,
00309 DDS::DomainParticipantFactory::_nil());
00310
00311 if (!dp_factory_servant_) {
00312
00313
00314
00315
00316
00317 #ifndef OPENDDS_SAFETY_PROFILE
00318 ORB_argv_.add(ACE_TEXT("unused_arg_0"));
00319 #endif
00320 ACE_Arg_Shifter shifter(argc, argv);
00321 while (shifter.is_anything_left()) {
00322 if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBLogFile")) == 0) {
00323 shifter.ignore_arg();
00324 } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBVerboseLogging")) == 0) {
00325 shifter.ignore_arg();
00326 } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORB")) < 0) {
00327 shifter.ignore_arg();
00328 } else {
00329 #ifndef OPENDDS_SAFETY_PROFILE
00330 ORB_argv_.add(shifter.get_current());
00331 #endif
00332 shifter.consume_arg();
00333 if (shifter.is_parameter_next()) {
00334 #ifndef OPENDDS_SAFETY_PROFILE
00335 ORB_argv_.add(shifter.get_current(), true );
00336 #endif
00337 shifter.consume_arg();
00338 }
00339 }
00340 }
00341
00342 if (parse_args(argc, argv) != 0) {
00343 return DDS::DomainParticipantFactory::_nil();
00344 }
00345
00346 if (config_fname == ACE_TEXT("")) {
00347 if (DCPS_debug_level) {
00348 ACE_DEBUG((LM_NOTICE,
00349 ACE_TEXT("(%P|%t) NOTICE: not using file configuration - no configuration ")
00350 ACE_TEXT("file specified.\n")));
00351 }
00352
00353 } else {
00354
00355
00356 FILE* in = ACE_OS::fopen(config_fname.c_str(),
00357 ACE_TEXT("r"));
00358
00359 if (!in) {
00360 ACE_DEBUG((LM_WARNING,
00361 ACE_TEXT("(%P|%t) WARNING: not using file configuration - ")
00362 ACE_TEXT("can not open \"%s\" for reading. %p\n"),
00363 config_fname.c_str(), ACE_TEXT("fopen")));
00364
00365 } else {
00366 ACE_OS::fclose(in);
00367
00368 if (this->load_configuration() != 0) {
00369 ACE_ERROR((LM_ERROR,
00370 ACE_TEXT("(%P|%t) ERROR: Service_Participant::get_domain_participant_factory: ")
00371 ACE_TEXT("load_configuration() failed.\n")));
00372 return DDS::DomainParticipantFactory::_nil();
00373 }
00374 }
00375 }
00376
00377 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00378
00379 configure_pool();
00380 #endif
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391 this->initializeScheduling();
00392
00393 dp_factory_servant_ = make_rch<DomainParticipantFactoryImpl>();
00394
00395 if (!reactor_)
00396 reactor_.reset(new ACE_Reactor(new ACE_Select_Reactor, true));
00397
00398 if (reactor_task_.activate(THR_NEW_LWP | THR_JOINABLE) == -1) {
00399 ACE_ERROR((LM_ERROR,
00400 ACE_TEXT("ERROR: Service_Participant::get_domain_participant_factory, ")
00401 ACE_TEXT("Failed to activate the reactor task.\n")));
00402 return DDS::DomainParticipantFactory::_nil();
00403 }
00404
00405 reactor_task_.wait_for_startup();
00406
00407 if (this->monitor_enabled_) {
00408 #if !defined(ACE_AS_STATIC_LIBS)
00409 ACE_TString directive = ACE_TEXT("dynamic OpenDDS_Monitor Service_Object * OpenDDS_monitor:_make_MonitorFactoryImpl()");
00410 ACE_Service_Config::process_directive(directive.c_str());
00411 #endif
00412 this->monitor_factory_ =
00413 ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor");
00414
00415 if (this->monitor_factory_ == 0) {
00416 if (this->monitor_enabled_) {
00417 ACE_ERROR((LM_ERROR,
00418 ACE_TEXT("ERROR: Service_Participant::get_domain_participant_factory, ")
00419 ACE_TEXT("Unable to enable monitor factory.\n")));
00420 }
00421 }
00422 }
00423
00424 if (this->monitor_factory_ == 0) {
00425
00426 MonitorFactory::service_initialize();
00427 this->monitor_factory_ =
00428 ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor_Default");
00429 }
00430 if (this->monitor_enabled_) {
00431 this->monitor_factory_->initialize();
00432 }
00433
00434 this->monitor_.reset(this->monitor_factory_->create_sp_monitor(this));
00435 }
00436 }
00437
00438 return DDS::DomainParticipantFactory::_duplicate(dp_factory_servant_.in());
00439 }
00440
00441 int
00442 Service_Participant::parse_args(int &argc, ACE_TCHAR *argv[])
00443 {
00444 ACE_Arg_Shifter arg_shifter(argc, argv);
00445
00446 while (arg_shifter.is_anything_left()) {
00447 const ACE_TCHAR *currentArg = 0;
00448
00449 if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDebugLevel"))) != 0) {
00450 set_DCPS_debug_level(ACE_OS::atoi(currentArg));
00451 arg_shifter.consume_arg();
00452 got_debug_level = true;
00453
00454 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSInfoRepo"))) != 0) {
00455 this->set_repo_ior(currentArg, Discovery::DEFAULT_REPO);
00456 arg_shifter.consume_arg();
00457 got_info = true;
00458
00459 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSRTISerialization"))) != 0) {
00460 Serializer::set_use_rti_serialization(ACE_OS::atoi(currentArg));
00461 arg_shifter.consume_arg();
00462 got_use_rti_serialization = true;
00463
00464 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunks"))) != 0) {
00465 n_chunks_ = ACE_OS::atoi(currentArg);
00466 arg_shifter.consume_arg();
00467 got_chunks = true;
00468
00469 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunkAssociationMultiplier"))) != 0) {
00470 association_chunk_multiplier_ = ACE_OS::atoi(currentArg);
00471 arg_shifter.consume_arg();
00472 got_chunk_association_multiplier = true;
00473
00474 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSConfigFile"))) != 0) {
00475 config_fname = currentArg;
00476 arg_shifter.consume_arg();
00477
00478 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSLivelinessFactor"))) != 0) {
00479 liveliness_factor_ = ACE_OS::atoi(currentArg);
00480 arg_shifter.consume_arg();
00481 got_liveliness_factor = true;
00482
00483 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportPort"))) != 0) {
00484
00485
00486 this->bit_transport_port_ = ACE_OS::atoi(currentArg);
00487 arg_shifter.consume_arg();
00488 got_bit_transport_port = true;
00489
00490 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportIPAddress"))) != 0) {
00491
00492
00493 this->bit_transport_ip_ = currentArg;
00494 arg_shifter.consume_arg();
00495 got_bit_transport_ip = true;
00496
00497 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitLookupDurationMsec"))) != 0) {
00498 bit_lookup_duration_msec_ = ACE_OS::atoi(currentArg);
00499 arg_shifter.consume_arg();
00500 got_bit_lookup_duration_msec = true;
00501
00502 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSGlobalTransportConfig"))) != 0) {
00503 global_transport_config_ = currentArg;
00504 arg_shifter.consume_arg();
00505 got_global_transport_config = true;
00506
00507 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBit"))) != 0) {
00508 bit_enabled_ = ACE_OS::atoi(currentArg);
00509 arg_shifter.consume_arg();
00510 got_bit_flag = true;
00511
00512 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSTransportDebugLevel"))) != 0) {
00513 OpenDDS::DCPS::Transport_debug_level = ACE_OS::atoi(currentArg);
00514 arg_shifter.consume_arg();
00515 got_transport_debug_level = true;
00516
00517 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00518 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPersistentDataDir"))) != 0) {
00519 this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00520 arg_shifter.consume_arg();
00521 got_persistent_data_dir = true;
00522 #endif
00523
00524 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPendingTimeout"))) != 0) {
00525 this->pending_timeout_ = ACE_OS::atoi(currentArg);
00526 arg_shifter.consume_arg();
00527 got_pending_timeout = true;
00528
00529 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPublisherContentFilter"))) != 0) {
00530 this->publisher_content_filter_ = ACE_OS::atoi(currentArg);
00531 arg_shifter.consume_arg();
00532 got_publisher_content_filter = true;
00533
00534 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultDiscovery"))) != 0) {
00535 this->defaultDiscovery_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00536 arg_shifter.consume_arg();
00537 got_default_discovery = true;
00538
00539 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBidirGIOP"))) != 0) {
00540 bidir_giop_ = ACE_OS::atoi(currentArg);
00541 arg_shifter.consume_arg();
00542 got_bidir_giop = true;
00543
00544 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationRecoveryDuration"))) != 0) {
00545 this->federation_recovery_duration_ = ACE_OS::atoi(currentArg);
00546 arg_shifter.consume_arg();
00547
00548 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationInitialBackoffSeconds"))) != 0) {
00549 this->federation_initial_backoff_seconds_ = ACE_OS::atoi(currentArg);
00550 arg_shifter.consume_arg();
00551
00552 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationBackoffMultiplier"))) != 0) {
00553 this->federation_backoff_multiplier_ = ACE_OS::atoi(currentArg);
00554 arg_shifter.consume_arg();
00555
00556 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationLivelinessDuration"))) != 0) {
00557 this->federation_liveliness_ = ACE_OS::atoi(currentArg);
00558 arg_shifter.consume_arg();
00559
00560 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBLogFile"))) != 0) {
00561 set_log_file_name(ACE_TEXT_ALWAYS_CHAR(currentArg));
00562 arg_shifter.consume_arg();
00563 got_log_fname = true;
00564
00565 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBVerboseLogging"))) != 0) {
00566 set_log_verbose(ACE_OS::atoi(currentArg));
00567 arg_shifter.consume_arg();
00568 got_log_verbose = true;
00569
00570 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultAddress"))) != 0) {
00571 this->default_address_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00572 arg_shifter.consume_arg();
00573 got_default_address = true;
00574
00575 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSMonitor"))) != 0) {
00576 this->monitor_enabled_ = ACE_OS::atoi(currentArg);
00577 arg_shifter.consume_arg();
00578 got_monitor = true;
00579
00580 #if defined(OPENDDS_SECURITY)
00581 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSSecurity"))) != 0) {
00582 security_enabled_ = ACE_OS::atoi(currentArg);
00583 arg_shifter.consume_arg();
00584 got_security_flag = true;
00585 #endif
00586
00587 } else {
00588 arg_shifter.ignore_arg();
00589 }
00590 }
00591
00592
00593 return 0;
00594 }
00595
00596 void
00597 Service_Participant::initialize()
00598 {
00599
00600
00601
00602
00603 initial_TransportPriorityQosPolicy_.value = 0;
00604 initial_LifespanQosPolicy_.duration.sec = DDS::DURATION_INFINITE_SEC;
00605 initial_LifespanQosPolicy_.duration.nanosec = DDS::DURATION_INFINITE_NSEC;
00606
00607 initial_DurabilityQosPolicy_.kind = DDS::VOLATILE_DURABILITY_QOS;
00608
00609 initial_DurabilityServiceQosPolicy_.service_cleanup_delay.sec =
00610 DDS::DURATION_ZERO_SEC;
00611 initial_DurabilityServiceQosPolicy_.service_cleanup_delay.nanosec =
00612 DDS::DURATION_ZERO_NSEC;
00613 initial_DurabilityServiceQosPolicy_.history_kind =
00614 DDS::KEEP_LAST_HISTORY_QOS;
00615 initial_DurabilityServiceQosPolicy_.history_depth = 1;
00616 initial_DurabilityServiceQosPolicy_.max_samples =
00617 DDS::LENGTH_UNLIMITED;
00618 initial_DurabilityServiceQosPolicy_.max_instances =
00619 DDS::LENGTH_UNLIMITED;
00620 initial_DurabilityServiceQosPolicy_.max_samples_per_instance =
00621 DDS::LENGTH_UNLIMITED;
00622
00623 initial_PresentationQosPolicy_.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
00624 initial_PresentationQosPolicy_.coherent_access = false;
00625 initial_PresentationQosPolicy_.ordered_access = false;
00626
00627 initial_DeadlineQosPolicy_.period.sec = DDS::DURATION_INFINITE_SEC;
00628 initial_DeadlineQosPolicy_.period.nanosec = DDS::DURATION_INFINITE_NSEC;
00629
00630 initial_LatencyBudgetQosPolicy_.duration.sec = DDS::DURATION_ZERO_SEC;
00631 initial_LatencyBudgetQosPolicy_.duration.nanosec = DDS::DURATION_ZERO_NSEC;
00632
00633 initial_OwnershipQosPolicy_.kind = DDS::SHARED_OWNERSHIP_QOS;
00634 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00635 initial_OwnershipStrengthQosPolicy_.value = 0;
00636 #endif
00637
00638 initial_LivelinessQosPolicy_.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
00639 initial_LivelinessQosPolicy_.lease_duration.sec = DDS::DURATION_INFINITE_SEC;
00640 initial_LivelinessQosPolicy_.lease_duration.nanosec = DDS::DURATION_INFINITE_NSEC;
00641
00642 initial_TimeBasedFilterQosPolicy_.minimum_separation.sec = DDS::DURATION_ZERO_SEC;
00643 initial_TimeBasedFilterQosPolicy_.minimum_separation.nanosec = DDS::DURATION_ZERO_NSEC;
00644
00645 initial_ReliabilityQosPolicy_.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
00646 initial_ReliabilityQosPolicy_.max_blocking_time.sec = DDS::DURATION_INFINITE_SEC;
00647 initial_ReliabilityQosPolicy_.max_blocking_time.nanosec = DDS::DURATION_INFINITE_NSEC;
00648
00649 initial_DestinationOrderQosPolicy_.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
00650
00651 initial_HistoryQosPolicy_.kind = DDS::KEEP_LAST_HISTORY_QOS;
00652 initial_HistoryQosPolicy_.depth = 1;
00653
00654 initial_ResourceLimitsQosPolicy_.max_samples = DDS::LENGTH_UNLIMITED;
00655 initial_ResourceLimitsQosPolicy_.max_instances = DDS::LENGTH_UNLIMITED;
00656 initial_ResourceLimitsQosPolicy_.max_samples_per_instance = DDS::LENGTH_UNLIMITED;
00657
00658 initial_EntityFactoryQosPolicy_.autoenable_created_entities = true;
00659
00660 initial_WriterDataLifecycleQosPolicy_.autodispose_unregistered_instances = true;
00661
00662 initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
00663 initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;
00664 initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
00665 initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;
00666
00667 initial_DomainParticipantQos_.user_data = initial_UserDataQosPolicy_;
00668 initial_DomainParticipantQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00669 initial_DomainParticipantFactoryQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00670
00671 initial_TopicQos_.topic_data = initial_TopicDataQosPolicy_;
00672 initial_TopicQos_.durability = initial_DurabilityQosPolicy_;
00673 initial_TopicQos_.durability_service = initial_DurabilityServiceQosPolicy_;
00674 initial_TopicQos_.deadline = initial_DeadlineQosPolicy_;
00675 initial_TopicQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00676 initial_TopicQos_.liveliness = initial_LivelinessQosPolicy_;
00677 initial_TopicQos_.reliability = initial_ReliabilityQosPolicy_;
00678 initial_TopicQos_.destination_order = initial_DestinationOrderQosPolicy_;
00679 initial_TopicQos_.history = initial_HistoryQosPolicy_;
00680 initial_TopicQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00681 initial_TopicQos_.transport_priority = initial_TransportPriorityQosPolicy_;
00682 initial_TopicQos_.lifespan = initial_LifespanQosPolicy_;
00683 initial_TopicQos_.ownership = initial_OwnershipQosPolicy_;
00684
00685 initial_DataWriterQos_.durability = initial_DurabilityQosPolicy_;
00686 initial_DataWriterQos_.durability_service = initial_DurabilityServiceQosPolicy_;
00687 initial_DataWriterQos_.deadline = initial_DeadlineQosPolicy_;
00688 initial_DataWriterQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00689 initial_DataWriterQos_.liveliness = initial_LivelinessQosPolicy_;
00690 initial_DataWriterQos_.reliability = initial_ReliabilityQosPolicy_;
00691 initial_DataWriterQos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00692 initial_DataWriterQos_.reliability.max_blocking_time.sec = 0;
00693 initial_DataWriterQos_.reliability.max_blocking_time.nanosec = 100000000;
00694 initial_DataWriterQos_.destination_order = initial_DestinationOrderQosPolicy_;
00695 initial_DataWriterQos_.history = initial_HistoryQosPolicy_;
00696 initial_DataWriterQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00697 initial_DataWriterQos_.transport_priority = initial_TransportPriorityQosPolicy_;
00698 initial_DataWriterQos_.lifespan = initial_LifespanQosPolicy_;
00699 initial_DataWriterQos_.user_data = initial_UserDataQosPolicy_;
00700 initial_DataWriterQos_.ownership = initial_OwnershipQosPolicy_;
00701 #ifdef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00702 initial_DataWriterQos_.ownership_strength.value = 0;
00703 #else
00704 initial_DataWriterQos_.ownership_strength = initial_OwnershipStrengthQosPolicy_;
00705 #endif
00706 initial_DataWriterQos_.writer_data_lifecycle = initial_WriterDataLifecycleQosPolicy_;
00707
00708 initial_PublisherQos_.presentation = initial_PresentationQosPolicy_;
00709 initial_PublisherQos_.partition = initial_PartitionQosPolicy_;
00710 initial_PublisherQos_.group_data = initial_GroupDataQosPolicy_;
00711 initial_PublisherQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00712
00713 initial_DataReaderQos_.durability = initial_DurabilityQosPolicy_;
00714 initial_DataReaderQos_.deadline = initial_DeadlineQosPolicy_;
00715 initial_DataReaderQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00716 initial_DataReaderQos_.liveliness = initial_LivelinessQosPolicy_;
00717 initial_DataReaderQos_.reliability = initial_ReliabilityQosPolicy_;
00718 initial_DataReaderQos_.destination_order = initial_DestinationOrderQosPolicy_;
00719 initial_DataReaderQos_.history = initial_HistoryQosPolicy_;
00720 initial_DataReaderQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00721 initial_DataReaderQos_.user_data = initial_UserDataQosPolicy_;
00722 initial_DataReaderQos_.time_based_filter = initial_TimeBasedFilterQosPolicy_;
00723 initial_DataReaderQos_.ownership = initial_OwnershipQosPolicy_;
00724 initial_DataReaderQos_.reader_data_lifecycle = initial_ReaderDataLifecycleQosPolicy_;
00725
00726 initial_SubscriberQos_.presentation = initial_PresentationQosPolicy_;
00727 initial_SubscriberQos_.partition = initial_PartitionQosPolicy_;
00728 initial_SubscriberQos_.group_data = initial_GroupDataQosPolicy_;
00729 initial_SubscriberQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00730 }
00731
00732 void
00733 Service_Participant::initializeScheduling()
00734 {
00735
00736
00737
00738 if (this->schedulerString_.length() == 0) {
00739 if (DCPS_debug_level > 0) {
00740 ACE_DEBUG((LM_NOTICE,
00741 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::intializeScheduling() - ")
00742 ACE_TEXT("no scheduling policy specified, not setting policy.\n")));
00743 }
00744
00745 } else {
00746
00747
00748
00749 int ace_scheduler = ACE_SCHED_OTHER;
00750 this->scheduler_ = THR_SCHED_DEFAULT;
00751
00752 if (this->schedulerString_ == ACE_TEXT("SCHED_RR")) {
00753 this->scheduler_ = THR_SCHED_RR;
00754 ace_scheduler = ACE_SCHED_RR;
00755
00756 } else if (this->schedulerString_ == ACE_TEXT("SCHED_FIFO")) {
00757 this->scheduler_ = THR_SCHED_FIFO;
00758 ace_scheduler = ACE_SCHED_FIFO;
00759
00760 } else if (this->schedulerString_ == ACE_TEXT("SCHED_OTHER")) {
00761 this->scheduler_ = THR_SCHED_DEFAULT;
00762 ace_scheduler = ACE_SCHED_OTHER;
00763
00764 } else {
00765 ACE_DEBUG((LM_WARNING,
00766 ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00767 ACE_TEXT("unrecognized scheduling policy: %s, set to SCHED_OTHER.\n"),
00768 this->schedulerString_.c_str()));
00769 }
00770
00771
00772
00773
00774 #ifdef ACE_WIN32
00775 ACE_DEBUG((LM_NOTICE,
00776 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::initializeScheduling() - ")
00777 ACE_TEXT("scheduling is not implemented on Win32.\n")));
00778 ACE_UNUSED_ARG(ace_scheduler);
00779 #else
00780 ACE_Sched_Params params(
00781 ace_scheduler,
00782 ACE_Sched_Params::priority_min(ace_scheduler),
00783 ACE_SCOPE_THREAD,
00784 this->schedulerQuantum_);
00785
00786 if (ACE_OS::sched_params(params) != 0) {
00787 if (ACE_OS::last_error() == EPERM) {
00788 ACE_DEBUG((LM_WARNING,
00789 ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00790 ACE_TEXT("user is not superuser, requested scheduler not set.\n")));
00791
00792 } else {
00793 ACE_ERROR((LM_ERROR,
00794 ACE_TEXT("(%P|%t) ERROR: Service_Participant::initializeScheduling() - ")
00795 ACE_TEXT("sched_params failed: %m.\n")));
00796 }
00797
00798
00799 this->scheduler_ = -1;
00800 ace_scheduler = ACE_SCHED_OTHER;
00801
00802 } else if (DCPS_debug_level > 0) {
00803 ACE_DEBUG((LM_DEBUG,
00804 ACE_TEXT("(%P|%t) Service_Participant::initializeScheduling() - ")
00805 ACE_TEXT("scheduling policy set to %s(%d).\n"),
00806 this->schedulerString_.c_str()));
00807 }
00808
00809
00810
00811
00812 this->priority_min_ = ACE_Sched_Params::priority_min(ace_scheduler, ACE_SCOPE_THREAD);
00813 this->priority_max_ = ACE_Sched_Params::priority_max(ace_scheduler, ACE_SCOPE_THREAD);
00814 #endif // ACE_WIN32
00815 }
00816 }
00817
00818 #ifdef DDS_HAS_WCHAR
00819 bool
00820 Service_Participant::set_repo_ior(const wchar_t* ior,
00821 Discovery::RepoKey key,
00822 bool attach_participant)
00823 {
00824 return set_repo_ior(ACE_Wide_To_Ascii(ior).char_rep(), key, attach_participant);
00825 }
00826 #endif
00827
00828 bool
00829 Service_Participant::set_repo_ior(const char* ior,
00830 Discovery::RepoKey key,
00831 bool attach_participant)
00832 {
00833 if (DCPS_debug_level > 0) {
00834 ACE_DEBUG((LM_DEBUG,
00835 ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior: Repo[ %C] == %C\n"),
00836 key.c_str(), ior));
00837 }
00838
00839
00840
00841 got_info = true;
00842
00843 if (key == "-1") {
00844 key = Discovery::DEFAULT_REPO;
00845 }
00846
00847 const OPENDDS_STRING repo_type = ACE_TEXT_ALWAYS_CHAR(REPO_SECTION_NAME);
00848 if (!discovery_types_.count(repo_type)) {
00849
00850
00851 TheTransportRegistry->load_transport_lib(repo_type);
00852 }
00853
00854 if (discovery_types_.count(repo_type)) {
00855 ACE_Configuration_Heap cf;
00856 cf.open();
00857 ACE_Configuration_Section_Key sect_key;
00858 ACE_TString section = REPO_SECTION_NAME;
00859 section += ACE_TEXT('\\');
00860 section += ACE_TEXT_CHAR_TO_TCHAR(key.c_str());
00861 cf.open_section(cf.root_section(), section.c_str(), 1 , sect_key);
00862 cf.set_string_value(sect_key, ACE_TEXT("RepositoryIor"),
00863 ACE_TEXT_CHAR_TO_TCHAR(ior));
00864
00865 discovery_types_[repo_type]->discovery_config(cf);
00866
00867 this->remap_domains(key, key, attach_participant);
00868 return true;
00869 }
00870
00871 ACE_ERROR_RETURN((LM_ERROR,
00872 ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior ")
00873 ACE_TEXT("ERROR - no discovery type registered for ")
00874 ACE_TEXT("InfoRepoDiscovery\n")),
00875 false);
00876 }
00877
00878 void
00879 Service_Participant::remap_domains(Discovery::RepoKey oldKey,
00880 Discovery::RepoKey newKey,
00881 bool attach_participant)
00882 {
00883
00884 OPENDDS_VECTOR(DDS::DomainId_t) domainList;
00885 {
00886 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00887
00888 for (DomainRepoMap::const_iterator current = this->domainRepoMap_.begin();
00889 current != this->domainRepoMap_.end();
00890 ++current) {
00891 if (current->second == oldKey) {
00892 domainList.push_back(current->first);
00893 }
00894 }
00895 }
00896
00897
00898 for (unsigned int index = 0; index < domainList.size(); ++index) {
00899
00900
00901 this->set_repo_domain(domainList[ index], newKey, attach_participant);
00902 }
00903 }
00904
00905 void
00906 Service_Participant::set_repo_domain(const DDS::DomainId_t domain,
00907 Discovery::RepoKey key,
00908 bool attach_participant)
00909 {
00910 typedef std::pair<Discovery_rch, RepoId> DiscRepoPair;
00911 OPENDDS_VECTOR(DiscRepoPair) repoList;
00912 {
00913 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00914 DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
00915
00916 if (key == "-1") {
00917 key = Discovery::DEFAULT_REPO;
00918 }
00919
00920 if ((where == this->domainRepoMap_.end()) || (where->second != key)) {
00921
00922
00923 this->domainRepoMap_[ domain] = key;
00924
00925 if (DCPS_debug_level > 0) {
00926 ACE_DEBUG((LM_DEBUG,
00927 ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00928 ACE_TEXT("Domain[ %d] = Repo[ %C].\n"),
00929 domain, key.c_str()));
00930 }
00931 }
00932
00933
00934
00935
00936
00937
00938
00939
00940
00941
00942
00943
00944 if (this->dp_factory_servant_) {
00945
00946 const DomainParticipantFactoryImpl::DPMap& participants
00947 = this->dp_factory_servant_->participants();
00948
00949
00950 DomainParticipantFactoryImpl::DPMap::const_iterator
00951 which = participants.find(domain);
00952
00953 if (which != participants.end()) {
00954
00955 RepoKeyDiscoveryMap::const_iterator disc_iter = this->discoveryMap_.find(key);
00956
00957 if (disc_iter != this->discoveryMap_.end()) {
00958 for (DomainParticipantFactoryImpl::DPSet::const_iterator
00959 current = which->second.begin();
00960 current != which->second.end();
00961 ++current) {
00962 try {
00963
00964
00965 RepoId id = (*current)->get_id();
00966 repoList.push_back(std::make_pair(disc_iter->second, id));
00967
00968 if (DCPS_debug_level > 0) {
00969 GuidConverter converter(id);
00970 ACE_DEBUG((LM_DEBUG,
00971 ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00972 ACE_TEXT("participant %C attached to Repo[ %C].\n"),
00973 OPENDDS_STRING(converter).c_str(),
00974 key.c_str()));
00975 }
00976
00977 } catch (const CORBA::Exception& ex) {
00978 ex._tao_print_exception(
00979 "ERROR: Service_Participant::set_repo_domain: failed to attach repository - ");
00980 return;
00981 }
00982 }
00983 }
00984 }
00985 }
00986 }
00987
00988
00989 for (unsigned int index = 0; index < repoList.size(); ++index) {
00990 if (DCPS_debug_level > 0) {
00991 GuidConverter converter(repoList[ index].second);
00992 ACE_DEBUG((LM_DEBUG,
00993 ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00994 ACE_TEXT("(%d of %d) attaching domain %d participant %C to Repo[ %C].\n"),
00995 (1+index), repoList.size(), domain,
00996 OPENDDS_STRING(converter).c_str(),
00997 key.c_str()));
00998 }
00999
01000 if (attach_participant)
01001 {
01002 repoList[ index].first->attach_participant(domain, repoList[ index].second);
01003 }
01004 }
01005 }
01006
01007 void
01008 Service_Participant::repository_lost(Discovery::RepoKey key)
01009 {
01010
01011 RepoKeyDiscoveryMap::iterator initialLocation = this->discoveryMap_.find(key);
01012 RepoKeyDiscoveryMap::iterator current = initialLocation;
01013
01014 if (current == this->discoveryMap_.end()) {
01015 ACE_DEBUG((LM_WARNING,
01016 ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
01017 ACE_TEXT("lost repository %C was not present, ")
01018 ACE_TEXT("finding another anyway.\n"),
01019 key.c_str()));
01020
01021 } else {
01022
01023 ++current;
01024 }
01025
01026
01027 ACE_Time_Value recoveryFailedTime
01028 = ACE_OS::gettimeofday()
01029 + ACE_Time_Value(this->federation_recovery_duration(), 0);
01030
01031
01032 int backoff = this->federation_initial_backoff_seconds();
01033
01034
01035 while (recoveryFailedTime > ACE_OS::gettimeofday()) {
01036
01037
01038 if (current == this->discoveryMap_.end()) {
01039
01040 current = this->discoveryMap_.begin();
01041 }
01042
01043
01044
01045 if (current == initialLocation) {
01046 if (DCPS_debug_level > 0) {
01047 ACE_DEBUG((LM_DEBUG,
01048 ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01049 ACE_TEXT("waiting %d seconds to traverse the ")
01050 ACE_TEXT("repository list another time ")
01051 ACE_TEXT("for lost key %C.\n"),
01052 backoff,
01053 key.c_str()));
01054 }
01055
01056
01057 ACE_OS::sleep(backoff);
01058
01059
01060 backoff *= this->federation_backoff_multiplier();
01061
01062
01063
01064 }
01065
01066
01067 if (current->second->active()) {
01068
01069 if (DCPS_debug_level > 0) {
01070 ACE_DEBUG((LM_DEBUG,
01071 ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01072 ACE_TEXT("replacing repository %C with %C.\n"),
01073 key.c_str(),
01074 current->first.c_str()));
01075 }
01076
01077
01078
01079 this->remap_domains(key, current->first);
01080
01081
01082
01083 return;
01084
01085 } else {
01086 ACE_DEBUG((LM_WARNING,
01087 ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
01088 ACE_TEXT("repository %C was not available to replace %C, ")
01089 ACE_TEXT("looking for another.\n"),
01090 current->first.c_str(),
01091 key.c_str()));
01092 }
01093
01094
01095 ++current;
01096 }
01097
01098
01099
01100 ACE_ASSERT(recoveryFailedTime == ACE_Time_Value::zero);
01101 }
01102
01103 void
01104 Service_Participant::set_default_discovery(const Discovery::RepoKey& key)
01105 {
01106 this->defaultDiscovery_ = key;
01107 }
01108
01109 Discovery::RepoKey
01110 Service_Participant::get_default_discovery()
01111 {
01112 return this->defaultDiscovery_;
01113 }
01114
01115 Discovery_rch
01116 Service_Participant::get_discovery(const DDS::DomainId_t domain)
01117 {
01118
01119
01120 Discovery::RepoKey repo = defaultDiscovery_;
01121
01122
01123
01124 DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
01125 if (where != this->domainRepoMap_.end()) {
01126 repo = where->second;
01127 }
01128
01129 RepoKeyDiscoveryMap::const_iterator location = this->discoveryMap_.find(repo);
01130
01131 if (location == this->discoveryMap_.end()) {
01132 if ((repo == Discovery::DEFAULT_REPO) ||
01133 (repo == "-1")) {
01134
01135
01136 bool ok = this->set_repo_ior(DEFAULT_REPO_IOR, Discovery::DEFAULT_REPO);
01137
01138 if (!ok) {
01139 if (DCPS_debug_level > 0) {
01140 ACE_DEBUG((LM_DEBUG,
01141 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01142 ACE_TEXT("failed attempt to set default IOR for domain %d.\n"),
01143 domain));
01144 }
01145
01146 } else {
01147
01148 if (DCPS_debug_level > 4) {
01149 ACE_DEBUG((LM_DEBUG,
01150 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01151 ACE_TEXT("returning default repository for domain %d.\n"),
01152 domain));
01153 }
01154
01155 }
01156 return this->discoveryMap_[Discovery::DEFAULT_REPO];
01157
01158 } else if (repo == Discovery::DEFAULT_RTPS) {
01159
01160 ACE_Configuration_Heap cf;
01161 cf.open();
01162 ACE_Configuration_Section_Key k;
01163 cf.open_section(cf.root_section(), RTPS_SECTION_NAME, 1 , k);
01164 this->load_discovery_configuration(cf, RTPS_SECTION_NAME);
01165
01166
01167 location = this->discoveryMap_.find(Discovery::DEFAULT_RTPS);
01168
01169 if (location == this->discoveryMap_.end()) {
01170
01171 if (DCPS_debug_level > 0) {
01172 ACE_DEBUG((LM_DEBUG,
01173 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01174 ACE_TEXT("failed attempt to set default RTPS discovery for domain %d.\n"),
01175 domain));
01176 }
01177
01178 return Discovery_rch();
01179
01180 } else {
01181
01182 if (DCPS_debug_level > 4) {
01183 ACE_DEBUG((LM_DEBUG,
01184 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01185 ACE_TEXT("returning default RTPS discovery for domain %d.\n"),
01186 domain));
01187 }
01188
01189 return location->second;
01190 }
01191
01192 } else {
01193
01194 if (DCPS_debug_level > 4) {
01195 ACE_DEBUG((LM_DEBUG,
01196 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01197 ACE_TEXT("repository for domain %d was not set.\n"),
01198 domain));
01199 }
01200
01201 return Discovery_rch();
01202 }
01203 }
01204
01205 if (DCPS_debug_level > 4) {
01206 ACE_DEBUG((LM_DEBUG,
01207 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01208 ACE_TEXT("returning repository for domain %d, repo %C.\n"),
01209 domain, repo.c_str()));
01210 }
01211
01212 return location->second;
01213 }
01214
01215 OPENDDS_STRING
01216 Service_Participant::bit_transport_ip() const
01217 {
01218 return ACE_TEXT_ALWAYS_CHAR(this->bit_transport_ip_.c_str());
01219 }
01220
01221 int
01222 Service_Participant::bit_transport_port() const
01223 {
01224 return this->bit_transport_port_;
01225 }
01226
01227 void
01228 Service_Participant::bit_transport_port(int port)
01229 {
01230 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01231 this->bit_transport_port_ = port;
01232 got_bit_transport_port = true;
01233 }
01234
01235 int
01236 Service_Participant::bit_lookup_duration_msec() const
01237 {
01238 return bit_lookup_duration_msec_;
01239 }
01240
01241 void
01242 Service_Participant::bit_lookup_duration_msec(int sec)
01243 {
01244 bit_lookup_duration_msec_ = sec;
01245 got_bit_lookup_duration_msec = true;
01246 }
01247
01248 size_t
01249 Service_Participant::n_chunks() const
01250 {
01251 return n_chunks_;
01252 }
01253
01254 void
01255 Service_Participant::n_chunks(size_t chunks)
01256 {
01257 n_chunks_ = chunks;
01258 got_chunks = true;
01259 }
01260
01261 size_t
01262 Service_Participant::association_chunk_multiplier() const
01263 {
01264 return association_chunk_multiplier_;
01265 }
01266
01267 void
01268 Service_Participant::association_chunk_multiplier(size_t multiplier)
01269 {
01270 association_chunk_multiplier_ = multiplier;
01271 got_chunk_association_multiplier = true;
01272 }
01273
01274 void
01275 Service_Participant::liveliness_factor(int factor)
01276 {
01277 liveliness_factor_ = factor;
01278 got_liveliness_factor = true;
01279 }
01280
01281 int
01282 Service_Participant::liveliness_factor() const
01283 {
01284 return liveliness_factor_;
01285 }
01286
01287 void
01288 Service_Participant::register_discovery_type(const char* section_name,
01289 Discovery::Config* cfg)
01290 {
01291 discovery_types_[section_name].reset(cfg);
01292 }
01293
01294 int
01295 Service_Participant::load_configuration()
01296 {
01297 int status = 0;
01298
01299 if ((status = this->cf_.open()) != 0)
01300 ACE_ERROR_RETURN((LM_ERROR,
01301 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01302 ACE_TEXT("open() returned %d\n"),
01303 status),
01304 -1);
01305
01306 ACE_Ini_ImpExp import(this->cf_);
01307 status = import.import_config(config_fname.c_str());
01308
01309 if (status != 0) {
01310 ACE_ERROR_RETURN((LM_ERROR,
01311 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01312 ACE_TEXT("import_config () returned %d\n"),
01313 status),
01314 -1);
01315 } else {
01316 status = this->load_configuration(this->cf_, config_fname.c_str());
01317 }
01318 return status;
01319 }
01320
01321 int
01322 Service_Participant::load_configuration(
01323 ACE_Configuration_Heap& config,
01324 const ACE_TCHAR* filename)
01325 {
01326 int status = 0;
01327
01328 status = this->load_common_configuration(config, filename);
01329
01330 if (status != 0) {
01331 ACE_ERROR_RETURN((LM_ERROR,
01332 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01333 ACE_TEXT("load_common_configuration () returned %d\n"),
01334 status),
01335 -1);
01336 }
01337
01338
01339 this->add_discovery(static_rchandle_cast<Discovery>(StaticDiscovery::instance()));
01340
01341 status = this->load_discovery_configuration(config, RTPS_SECTION_NAME);
01342
01343 if (status != 0) {
01344 ACE_ERROR_RETURN((LM_ERROR,
01345 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01346 ACE_TEXT("load_discovery_configuration() returned %d\n"),
01347 status),
01348 -1);
01349 }
01350
01351 status = this->load_discovery_configuration(config, REPO_SECTION_NAME);
01352
01353 if (status != 0) {
01354 ACE_ERROR_RETURN((LM_ERROR,
01355 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01356 ACE_TEXT("load_discovery_configuration() returned %d\n"),
01357 status),
01358 -1);
01359 }
01360
01361 status = TransportRegistry::instance()->load_transport_configuration(
01362 ACE_TEXT_ALWAYS_CHAR(filename), config);
01363 if (this->global_transport_config_ != ACE_TEXT("")) {
01364 TransportConfig_rch config = TransportRegistry::instance()->get_config(
01365 ACE_TEXT_ALWAYS_CHAR(this->global_transport_config_.c_str()));
01366 if (!config) {
01367 ACE_ERROR_RETURN((LM_ERROR,
01368 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01369 ACE_TEXT("Unable to locate specified global transport config: %s\n"),
01370 this->global_transport_config_.c_str()),
01371 -1);
01372 }
01373 TransportRegistry::instance()->global_config(config);
01374 }
01375
01376 if (status != 0) {
01377 ACE_ERROR_RETURN((LM_ERROR,
01378 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01379 ACE_TEXT("load_transport_configuration () returned %d\n"),
01380 status),
01381 -1);
01382 }
01383
01384
01385
01386
01387
01388 status = this->load_domain_configuration(config, filename);
01389
01390 if (status != 0) {
01391 ACE_ERROR_RETURN((LM_ERROR,
01392 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01393 ACE_TEXT("load_domain_configuration () returned %d\n"),
01394 status),
01395 -1);
01396 }
01397
01398
01399 try {
01400 status = StaticDiscovery::instance()->load_configuration(config);
01401
01402 if (status != 0) {
01403 ACE_ERROR_RETURN((LM_ERROR,
01404 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01405 ACE_TEXT("load_discovery_configuration() returned %d\n"),
01406 status),
01407 -1);
01408 }
01409 } catch (const CORBA::BAD_PARAM& ex) {
01410 ex._tao_print_exception("Exception caught in Service_Participant::load_configuration: "
01411 "trying to load_discovery_configuration()");
01412 return -1;
01413 }
01414
01415 return 0;
01416 }
01417
01418 int
01419 Service_Participant::load_common_configuration(ACE_Configuration_Heap& cf,
01420 const ACE_TCHAR* filename)
01421 {
01422 const ACE_Configuration_Section_Key &root = cf.root_section();
01423 ACE_Configuration_Section_Key sect;
01424
01425 if (cf.open_section(root, COMMON_SECTION_NAME, 0, sect) != 0) {
01426 if (DCPS_debug_level > 0) {
01427
01428
01429 ACE_DEBUG((LM_NOTICE,
01430 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_common_configuration ")
01431 ACE_TEXT("failed to open section %s\n"),
01432 COMMON_SECTION_NAME));
01433 }
01434
01435 return 0;
01436
01437 } else {
01438 if (got_debug_level) {
01439 ACE_DEBUG((LM_NOTICE,
01440 ACE_TEXT("(%P|%t) NOTICE: using DCPSDebugLevel value from command option (overrides value if it's in config file).\n")));
01441 } else {
01442 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSDebugLevel"), DCPS_debug_level, int)
01443 }
01444
01445 if (got_info) {
01446 ACE_DEBUG((LM_NOTICE,
01447 ACE_TEXT("(%P|%t) NOTICE: using DCPSInfoRepo value from command option (overrides value if it's in config file).\n")));
01448 } else {
01449 ACE_TString value;
01450 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSInfoRepo"), value)
01451 if (!value.empty()) {
01452 this->set_repo_ior(value.c_str(), Discovery::DEFAULT_REPO);
01453 }
01454 }
01455
01456 if (got_use_rti_serialization) {
01457 ACE_DEBUG((LM_NOTICE,
01458 ACE_TEXT("(%P|%t) NOTICE: using DCPSRTISerialization value from command option (overrides value if it's in config file).\n")));
01459 } else {
01460 bool should_use = false;
01461 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSRTISerialization"), should_use, bool)
01462 Serializer::set_use_rti_serialization(should_use);
01463 }
01464
01465 if (got_chunks) {
01466 ACE_DEBUG((LM_NOTICE,
01467 ACE_TEXT("(%P|%t) NOTICE: using DCPSChunks value from command option (overrides value if it's in config file).\n")));
01468 } else {
01469 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunks"), this->n_chunks_, size_t)
01470 }
01471
01472 if (got_chunk_association_multiplier) {
01473 ACE_DEBUG((LM_NOTICE,
01474 ACE_TEXT("(%P|%t) NOTICE: using DCPSChunkAssociationMutltiplier value from command option (overrides value if it's in config file).\n")));
01475 } else {
01476 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunkAssociationMutltiplier"), this->association_chunk_multiplier_, size_t)
01477 }
01478
01479 if (got_bit_transport_port) {
01480 ACE_DEBUG((LM_NOTICE,
01481 ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportPort value from command option (overrides value if it's in config file).\n")));
01482 } else {
01483 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportPort"), this->bit_transport_port_, int)
01484 }
01485
01486 if (got_bit_transport_ip) {
01487 ACE_DEBUG((LM_DEBUG,
01488 ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportIPAddress value from command option (overrides value if it's in config file).\n")));
01489 } else {
01490 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportIPAddress"), this->bit_transport_ip_)
01491 }
01492
01493 if (got_liveliness_factor) {
01494 ACE_DEBUG((LM_DEBUG,
01495 ACE_TEXT("(%P|%t) NOTICE: using DCPSLivelinessFactor value from command option (overrides value if it's in config file).\n")));
01496 } else {
01497 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSLivelinessFactor"), this->liveliness_factor_, int)
01498 }
01499
01500 if (got_bit_lookup_duration_msec) {
01501 ACE_DEBUG((LM_DEBUG,
01502 ACE_TEXT("(%P|%t) NOTICE: using DCPSBitLookupDurationMsec value from command option (overrides value if it's in config file).\n")));
01503 } else {
01504 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitLookupDurationMsec"), this->bit_lookup_duration_msec_, int)
01505 }
01506
01507 if (got_global_transport_config) {
01508 ACE_DEBUG((LM_DEBUG,
01509 ACE_TEXT("(%P|%t) NOTICE: using DCPSGlobalTransportConfig value from command option (overrides value if it's in config file).\n")));
01510 } else {
01511 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSGlobalTransportConfig"), this->global_transport_config_);
01512 if (this->global_transport_config_ == ACE_TEXT("$file")) {
01513
01514 this->global_transport_config_ = filename;
01515 }
01516 }
01517
01518 if (got_bit_flag) {
01519 ACE_DEBUG((LM_DEBUG,
01520 ACE_TEXT("(%P|%t) NOTICE: using DCPSBit value from command option (overrides value if it's in config file).\n")));
01521 } else {
01522 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBit"), this->bit_enabled_, int)
01523 }
01524
01525 #if defined(OPENDDS_SECURITY)
01526 if (got_security_flag) {
01527 ACE_DEBUG((LM_DEBUG,
01528 ACE_TEXT("(%P|%t) NOTICE: using DCPSSecurity value from command option (overrides value if it's in config file).\n")));
01529 } else {
01530 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSSecurity"), this->security_enabled_, int)
01531 }
01532 #endif
01533
01534 if (got_transport_debug_level) {
01535 ACE_DEBUG((LM_NOTICE,
01536 ACE_TEXT("(%P|%t) NOTICE: using DCPSTransportDebugLevel value from command option (overrides value if it's in config file).\n")));
01537 } else {
01538 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSTransportDebugLevel"), OpenDDS::DCPS::Transport_debug_level, int)
01539 }
01540
01541 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01542 if (got_persistent_data_dir) {
01543 ACE_DEBUG((LM_NOTICE,
01544 ACE_TEXT("(%P|%t) NOTICE: using DCPSPersistentDataDir value from command option (overrides value if it's in config file).\n")));
01545 } else {
01546 ACE_TString value;
01547 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSPersistentDataDir"), value)
01548 this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(value.c_str());
01549 }
01550 #endif
01551
01552 if (got_pending_timeout) {
01553 ACE_DEBUG((LM_NOTICE,
01554 ACE_TEXT("(%P|%t) NOTICE: using DCPSPendingTimeout value from command option (overrides value if it's in config file).\n")));
01555 } else {
01556 int timeout = 0;
01557 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPendingTimeout"), timeout, int)
01558 this->pending_timeout_ = timeout;
01559 }
01560
01561 if (got_publisher_content_filter) {
01562 ACE_DEBUG((LM_DEBUG,
01563 ACE_TEXT("(%P|%t) NOTICE: using DCPSPublisherContentFilter ")
01564 ACE_TEXT("value from command option (overrides value if it's ")
01565 ACE_TEXT("in config file).\n")));
01566 } else {
01567 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPublisherContentFilter"),
01568 this->publisher_content_filter_, bool)
01569 }
01570
01571 if (got_default_discovery) {
01572 ACE_Configuration::VALUETYPE type;
01573 if (cf.find_value(sect, ACE_TEXT("DCPSDefaultDiscovery"), type) != -1) {
01574 ACE_DEBUG((LM_NOTICE,
01575 ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultDiscovery value ")
01576 ACE_TEXT("from command option, overriding config file\n")));
01577 }
01578 } else {
01579 GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultDiscovery"),
01580 this->defaultDiscovery_);
01581 }
01582
01583 if (got_bidir_giop) {
01584 ACE_Configuration::VALUETYPE type;
01585 if (cf.find_value(sect, ACE_TEXT("DCPSBidirGIOP"), type) != -1) {
01586 ACE_DEBUG((LM_NOTICE,
01587 ACE_TEXT("(%P|%t) NOTICE: using DCPSBidirGIOP value ")
01588 ACE_TEXT("from command option, overriding config file\n")));
01589 }
01590 } else {
01591 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBidirGIOP"), bidir_giop_, bool)
01592 }
01593
01594 ACE_Configuration::VALUETYPE type;
01595 if (got_log_fname) {
01596 if (cf.find_value(sect, ACE_TEXT("ORBLogFile"), type) != -1) {
01597 ACE_DEBUG((LM_NOTICE,
01598 ACE_TEXT("(%P|%t) NOTICE: using ORBLogFile value ")
01599 ACE_TEXT("from command option, overriding config file\n")));
01600 }
01601 } else {
01602 OPENDDS_STRING log_fname;
01603 GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("ORBLogFile"), log_fname);
01604 if (!log_fname.empty()) {
01605 set_log_file_name(log_fname.c_str());
01606 }
01607 }
01608
01609 if (got_log_verbose) {
01610 if (cf.find_value(sect, ACE_TEXT("ORBVerboseLogging"), type) != -1) {
01611 ACE_DEBUG((LM_NOTICE,
01612 ACE_TEXT("(%P|%t) NOTICE: using ORBVerboseLogging value ")
01613 ACE_TEXT("from command option, overriding config file\n")));
01614 }
01615 } else {
01616 unsigned long verbose_logging = 0;
01617 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("ORBVerboseLogging"), verbose_logging, unsigned long);
01618 set_log_verbose(verbose_logging);
01619 }
01620
01621 if (got_default_address) {
01622 ACE_DEBUG((LM_DEBUG,
01623 ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultAddress value from command option (overrides value if it's in config file).\n")));
01624 } else {
01625 GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultAddress"), this->default_address_)
01626 }
01627
01628 if (got_monitor) {
01629 ACE_DEBUG((LM_DEBUG,
01630 ACE_TEXT("(%P|%t) NOTICE: using DCPSMonitor value from command option (overrides value if it's in config file).\n")));
01631 } else {
01632 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSMonitor"), monitor_enabled_, bool)
01633 }
01634
01635
01636 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationRecoveryDuration"), this->federation_recovery_duration_, int)
01637 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationInitialBackoffSeconds"), this->federation_initial_backoff_seconds_, int)
01638 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationBackoffMultiplier"), this->federation_backoff_multiplier_, int)
01639 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationLivelinessDuration"), this->federation_liveliness_, int)
01640
01641 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
01642 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_size"), pool_size_, size_t)
01643 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_granularity"), pool_granularity_, size_t)
01644 #endif
01645
01646
01647
01648
01649 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("scheduler"), this->schedulerString_)
01650
01651 suseconds_t usec(0);
01652
01653 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("scheduler_slice"), usec, suseconds_t)
01654
01655 if (usec > 0)
01656 this->schedulerQuantum_.usec(usec);
01657 }
01658
01659 return 0;
01660 }
01661
01662 int
01663 Service_Participant::load_domain_configuration(ACE_Configuration_Heap& cf,
01664 const ACE_TCHAR* filename)
01665 {
01666 const ACE_Configuration_Section_Key& root = cf.root_section();
01667 ACE_Configuration_Section_Key domain_sect;
01668
01669 if (cf.open_section(root, DOMAIN_SECTION_NAME, 0, domain_sect) != 0) {
01670 if (DCPS_debug_level > 0) {
01671
01672
01673 ACE_DEBUG((LM_NOTICE,
01674 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_domain_configuration ")
01675 ACE_TEXT("failed to open [%s] section - using code default.\n"),
01676 DOMAIN_SECTION_NAME));
01677 }
01678
01679 return 0;
01680
01681 } else {
01682
01683 ValueMap vm;
01684 if (pullValues(cf, domain_sect, vm) > 0) {
01685
01686 ACE_ERROR_RETURN((LM_ERROR,
01687 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01688 ACE_TEXT("domain sections must have a subsection name\n")),
01689 -1);
01690 }
01691
01692 KeyList keys;
01693 if (processSections(cf, domain_sect, keys) != 0) {
01694 ACE_ERROR_RETURN((LM_ERROR,
01695 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01696 ACE_TEXT("too many nesting layers in the [domain] section.\n")),
01697 -1);
01698 }
01699
01700
01701 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01702 OPENDDS_STRING domain_name = it->first;
01703
01704 ValueMap values;
01705 pullValues(cf, it->second, values);
01706 DDS::DomainId_t domainId = -1;
01707 Discovery::RepoKey repoKey;
01708 OPENDDS_STRING perDomainDefaultTportConfig;
01709 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01710 OPENDDS_STRING name = it->first;
01711 if (name == "DomainId") {
01712 OPENDDS_STRING value = it->second;
01713 if (!convertToInteger(value, domainId)) {
01714 ACE_ERROR_RETURN((LM_ERROR,
01715 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01716 ACE_TEXT("Illegal integer value for DomainId (%C) in [domain/%C] section.\n"),
01717 value.c_str(), domain_name.c_str()),
01718 -1);
01719 }
01720 if (DCPS_debug_level > 0) {
01721 ACE_DEBUG((LM_DEBUG,
01722 ACE_TEXT("(%P|%t) [domain/%C]: DomainId == %d\n"),
01723 domain_name.c_str(), domainId));
01724 }
01725 } else if (name == "DomainRepoKey") {
01726
01727
01728 repoKey = it->second;
01729 if (repoKey == "-1") {
01730 repoKey = Discovery::DEFAULT_REPO;
01731 }
01732
01733 if (DCPS_debug_level > 0) {
01734 ACE_DEBUG((LM_DEBUG,
01735 ACE_TEXT("(%P|%t) [domain/%C]: DomainRepoKey == %C\n"),
01736 domain_name.c_str(), repoKey.c_str()));
01737 }
01738 } else if (name == "DiscoveryConfig") {
01739 repoKey = it->second;
01740
01741 } else if (name == "DefaultTransportConfig") {
01742 if (it->second == "$file") {
01743
01744 perDomainDefaultTportConfig = ACE_TEXT_ALWAYS_CHAR(filename);
01745
01746 } else {
01747 perDomainDefaultTportConfig = it->second;
01748 }
01749
01750 } else {
01751 ACE_ERROR_RETURN((LM_ERROR,
01752 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01753 ACE_TEXT("Unexpected entry (%C) in [domain/%C] section.\n"),
01754 name.c_str(), domain_name.c_str()),
01755 -1);
01756 }
01757 }
01758
01759 if (domainId == -1) {
01760
01761 if (!convertToInteger(domain_name, domainId)) {
01762 ACE_ERROR_RETURN((LM_ERROR,
01763 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01764 ACE_TEXT("Missing DomainId value in [domain/%C] section.\n"),
01765 domain_name.c_str()),
01766 -1);
01767 }
01768 }
01769
01770 if (!perDomainDefaultTportConfig.empty()) {
01771 TransportRegistry* const reg = TransportRegistry::instance();
01772 TransportConfig_rch tc = reg->get_config(perDomainDefaultTportConfig);
01773 if (tc.is_nil()) {
01774 ACE_ERROR_RETURN((LM_ERROR,
01775 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01776 ACE_TEXT("Unknown transport config %C in [domain/%C] section.\n"),
01777 perDomainDefaultTportConfig.c_str(), domain_name.c_str()), -1);
01778 } else {
01779 reg->domain_default_config(domainId, tc);
01780 }
01781 }
01782
01783
01784 if (!repoKey.empty()) {
01785 if ((repoKey != Discovery::DEFAULT_REPO) &&
01786 (repoKey != Discovery::DEFAULT_RTPS) &&
01787 (repoKey != Discovery::DEFAULT_STATIC) &&
01788 (this->discoveryMap_.find(repoKey) == this->discoveryMap_.end())) {
01789 ACE_ERROR_RETURN((LM_ERROR,
01790 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01791 ACE_TEXT("Specified configuration (%C) not found. Referenced in [domain/%C] section.\n"),
01792 repoKey.c_str(), domain_name.c_str()),
01793 -1);
01794 }
01795 this->set_repo_domain(domainId, repoKey);
01796 }
01797 }
01798 }
01799
01800 return 0;
01801 }
01802
01803 int
01804 Service_Participant::load_discovery_configuration(ACE_Configuration_Heap& cf,
01805 const ACE_TCHAR* section_name)
01806 {
01807 const ACE_Configuration_Section_Key &root = cf.root_section();
01808 ACE_Configuration_Section_Key sect;
01809 if (cf.open_section(root, section_name, 0, sect) == 0) {
01810
01811 const OPENDDS_STRING sect_name = ACE_TEXT_ALWAYS_CHAR(section_name);
01812 DiscoveryTypes::iterator iter =
01813 this->discovery_types_.find(sect_name);
01814
01815 if (iter == this->discovery_types_.end()) {
01816
01817 TheTransportRegistry->load_transport_lib(sect_name);
01818 iter = this->discovery_types_.find(sect_name);
01819 }
01820
01821 if (iter != this->discovery_types_.end()) {
01822
01823 return iter->second->discovery_config(cf);
01824 } else {
01825
01826 ACE_ERROR_RETURN((LM_ERROR,
01827 ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
01828 ACE_TEXT("load_discovery_configuration ")
01829 ACE_TEXT("Unable to load libraries for %s\n"),
01830 section_name),
01831 -1);
01832 }
01833 }
01834 return 0;
01835 }
01836
#if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
/// Configure and install the safety-profile pool using pool_size_ and
/// pool_granularity_. Does nothing when pool_size_ is zero.
void
Service_Participant::configure_pool()
{
  if (pool_size_ == 0) {
    return;
  }

  SafetyProfilePool::instance()->configure_pool(pool_size_, pool_granularity_);
  SafetyProfilePool::instance()->install();
}
#endif
01847
01848 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01849 DataDurabilityCache *
01850 Service_Participant::get_data_durability_cache(
01851 DDS::DurabilityQosPolicy const & durability)
01852 {
01853 DDS::DurabilityQosPolicyKind const kind =
01854 durability.kind;
01855
01856 DataDurabilityCache * cache = 0;
01857
01858 if (kind == DDS::TRANSIENT_DURABILITY_QOS) {
01859 {
01860 ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01861 guard,
01862 this->factory_lock_,
01863 0);
01864
01865 if (!this->transient_data_cache_) {
01866 this->transient_data_cache_.reset(new DataDurabilityCache(kind));
01867 }
01868 }
01869
01870 cache = this->transient_data_cache_.get();
01871
01872 } else if (kind == DDS::PERSISTENT_DURABILITY_QOS) {
01873 {
01874 ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01875 guard,
01876 this->factory_lock_,
01877 0);
01878
01879 try {
01880 if (!this->persistent_data_cache_) {
01881 this->persistent_data_cache_.reset(new DataDurabilityCache(kind,
01882 this->persistent_data_dir_));
01883 }
01884
01885 } catch (const std::exception& ex) {
01886 if (DCPS_debug_level > 0) {
01887 ACE_ERROR((LM_WARNING,
01888 ACE_TEXT("(%P|%t) WARNING: Service_Participant::get_data_durability_cache ")
01889 ACE_TEXT("failed to create PERSISTENT cache, falling back on ")
01890 ACE_TEXT("TRANSIENT behavior: %C\n"), ex.what()));
01891 }
01892
01893 this->persistent_data_cache_.reset(new DataDurabilityCache(DDS::TRANSIENT_DURABILITY_QOS));
01894 }
01895 }
01896
01897 cache = this->persistent_data_cache_.get();
01898 }
01899
01900 return cache;
01901 }
01902 #endif
01903
01904 void
01905 Service_Participant::add_discovery(Discovery_rch discovery)
01906 {
01907 if (discovery) {
01908 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01909 this->discoveryMap_[discovery->key()] = discovery;
01910 }
01911 }
01912
01913 const Service_Participant::RepoKeyDiscoveryMap&
01914 Service_Participant::discoveryMap() const
01915 {
01916 return this->discoveryMap_;
01917 }
01918
01919 const Service_Participant::DomainRepoMap&
01920 Service_Participant::domainRepoMap() const
01921 {
01922 return this->domainRepoMap_;
01923 }
01924
01925 Recorder_ptr
01926 Service_Participant::create_recorder(DDS::DomainParticipant_ptr participant,
01927 DDS::Topic_ptr a_topic,
01928 const DDS::SubscriberQos& subscriber_qos,
01929 const DDS::DataReaderQos& datareader_qos,
01930 const RecorderListener_rch& a_listener)
01931 {
01932 DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01933 if (participant_servant)
01934 return participant_servant->create_recorder(a_topic, subscriber_qos, datareader_qos, a_listener, 0);
01935 return 0;
01936 }
01937
01938 DDS::ReturnCode_t
01939 Service_Participant::delete_recorder(Recorder_ptr recorder)
01940 {
01941 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01942 RecorderImpl* impl = dynamic_cast<RecorderImpl*>(recorder);
01943 if (impl){
01944 ret = impl->cleanup();
01945 impl->participant()->delete_recorder(recorder);
01946 }
01947 return ret;
01948 }
01949
01950 Replayer_ptr
01951 Service_Participant::create_replayer(DDS::DomainParticipant_ptr participant,
01952 DDS::Topic_ptr a_topic,
01953 const DDS::PublisherQos& publisher_qos,
01954 const DDS::DataWriterQos& datawriter_qos,
01955 const ReplayerListener_rch& a_listener)
01956 {
01957 ACE_DEBUG((LM_DEBUG, "Service_Participant::create_replayer\n"));
01958 DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01959 if (participant_servant)
01960 return participant_servant->create_replayer(a_topic, publisher_qos, datawriter_qos, a_listener, 0);
01961 return 0;
01962 }
01963
01964 DDS::ReturnCode_t
01965 Service_Participant::delete_replayer(Replayer_ptr replayer)
01966 {
01967 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01968 ReplayerImpl* impl = static_cast<ReplayerImpl*>(replayer);
01969 if (impl) {
01970 ret = impl->cleanup();
01971 impl->participant()->delete_replayer(replayer);
01972 }
01973 return ret;
01974 }
01975
01976 DDS::Topic_ptr
01977 Service_Participant::create_typeless_topic(DDS::DomainParticipant_ptr participant,
01978 const char * topic_name,
01979 const char * type_name,
01980 bool type_has_keys,
01981 const DDS::TopicQos & qos,
01982 DDS::TopicListener_ptr a_listener,
01983 DDS::StatusMask mask)
01984 {
01985 DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01986 if (! participant_servant) {
01987 return 0;
01988 }
01989 return participant_servant->create_typeless_topic(topic_name, type_name, type_has_keys, qos, a_listener, mask);
01990 }
01991
01992 }
01993 }
01994
01995 OPENDDS_END_VERSIONED_NAMESPACE_DECL