00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "debug.h"
00010 #include "Service_Participant.h"
00011 #include "BuiltInTopicUtils.h"
00012 #include "DataDurabilityCache.h"
00013 #include "GuidConverter.h"
00014 #include "MonitorFactory.h"
00015 #include "ConfigUtils.h"
00016
00017 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00018
00019 #include "tao/ORB_Core.h"
00020
00021 #include "ace/Singleton.h"
00022 #include "ace/Arg_Shifter.h"
00023 #include "ace/Reactor.h"
00024 #include "ace/Select_Reactor.h"
00025 #include "ace/Configuration_Import_Export.h"
00026 #include "ace/Service_Config.h"
00027 #include "ace/Argv_Type_Converter.h"
00028 #include "ace/Auto_Ptr.h"
00029 #include "ace/Sched_Params.h"
00030 #include "ace/Malloc_Allocator.h"
00031
00032 #include "RecorderImpl.h"
00033 #include "ReplayerImpl.h"
00034 #include "StaticDiscovery.h"
00035
00036 #ifdef OPENDDS_SAFETY_PROFILE
00037 #include <stdio.h>
00038 #else
00039 #include <fstream>
00040 #endif
00041
00042 #if !defined (__ACE_INLINE__)
00043 #include "Service_Participant.inl"
00044 #endif
00045
00046 namespace {
00047
00048 void set_log_file_name(const char* fname)
00049 {
00050 #ifdef OPENDDS_SAFETY_PROFILE
00051 ACE_LOG_MSG->msg_ostream(fopen(fname, "a"), true);
00052 #else
00053 std::ofstream* output_stream = new std::ofstream(fname, ios::app);
00054 if (output_stream->bad()) {
00055 delete output_stream;
00056 } else {
00057 ACE_LOG_MSG->msg_ostream(output_stream, true);
00058 }
00059 #endif
00060 ACE_LOG_MSG->clr_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::LOGGER);
00061 ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
00062 }
00063
00064
00065 void set_log_verbose(unsigned long verbose_logging)
00066 {
00067
00068
00069
00070 typedef void (ACE_Log_Msg::*PTMF)(u_long);
00071 PTMF flagop = &ACE_Log_Msg::set_flags;
00072 u_long value;
00073
00074 switch (verbose_logging)
00075 {
00076 case 0:
00077 flagop = &ACE_Log_Msg::clr_flags;
00078 value = ACE_Log_Msg::VERBOSE | ACE_Log_Msg::VERBOSE_LITE;
00079 break;
00080 case 1:
00081 value = ACE_Log_Msg::VERBOSE_LITE; break;
00082 default:
00083 value = ACE_Log_Msg::VERBOSE; break;
00084 }
00085
00086 (ACE_LOG_MSG->*flagop)(value);
00087
00088 }
00089
00090
00091 }
00092
00093 namespace OpenDDS {
00094 namespace DCPS {
00095
// Definition of the static member zero_argc, initialised to zero.
// NOTE(review): presumably used as a stand-in for an int& argc when no
// command line is available - confirm against the class declaration.
int Service_Participant::zero_argc = 0;

// Initial value of n_chunks_ (see the constructor).
const size_t DEFAULT_NUM_CHUNKS = 20;

// Initial value of association_chunk_multiplier_ (see the constructor).
const size_t DEFAULT_CHUNK_MULTIPLIER = 10;

// Initial values of the federation_* members (see the constructor).
// NOTE(review): units are not stated here; the names suggest seconds for
// the duration/backoff/liveliness values - confirm.
const int DEFAULT_FEDERATION_RECOVERY_DURATION = 900;
const int DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS = 1;
const int DEFAULT_FEDERATION_BACKOFF_MULTIPLIER = 2;
const int DEFAULT_FEDERATION_LIVELINESS = 60;

// Initial value of bit_lookup_duration_msec_ (see the constructor).
const int BIT_LOOKUP_DURATION_MSEC = 2000;

// Name of the configuration file; empty until -DCPSConfigFile is parsed.
static ACE_TString config_fname(ACE_TEXT(""));

// NOTE(review): not referenced in the visible part of this file; presumably
// the default InfoRepo IOR - confirm where it is used.
static const ACE_TCHAR DEFAULT_REPO_IOR[] = ACE_TEXT("file://repo.ior");

// Initial value of persistent_data_dir_ (see the constructor).
static const ACE_CString DEFAULT_PERSISTENT_DATA_DIR = "OpenDDS-durable-data-dir";

// Configuration section names.  REPO_SECTION_NAME is also used as the
// discovery type key in set_repo_ior().
static const ACE_TCHAR COMMON_SECTION_NAME[] = ACE_TEXT("common");
static const ACE_TCHAR DOMAIN_SECTION_NAME[] = ACE_TEXT("domain");
static const ACE_TCHAR REPO_SECTION_NAME[]   = ACE_TEXT("repository");
static const ACE_TCHAR RTPS_SECTION_NAME[]   = ACE_TEXT("rtps_discovery");

// Each flag below is set to true by parse_args() when the matching
// command-line option is seen (got_info is also set by set_repo_ior()).
// NOTE(review): presumably consulted when loading the configuration file so
// that command-line values take precedence - confirm in load_configuration().
static bool got_debug_level = false;
static bool got_use_rti_serialization = false;
static bool got_info = false;
static bool got_chunks = false;
static bool got_chunk_association_multiplier = false;
static bool got_liveliness_factor = false;
static bool got_bit_transport_port = false;
static bool got_bit_transport_ip = false;
static bool got_bit_lookup_duration_msec = false;
static bool got_global_transport_config = false;
static bool got_bit_flag = false;
static bool got_publisher_content_filter = false;
static bool got_transport_debug_level = false;
static bool got_pending_timeout = false;
#ifndef OPENDDS_NO_PERSISTENCE_PROFILE
static bool got_persistent_data_dir = false;
#endif
static bool got_default_discovery = false;
// Unless the build overrides it, the default discovery method is RTPS for
// safety-profile builds and the InfoRepo otherwise.
#ifndef DDS_DEFAULT_DISCOVERY_METHOD
# ifdef OPENDDS_SAFETY_PROFILE
# define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_RTPS
# else
# define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_REPO
# endif
#endif
static bool got_log_fname = false;
static bool got_log_verbose = false;
static bool got_default_address = false;
00148
// Construct the participant with its built-in defaults and then populate the
// initial QoS policy values via initialize().  No reactor, factory servant or
// monitor exists yet; those are created in get_domain_participant_factory().
Service_Participant::Service_Participant()
  :
#ifndef OPENDDS_SAFETY_PROFILE
    ORB_argv_(false ),
#endif
    reactor_(0),
    dp_factory_servant_(0),
    defaultDiscovery_(DDS_DEFAULT_DISCOVERY_METHOD),
    n_chunks_(DEFAULT_NUM_CHUNKS),
    association_chunk_multiplier_(DEFAULT_CHUNK_MULTIPLIER),
    liveliness_factor_(80),
    bit_transport_port_(0),
    // Built-in topics are enabled unless the build defines DDS_HAS_MINIMUM_BIT.
    bit_enabled_(
#ifdef DDS_HAS_MINIMUM_BIT
      false
#else
      true
#endif
    ),
    bit_lookup_duration_msec_(BIT_LOOKUP_DURATION_MSEC),
    global_transport_config_(ACE_TEXT("")),
    monitor_factory_(0),
    monitor_(0),
    federation_recovery_duration_(DEFAULT_FEDERATION_RECOVERY_DURATION),
    federation_initial_backoff_seconds_(DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS),
    federation_backoff_multiplier_(DEFAULT_FEDERATION_BACKOFF_MULTIPLIER),
    federation_liveliness_(DEFAULT_FEDERATION_LIVELINESS),
    schedulerQuantum_(ACE_Time_Value::zero),
#if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
    pool_size_(1024*1024*16),
    pool_granularity_(8),
#endif
    // -1 is also the value initializeScheduling() stores when the requested
    // policy could not be applied.
    scheduler_(-1),
    priority_min_(0),
    priority_max_(0),
    publisher_content_filter_(true),
#ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    transient_data_cache_(),
    persistent_data_cache_(),
    persistent_data_dir_(DEFAULT_PERSISTENT_DATA_DIR),
#endif
    pending_timeout_(ACE_Time_Value::zero),
    shut_down_(false)
{
  initialize();
}
00195
00196 Service_Participant::~Service_Participant()
00197 {
00198 shutdown();
00199 ACE_GUARD(TAO_SYNCH_MUTEX, guard, this->factory_lock_);
00200 typedef OPENDDS_MAP(OPENDDS_STRING, Discovery::Config*)::iterator iter_t;
00201 for (iter_t it = discovery_types_.begin(); it != discovery_types_.end(); ++it) {
00202 delete it->second;
00203 }
00204 delete monitor_;
00205 delete reactor_;
00206
00207 if (DCPS_debug_level > 0) {
00208 ACE_DEBUG((LM_DEBUG,
00209 "%T (%P|%t) Service_Participant::~Service_Participant()\n"));
00210 }
00211 }
00212
00213 Service_Participant*
00214 Service_Participant::instance()
00215 {
00216
00217
00218
00219 return ACE_Singleton<Service_Participant, ACE_SYNCH_MUTEX>::instance();
00220 }
00221
00222 int
00223 Service_Participant::ReactorTask::svc()
00224 {
00225 Service_Participant* sp = instance();
00226 sp->reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
00227 sp->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00228 this->wait_for_startup();
00229 sp->reactor_->run_reactor_event_loop();
00230 return 0;
00231 }
00232
00233 ACE_Reactor_Timer_Interface*
00234 Service_Participant::timer() const
00235 {
00236 return reactor_;
00237 }
00238
00239 ACE_Reactor*
00240 Service_Participant::reactor() const
00241 {
00242 return reactor_;
00243 }
00244
00245 ACE_thread_t
00246 Service_Participant::reactor_owner() const
00247 {
00248 return reactor_owner_;
00249 }
00250
00251 void
00252 Service_Participant::shutdown()
00253 {
00254 shut_down_ = true;
00255 try {
00256 TransportRegistry::instance()->release();
00257 ACE_GUARD(TAO_SYNCH_MUTEX, guard, this->factory_lock_);
00258
00259 domainRepoMap_.clear();
00260 discoveryMap_.clear();
00261
00262 if (0 != reactor_) {
00263 reactor_->end_reactor_event_loop();
00264 reactor_task_.wait();
00265 }
00266
00267 dp_factory_ = DDS::DomainParticipantFactory::_nil();
00268
00269 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00270 transient_data_cache_.reset();
00271 persistent_data_cache_.reset();
00272 #endif
00273
00274 typedef OPENDDS_MAP(OPENDDS_STRING, Discovery::Config*)::iterator iter;
00275 for (iter i = discovery_types_.begin(); i != discovery_types_.end(); ++i) {
00276 delete i->second;
00277 }
00278 discovery_types_.clear();
00279 } catch (const CORBA::Exception& ex) {
00280 ex._tao_print_exception("ERROR: Service_Participant::shutdown");
00281 }
00282 }
00283
#ifdef ACE_USES_WCHAR
// Narrow-character overload for wide-character ACE builds: convert the
// arguments with ACE_Argv_Type_Converter and forward to the ACE_TCHAR overload.
DDS::DomainParticipantFactory_ptr
Service_Participant::get_domain_participant_factory(int &argc,
                                                    char *argv[])
{
  ACE_Argv_Type_Converter converter(argc, argv);
  int& converted_argc = converter.get_argc();
  ACE_TCHAR** const converted_argv = converter.get_TCHAR_argv();
  return this->get_domain_participant_factory(converted_argc, converted_argv);
}
#endif
00294
00295 DDS::DomainParticipantFactory_ptr
00296 Service_Participant::get_domain_participant_factory(int &argc,
00297 ACE_TCHAR *argv[])
00298 {
00299 if (CORBA::is_nil(dp_factory_.in())) {
00300 ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
00301 guard,
00302 this->factory_lock_,
00303 DDS::DomainParticipantFactory::_nil());
00304
00305 if (CORBA::is_nil(dp_factory_.in())) {
00306
00307
00308
00309
00310
00311 #ifndef OPENDDS_SAFETY_PROFILE
00312 ORB_argv_.add(ACE_TEXT("unused_arg_0"));
00313 #endif
00314 ACE_Arg_Shifter shifter(argc, argv);
00315 while (shifter.is_anything_left()) {
00316 if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBLogFile")) == 0) {
00317 shifter.ignore_arg();
00318 } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBVerboseLogging")) == 0) {
00319 shifter.ignore_arg();
00320 } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORB")) < 0) {
00321 shifter.ignore_arg();
00322 } else {
00323 #ifndef OPENDDS_SAFETY_PROFILE
00324 ORB_argv_.add(shifter.get_current());
00325 #endif
00326 shifter.consume_arg();
00327 if (shifter.is_parameter_next()) {
00328 #ifndef OPENDDS_SAFETY_PROFILE
00329 ORB_argv_.add(shifter.get_current(), true );
00330 #endif
00331 shifter.consume_arg();
00332 }
00333 }
00334 }
00335
00336 if (parse_args(argc, argv) != 0) {
00337 return DDS::DomainParticipantFactory::_nil();
00338 }
00339
00340 if (config_fname == ACE_TEXT("")) {
00341 if (DCPS_debug_level) {
00342 ACE_DEBUG((LM_NOTICE,
00343 ACE_TEXT("(%P|%t) NOTICE: not using file configuration - no configuration ")
00344 ACE_TEXT("file specified.\n")));
00345 }
00346
00347 } else {
00348
00349
00350 FILE* in = ACE_OS::fopen(config_fname.c_str(),
00351 ACE_TEXT("r"));
00352
00353 if (!in) {
00354 ACE_DEBUG((LM_WARNING,
00355 ACE_TEXT("(%P|%t) WARNING: not using file configuration - ")
00356 ACE_TEXT("can not open \"%s\" for reading. %p\n"),
00357 config_fname.c_str(), ACE_TEXT("fopen")));
00358
00359 } else {
00360 ACE_OS::fclose(in);
00361
00362 if (this->load_configuration() != 0) {
00363 ACE_ERROR((LM_ERROR,
00364 ACE_TEXT("(%P|%t) ERROR: Service_Participant::get_domain_participant_factory: ")
00365 ACE_TEXT("load_configuration() failed.\n")));
00366 return DDS::DomainParticipantFactory::_nil();
00367 }
00368 }
00369 }
00370
00371 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00372
00373 configure_pool();
00374 #endif
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385 this->initializeScheduling();
00386
00387 ACE_NEW_RETURN(dp_factory_servant_,
00388 DomainParticipantFactoryImpl(),
00389 DDS::DomainParticipantFactory::_nil());
00390
00391 dp_factory_ = dp_factory_servant_;
00392
00393 if (CORBA::is_nil(dp_factory_.in())) {
00394 ACE_ERROR((LM_ERROR,
00395 ACE_TEXT("(%P|%t) ERROR: ")
00396 ACE_TEXT("Service_Participant::get_domain_participant_factory, ")
00397 ACE_TEXT("nil DomainParticipantFactory. \n")));
00398 return DDS::DomainParticipantFactory::_nil();
00399 }
00400
00401 if (!reactor_)
00402 reactor_ = new ACE_Reactor(new ACE_Select_Reactor, true);
00403
00404 if (reactor_task_.activate(THR_NEW_LWP | THR_JOINABLE) == -1) {
00405 ACE_ERROR((LM_ERROR,
00406 ACE_TEXT("ERROR: Service_Participant::get_domain_participant_factory, ")
00407 ACE_TEXT("Failed to activate the reactor task.")));
00408 return DDS::DomainParticipantFactory::_nil();
00409 }
00410
00411 reactor_task_.wait_for_startup();
00412
00413 this->monitor_factory_ =
00414 ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor");
00415 if (this->monitor_factory_ == 0) {
00416
00417 this->monitor_factory_ =
00418 ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor_Default");
00419 }
00420 this->monitor_ = this->monitor_factory_->create_sp_monitor(this);
00421 }
00422 }
00423
00424 return DDS::DomainParticipantFactory::_duplicate(dp_factory_.in());
00425 }
00426
00427 int
00428 Service_Participant::parse_args(int &argc, ACE_TCHAR *argv[])
00429 {
00430 ACE_Arg_Shifter arg_shifter(argc, argv);
00431
00432 while (arg_shifter.is_anything_left()) {
00433 const ACE_TCHAR *currentArg = 0;
00434
00435 if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDebugLevel"))) != 0) {
00436 set_DCPS_debug_level(ACE_OS::atoi(currentArg));
00437 arg_shifter.consume_arg();
00438 got_debug_level = true;
00439
00440 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSInfoRepo"))) != 0) {
00441 this->set_repo_ior(currentArg, Discovery::DEFAULT_REPO);
00442 arg_shifter.consume_arg();
00443 got_info = true;
00444
00445 } else if (!arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-DCPSRTISerialization"))) {
00446 Serializer::set_use_rti_serialization(true);
00447 arg_shifter.consume_arg();
00448 got_use_rti_serialization = true;
00449
00450 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunks"))) != 0) {
00451 n_chunks_ = ACE_OS::atoi(currentArg);
00452 arg_shifter.consume_arg();
00453 got_chunks = true;
00454
00455 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunkAssociationMultiplier"))) != 0) {
00456 association_chunk_multiplier_ = ACE_OS::atoi(currentArg);
00457 arg_shifter.consume_arg();
00458 got_chunk_association_multiplier = true;
00459
00460 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSConfigFile"))) != 0) {
00461 config_fname = currentArg;
00462 arg_shifter.consume_arg();
00463
00464 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSLivelinessFactor"))) != 0) {
00465 liveliness_factor_ = ACE_OS::atoi(currentArg);
00466 arg_shifter.consume_arg();
00467 got_liveliness_factor = true;
00468
00469 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportPort"))) != 0) {
00470
00471
00472 this->bit_transport_port_ = ACE_OS::atoi(currentArg);
00473 arg_shifter.consume_arg();
00474 got_bit_transport_port = true;
00475
00476 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportIPAddress"))) != 0) {
00477
00478
00479 this->bit_transport_ip_ = currentArg;
00480 arg_shifter.consume_arg();
00481 got_bit_transport_ip = true;
00482
00483 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitLookupDurationMsec"))) != 0) {
00484 bit_lookup_duration_msec_ = ACE_OS::atoi(currentArg);
00485 arg_shifter.consume_arg();
00486 got_bit_lookup_duration_msec = true;
00487
00488 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSGlobalTransportConfig"))) != 0) {
00489 global_transport_config_ = currentArg;
00490 arg_shifter.consume_arg();
00491 got_global_transport_config = true;
00492
00493 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBit"))) != 0) {
00494 bit_enabled_ = ACE_OS::atoi(currentArg);
00495 arg_shifter.consume_arg();
00496 got_bit_flag = true;
00497
00498 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSTransportDebugLevel"))) != 0) {
00499 OpenDDS::DCPS::Transport_debug_level = ACE_OS::atoi(currentArg);
00500 arg_shifter.consume_arg();
00501 got_transport_debug_level = true;
00502
00503 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00504 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPersistentDataDir"))) != 0) {
00505 this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00506 arg_shifter.consume_arg();
00507 got_persistent_data_dir = true;
00508 #endif
00509
00510 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPendingTimeout"))) != 0) {
00511 this->pending_timeout_ = ACE_OS::atoi(currentArg);
00512 arg_shifter.consume_arg();
00513 got_pending_timeout = true;
00514
00515 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPublisherContentFilter"))) != 0) {
00516 this->publisher_content_filter_ = ACE_OS::atoi(currentArg);
00517 arg_shifter.consume_arg();
00518 got_publisher_content_filter = true;
00519
00520 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultDiscovery"))) != 0) {
00521 this->defaultDiscovery_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00522 arg_shifter.consume_arg();
00523 got_default_discovery = true;
00524
00525 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationRecoveryDuration"))) != 0) {
00526 this->federation_recovery_duration_ = ACE_OS::atoi(currentArg);
00527 arg_shifter.consume_arg();
00528
00529 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationInitialBackoffSeconds"))) != 0) {
00530 this->federation_initial_backoff_seconds_ = ACE_OS::atoi(currentArg);
00531 arg_shifter.consume_arg();
00532
00533 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationBackoffMultiplier"))) != 0) {
00534 this->federation_backoff_multiplier_ = ACE_OS::atoi(currentArg);
00535 arg_shifter.consume_arg();
00536
00537 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationLivelinessDuration"))) != 0) {
00538 this->federation_liveliness_ = ACE_OS::atoi(currentArg);
00539 arg_shifter.consume_arg();
00540
00541 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBLogFile"))) != 0) {
00542 set_log_file_name(ACE_TEXT_ALWAYS_CHAR(currentArg));
00543 arg_shifter.consume_arg();
00544 got_log_fname = true;
00545
00546 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-ORBVerboseLogging"))) != 0) {
00547 set_log_verbose(ACE_OS::atoi(currentArg));
00548 arg_shifter.consume_arg();
00549 got_log_verbose = true;
00550
00551 } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultAddress"))) != 0) {
00552 this->default_address_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
00553 arg_shifter.consume_arg();
00554 got_default_address = true;
00555
00556 } else {
00557 arg_shifter.ignore_arg();
00558 }
00559 }
00560
00561
00562 return 0;
00563 }
00564
00565 void
00566 Service_Participant::initialize()
00567 {
00568
00569
00570
00571
00572 initial_TransportPriorityQosPolicy_.value = 0;
00573 initial_LifespanQosPolicy_.duration.sec = DDS::DURATION_INFINITE_SEC;
00574 initial_LifespanQosPolicy_.duration.nanosec = DDS::DURATION_INFINITE_NSEC;
00575
00576 initial_DurabilityQosPolicy_.kind = DDS::VOLATILE_DURABILITY_QOS;
00577
00578 initial_DurabilityServiceQosPolicy_.service_cleanup_delay.sec =
00579 DDS::DURATION_ZERO_SEC;
00580 initial_DurabilityServiceQosPolicy_.service_cleanup_delay.nanosec =
00581 DDS::DURATION_ZERO_NSEC;
00582 initial_DurabilityServiceQosPolicy_.history_kind =
00583 DDS::KEEP_LAST_HISTORY_QOS;
00584 initial_DurabilityServiceQosPolicy_.history_depth = 1;
00585 initial_DurabilityServiceQosPolicy_.max_samples =
00586 DDS::LENGTH_UNLIMITED;
00587 initial_DurabilityServiceQosPolicy_.max_instances =
00588 DDS::LENGTH_UNLIMITED;
00589 initial_DurabilityServiceQosPolicy_.max_samples_per_instance =
00590 DDS::LENGTH_UNLIMITED;
00591
00592 initial_PresentationQosPolicy_.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
00593 initial_PresentationQosPolicy_.coherent_access = false;
00594 initial_PresentationQosPolicy_.ordered_access = false;
00595
00596 initial_DeadlineQosPolicy_.period.sec = DDS::DURATION_INFINITE_SEC;
00597 initial_DeadlineQosPolicy_.period.nanosec = DDS::DURATION_INFINITE_NSEC;
00598
00599 initial_LatencyBudgetQosPolicy_.duration.sec = DDS::DURATION_ZERO_SEC;
00600 initial_LatencyBudgetQosPolicy_.duration.nanosec = DDS::DURATION_ZERO_NSEC;
00601
00602 initial_OwnershipQosPolicy_.kind = DDS::SHARED_OWNERSHIP_QOS;
00603 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00604 initial_OwnershipStrengthQosPolicy_.value = 0;
00605 #endif
00606
00607 initial_LivelinessQosPolicy_.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
00608 initial_LivelinessQosPolicy_.lease_duration.sec = DDS::DURATION_INFINITE_SEC;
00609 initial_LivelinessQosPolicy_.lease_duration.nanosec = DDS::DURATION_INFINITE_NSEC;
00610
00611 initial_TimeBasedFilterQosPolicy_.minimum_separation.sec = DDS::DURATION_ZERO_SEC;
00612 initial_TimeBasedFilterQosPolicy_.minimum_separation.nanosec = DDS::DURATION_ZERO_NSEC;
00613
00614 initial_ReliabilityQosPolicy_.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
00615 initial_ReliabilityQosPolicy_.max_blocking_time.sec = DDS::DURATION_INFINITE_SEC;
00616 initial_ReliabilityQosPolicy_.max_blocking_time.nanosec = DDS::DURATION_INFINITE_NSEC;
00617
00618 initial_DestinationOrderQosPolicy_.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
00619
00620 initial_HistoryQosPolicy_.kind = DDS::KEEP_LAST_HISTORY_QOS;
00621 initial_HistoryQosPolicy_.depth = 1;
00622
00623 initial_ResourceLimitsQosPolicy_.max_samples = DDS::LENGTH_UNLIMITED;
00624 initial_ResourceLimitsQosPolicy_.max_instances = DDS::LENGTH_UNLIMITED;
00625 initial_ResourceLimitsQosPolicy_.max_samples_per_instance = DDS::LENGTH_UNLIMITED;
00626
00627 initial_EntityFactoryQosPolicy_.autoenable_created_entities = true;
00628
00629 initial_WriterDataLifecycleQosPolicy_.autodispose_unregistered_instances = true;
00630
00631 initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
00632 initial_ReaderDataLifecycleQosPolicy_.autopurge_nowriter_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;
00633 initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.sec = DDS::DURATION_INFINITE_SEC;
00634 initial_ReaderDataLifecycleQosPolicy_.autopurge_disposed_samples_delay.nanosec = DDS::DURATION_INFINITE_NSEC;
00635
00636 initial_DomainParticipantQos_.user_data = initial_UserDataQosPolicy_;
00637 initial_DomainParticipantQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00638 initial_DomainParticipantFactoryQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00639
00640 initial_TopicQos_.topic_data = initial_TopicDataQosPolicy_;
00641 initial_TopicQos_.durability = initial_DurabilityQosPolicy_;
00642 initial_TopicQos_.durability_service = initial_DurabilityServiceQosPolicy_;
00643 initial_TopicQos_.deadline = initial_DeadlineQosPolicy_;
00644 initial_TopicQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00645 initial_TopicQos_.liveliness = initial_LivelinessQosPolicy_;
00646 initial_TopicQos_.reliability = initial_ReliabilityQosPolicy_;
00647 initial_TopicQos_.destination_order = initial_DestinationOrderQosPolicy_;
00648 initial_TopicQos_.history = initial_HistoryQosPolicy_;
00649 initial_TopicQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00650 initial_TopicQos_.transport_priority = initial_TransportPriorityQosPolicy_;
00651 initial_TopicQos_.lifespan = initial_LifespanQosPolicy_;
00652 initial_TopicQos_.ownership = initial_OwnershipQosPolicy_;
00653
00654 initial_DataWriterQos_.durability = initial_DurabilityQosPolicy_;
00655 initial_DataWriterQos_.durability_service = initial_DurabilityServiceQosPolicy_;
00656 initial_DataWriterQos_.deadline = initial_DeadlineQosPolicy_;
00657 initial_DataWriterQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00658 initial_DataWriterQos_.liveliness = initial_LivelinessQosPolicy_;
00659 initial_DataWriterQos_.reliability = initial_ReliabilityQosPolicy_;
00660 initial_DataWriterQos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00661 initial_DataWriterQos_.reliability.max_blocking_time.sec = 0;
00662 initial_DataWriterQos_.reliability.max_blocking_time.nanosec = 100000000;
00663 initial_DataWriterQos_.destination_order = initial_DestinationOrderQosPolicy_;
00664 initial_DataWriterQos_.history = initial_HistoryQosPolicy_;
00665 initial_DataWriterQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00666 initial_DataWriterQos_.transport_priority = initial_TransportPriorityQosPolicy_;
00667 initial_DataWriterQos_.lifespan = initial_LifespanQosPolicy_;
00668 initial_DataWriterQos_.user_data = initial_UserDataQosPolicy_;
00669 initial_DataWriterQos_.ownership = initial_OwnershipQosPolicy_;
00670 #ifdef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00671 initial_DataWriterQos_.ownership_strength.value = 0;
00672 #else
00673 initial_DataWriterQos_.ownership_strength = initial_OwnershipStrengthQosPolicy_;
00674 #endif
00675 initial_DataWriterQos_.writer_data_lifecycle = initial_WriterDataLifecycleQosPolicy_;
00676
00677 initial_PublisherQos_.presentation = initial_PresentationQosPolicy_;
00678 initial_PublisherQos_.partition = initial_PartitionQosPolicy_;
00679 initial_PublisherQos_.group_data = initial_GroupDataQosPolicy_;
00680 initial_PublisherQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00681
00682 initial_DataReaderQos_.durability = initial_DurabilityQosPolicy_;
00683 initial_DataReaderQos_.deadline = initial_DeadlineQosPolicy_;
00684 initial_DataReaderQos_.latency_budget = initial_LatencyBudgetQosPolicy_;
00685 initial_DataReaderQos_.liveliness = initial_LivelinessQosPolicy_;
00686 initial_DataReaderQos_.reliability = initial_ReliabilityQosPolicy_;
00687 initial_DataReaderQos_.destination_order = initial_DestinationOrderQosPolicy_;
00688 initial_DataReaderQos_.history = initial_HistoryQosPolicy_;
00689 initial_DataReaderQos_.resource_limits = initial_ResourceLimitsQosPolicy_;
00690 initial_DataReaderQos_.user_data = initial_UserDataQosPolicy_;
00691 initial_DataReaderQos_.time_based_filter = initial_TimeBasedFilterQosPolicy_;
00692 initial_DataReaderQos_.ownership = initial_OwnershipQosPolicy_;
00693 initial_DataReaderQos_.reader_data_lifecycle = initial_ReaderDataLifecycleQosPolicy_;
00694
00695 initial_SubscriberQos_.presentation = initial_PresentationQosPolicy_;
00696 initial_SubscriberQos_.partition = initial_PartitionQosPolicy_;
00697 initial_SubscriberQos_.group_data = initial_GroupDataQosPolicy_;
00698 initial_SubscriberQos_.entity_factory = initial_EntityFactoryQosPolicy_;
00699 }
00700
00701 void
00702 Service_Participant::initializeScheduling()
00703 {
00704
00705
00706
00707 if (this->schedulerString_.length() == 0) {
00708 if (DCPS_debug_level > 0) {
00709 ACE_DEBUG((LM_NOTICE,
00710 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::intializeScheduling() - ")
00711 ACE_TEXT("no scheduling policy specified, not setting policy.\n")));
00712 }
00713
00714 } else {
00715
00716
00717
00718 int ace_scheduler = ACE_SCHED_OTHER;
00719 this->scheduler_ = THR_SCHED_DEFAULT;
00720
00721 if (this->schedulerString_ == ACE_TEXT("SCHED_RR")) {
00722 this->scheduler_ = THR_SCHED_RR;
00723 ace_scheduler = ACE_SCHED_RR;
00724
00725 } else if (this->schedulerString_ == ACE_TEXT("SCHED_FIFO")) {
00726 this->scheduler_ = THR_SCHED_FIFO;
00727 ace_scheduler = ACE_SCHED_FIFO;
00728
00729 } else if (this->schedulerString_ == ACE_TEXT("SCHED_OTHER")) {
00730 this->scheduler_ = THR_SCHED_DEFAULT;
00731 ace_scheduler = ACE_SCHED_OTHER;
00732
00733 } else {
00734 ACE_DEBUG((LM_WARNING,
00735 ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00736 ACE_TEXT("unrecognized scheduling policy: %s, set to SCHED_OTHER.\n"),
00737 this->schedulerString_.c_str()));
00738 }
00739
00740
00741
00742
00743 #ifdef ACE_WIN32
00744 ACE_DEBUG((LM_NOTICE,
00745 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::initializeScheduling() - ")
00746 ACE_TEXT("scheduling is not implemented on Win32.\n")));
00747 ACE_UNUSED_ARG(ace_scheduler);
00748 #else
00749 ACE_Sched_Params params(
00750 ace_scheduler,
00751 ACE_Sched_Params::priority_min(ace_scheduler),
00752 ACE_SCOPE_THREAD,
00753 this->schedulerQuantum_);
00754
00755 if (ACE_OS::sched_params(params) != 0) {
00756 if (ACE_OS::last_error() == EPERM) {
00757 ACE_DEBUG((LM_WARNING,
00758 ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
00759 ACE_TEXT("user is not superuser, requested scheduler not set.\n")));
00760
00761 } else {
00762 ACE_ERROR((LM_ERROR,
00763 ACE_TEXT("(%P|%t) ERROR: Service_Participant::initializeScheduling() - ")
00764 ACE_TEXT("sched_params failed: %m.\n")));
00765 }
00766
00767
00768 this->scheduler_ = -1;
00769 ace_scheduler = ACE_SCHED_OTHER;
00770
00771 } else if (DCPS_debug_level > 0) {
00772 ACE_DEBUG((LM_DEBUG,
00773 ACE_TEXT("(%P|%t) Service_Participant::initializeScheduling() - ")
00774 ACE_TEXT("scheduling policy set to %s(%d).\n"),
00775 this->schedulerString_.c_str()));
00776 }
00777
00778
00779
00780
00781 this->priority_min_ = ACE_Sched_Params::priority_min(ace_scheduler, ACE_SCOPE_THREAD);
00782 this->priority_max_ = ACE_Sched_Params::priority_max(ace_scheduler, ACE_SCOPE_THREAD);
00783 #endif // ACE_WIN32
00784 }
00785 }
00786
#ifdef DDS_HAS_WCHAR
// Wide-character overload: convert the IOR with ACE_Wide_To_Ascii and forward
// to the narrow-character set_repo_ior().
bool
Service_Participant::set_repo_ior(const wchar_t* ior,
                                  Discovery::RepoKey key,
                                  bool attach_participant)
{
  ACE_Wide_To_Ascii narrow_ior(ior);
  return this->set_repo_ior(narrow_ior.char_rep(), key, attach_participant);
}
#endif
00796
00797 bool
00798 Service_Participant::set_repo_ior(const char* ior,
00799 Discovery::RepoKey key,
00800 bool attach_participant)
00801 {
00802 if (DCPS_debug_level > 0) {
00803 ACE_DEBUG((LM_DEBUG,
00804 ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior: Repo[ %C] == %C\n"),
00805 key.c_str(), ior));
00806 }
00807
00808
00809
00810 got_info = true;
00811
00812 if (key == "-1") {
00813 key = Discovery::DEFAULT_REPO;
00814 }
00815
00816 const OPENDDS_STRING repo_type = ACE_TEXT_ALWAYS_CHAR(REPO_SECTION_NAME);
00817 if (!discovery_types_.count(repo_type)) {
00818
00819
00820 TheTransportRegistry->load_transport_lib(repo_type);
00821 }
00822
00823 if (discovery_types_.count(repo_type)) {
00824 ACE_Configuration_Heap cf;
00825 cf.open();
00826 ACE_Configuration_Section_Key sect_key;
00827 ACE_TString section = REPO_SECTION_NAME;
00828 section += ACE_TEXT('\\');
00829 section += ACE_TEXT_CHAR_TO_TCHAR(key.c_str());
00830 cf.open_section(cf.root_section(), section.c_str(), 1 , sect_key);
00831 cf.set_string_value(sect_key, ACE_TEXT("RepositoryIor"),
00832 ACE_TEXT_CHAR_TO_TCHAR(ior));
00833
00834 discovery_types_[repo_type]->discovery_config(cf);
00835
00836 this->remap_domains(key, key, attach_participant);
00837 return true;
00838 }
00839
00840 ACE_ERROR_RETURN((LM_ERROR,
00841 ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior ")
00842 ACE_TEXT("ERROR - no discovery type registered for ")
00843 ACE_TEXT("InfoRepoDiscovery\n")),
00844 false);
00845 }
00846
00847 void
00848 Service_Participant::remap_domains(Discovery::RepoKey oldKey,
00849 Discovery::RepoKey newKey,
00850 bool attach_participant)
00851 {
00852
00853 OPENDDS_VECTOR(DDS::DomainId_t) domainList;
00854 {
00855 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00856
00857 for (DomainRepoMap::const_iterator current = this->domainRepoMap_.begin();
00858 current != this->domainRepoMap_.end();
00859 ++current) {
00860 if (current->second == oldKey) {
00861 domainList.push_back(current->first);
00862 }
00863 }
00864 }
00865
00866
00867 for (unsigned int index = 0; index < domainList.size(); ++index) {
00868
00869
00870 this->set_repo_domain(domainList[ index], newKey, attach_participant);
00871 }
00872 }
00873
00874 void
00875 Service_Participant::set_repo_domain(const DDS::DomainId_t domain,
00876 Discovery::RepoKey key,
00877 bool attach_participant)
00878 {
00879 typedef std::pair<Discovery_rch, RepoId> DiscRepoPair;
00880 OPENDDS_VECTOR(DiscRepoPair) repoList;
00881 {
00882 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
00883 DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
00884
00885 if (key == "-1") {
00886 key = Discovery::DEFAULT_REPO;
00887 }
00888
00889 if ((where == this->domainRepoMap_.end()) || (where->second != key)) {
00890
00891
00892 this->domainRepoMap_[ domain] = key;
00893
00894 if (DCPS_debug_level > 0) {
00895 ACE_DEBUG((LM_DEBUG,
00896 ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00897 ACE_TEXT("Domain[ %d] = Repo[ %C].\n"),
00898 domain, key.c_str()));
00899 }
00900 }
00901
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911
00912
00913 if (0 != this->dp_factory_servant_) {
00914
00915 const DomainParticipantFactoryImpl::DPMap& participants
00916 = this->dp_factory_servant_->participants();
00917
00918
00919 DomainParticipantFactoryImpl::DPMap::const_iterator
00920 which = participants.find(domain);
00921
00922 if (which != participants.end()) {
00923
00924 RepoKeyDiscoveryMap::const_iterator disc_iter = this->discoveryMap_.find(key);
00925
00926 if (disc_iter != this->discoveryMap_.end()) {
00927 for (DomainParticipantFactoryImpl::DPSet::const_iterator
00928 current = which->second.begin();
00929 current != which->second.end();
00930 ++current) {
00931 try {
00932
00933
00934 RepoId id = current->svt_->get_id();
00935 repoList.push_back(std::make_pair(disc_iter->second, id));
00936
00937 if (DCPS_debug_level > 0) {
00938 GuidConverter converter(id);
00939 ACE_DEBUG((LM_DEBUG,
00940 ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00941 ACE_TEXT("participant %C attached to Repo[ %C].\n"),
00942 OPENDDS_STRING(converter).c_str(),
00943 key.c_str()));
00944 }
00945
00946 } catch (const CORBA::Exception& ex) {
00947 ex._tao_print_exception(
00948 "ERROR: Service_Participant::set_repo_domain: failed to attach repository - ");
00949 return;
00950 }
00951 }
00952 }
00953 }
00954 }
00955 }
00956
00957
00958 for (unsigned int index = 0; index < repoList.size(); ++index) {
00959 if (DCPS_debug_level > 0) {
00960 GuidConverter converter(repoList[ index].second);
00961 ACE_DEBUG((LM_DEBUG,
00962 ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
00963 ACE_TEXT("(%d of %d) attaching domain %d participant %C to Repo[ %C].\n"),
00964 (1+index), repoList.size(), domain,
00965 OPENDDS_STRING(converter).c_str(),
00966 key.c_str()));
00967 }
00968
00969 if (attach_participant)
00970 {
00971 repoList[ index].first->attach_participant(domain, repoList[ index].second);
00972 }
00973 }
00974 }
00975
00976 void
00977 Service_Participant::repository_lost(Discovery::RepoKey key)
00978 {
00979
00980 RepoKeyDiscoveryMap::iterator initialLocation = this->discoveryMap_.find(key);
00981 RepoKeyDiscoveryMap::iterator current = initialLocation;
00982
00983 if (current == this->discoveryMap_.end()) {
00984 ACE_DEBUG((LM_WARNING,
00985 ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
00986 ACE_TEXT("lost repository %C was not present, ")
00987 ACE_TEXT("finding another anyway.\n"),
00988 key.c_str()));
00989
00990 } else {
00991
00992 ++current;
00993 }
00994
00995
00996 ACE_Time_Value recoveryFailedTime
00997 = ACE_OS::gettimeofday()
00998 + ACE_Time_Value(this->federation_recovery_duration(), 0);
00999
01000
01001 int backoff = this->federation_initial_backoff_seconds();
01002
01003
01004 while (recoveryFailedTime > ACE_OS::gettimeofday()) {
01005
01006
01007 if (current == this->discoveryMap_.end()) {
01008
01009 current = this->discoveryMap_.begin();
01010 }
01011
01012
01013
01014 if (current == initialLocation) {
01015 if (DCPS_debug_level > 0) {
01016 ACE_DEBUG((LM_DEBUG,
01017 ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01018 ACE_TEXT("waiting %d seconds to traverse the ")
01019 ACE_TEXT("repository list another time ")
01020 ACE_TEXT("for lost key %C.\n"),
01021 backoff,
01022 key.c_str()));
01023 }
01024
01025
01026 ACE_OS::sleep(backoff);
01027
01028
01029 backoff *= this->federation_backoff_multiplier();
01030
01031
01032
01033 }
01034
01035
01036 if (current->second->active()) {
01037
01038 if (DCPS_debug_level > 0) {
01039 ACE_DEBUG((LM_DEBUG,
01040 ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
01041 ACE_TEXT("replacing repository %C with %C.\n"),
01042 key.c_str(),
01043 current->first.c_str()));
01044 }
01045
01046
01047
01048 this->remap_domains(key, current->first);
01049
01050
01051
01052 return;
01053
01054 } else {
01055 ACE_DEBUG((LM_WARNING,
01056 ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
01057 ACE_TEXT("repository %C was not available to replace %C, ")
01058 ACE_TEXT("looking for another.\n"),
01059 current->first.c_str(),
01060 key.c_str()));
01061 }
01062
01063
01064 ++current;
01065 }
01066
01067
01068
01069 ACE_ASSERT(recoveryFailedTime == ACE_Time_Value::zero);
01070 }
01071
01072 void
01073 Service_Participant::set_default_discovery(const Discovery::RepoKey& key)
01074 {
01075 this->defaultDiscovery_ = key;
01076 }
01077
01078 Discovery::RepoKey
01079 Service_Participant::get_default_discovery()
01080 {
01081 return this->defaultDiscovery_;
01082 }
01083
01084 Discovery_rch
01085 Service_Participant::get_discovery(const DDS::DomainId_t domain)
01086 {
01087
01088
01089 Discovery::RepoKey repo = defaultDiscovery_;
01090
01091
01092
01093 DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
01094 if (where != this->domainRepoMap_.end()) {
01095 repo = where->second;
01096 }
01097
01098 RepoKeyDiscoveryMap::const_iterator location = this->discoveryMap_.find(repo);
01099
01100 if (location == this->discoveryMap_.end()) {
01101 if ((repo == Discovery::DEFAULT_REPO) ||
01102 (repo == "-1")) {
01103
01104
01105 bool ok = this->set_repo_ior(DEFAULT_REPO_IOR, Discovery::DEFAULT_REPO);
01106
01107 if (!ok) {
01108 if (DCPS_debug_level > 0) {
01109 ACE_DEBUG((LM_DEBUG,
01110 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01111 ACE_TEXT("failed attempt to set default IOR for domain %d.\n"),
01112 domain));
01113 }
01114
01115 } else {
01116
01117 if (DCPS_debug_level > 4) {
01118 ACE_DEBUG((LM_DEBUG,
01119 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01120 ACE_TEXT("returning default repository for domain %d.\n"),
01121 domain));
01122 }
01123
01124 }
01125 return this->discoveryMap_[Discovery::DEFAULT_REPO];
01126
01127 } else if (repo == Discovery::DEFAULT_RTPS) {
01128
01129 ACE_Configuration_Heap cf;
01130 cf.open();
01131 ACE_Configuration_Section_Key k;
01132 cf.open_section(cf.root_section(), RTPS_SECTION_NAME, 1 , k);
01133 this->load_discovery_configuration(cf, RTPS_SECTION_NAME);
01134
01135
01136 location = this->discoveryMap_.find(Discovery::DEFAULT_RTPS);
01137
01138 if (location == this->discoveryMap_.end()) {
01139
01140 if (DCPS_debug_level > 0) {
01141 ACE_DEBUG((LM_DEBUG,
01142 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01143 ACE_TEXT("failed attempt to set default RTPS discovery for domain %d.\n"),
01144 domain));
01145 }
01146
01147 return 0;
01148
01149 } else {
01150
01151 if (DCPS_debug_level > 4) {
01152 ACE_DEBUG((LM_DEBUG,
01153 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01154 ACE_TEXT("returning default RTPS discovery for domain %d.\n"),
01155 domain));
01156 }
01157
01158 return location->second;
01159 }
01160
01161 } else {
01162
01163 if (DCPS_debug_level > 4) {
01164 ACE_DEBUG((LM_DEBUG,
01165 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01166 ACE_TEXT("repository for domain %d was not set.\n"),
01167 domain));
01168 }
01169
01170 return 0;
01171 }
01172 }
01173
01174 if (DCPS_debug_level > 4) {
01175 ACE_DEBUG((LM_DEBUG,
01176 ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
01177 ACE_TEXT("returning repository for domain %d, repo %C.\n"),
01178 domain, repo.c_str()));
01179 }
01180
01181 return location->second;
01182 }
01183
01184 OPENDDS_STRING
01185 Service_Participant::bit_transport_ip() const
01186 {
01187 return ACE_TEXT_ALWAYS_CHAR(this->bit_transport_ip_.c_str());
01188 }
01189
01190 int
01191 Service_Participant::bit_transport_port() const
01192 {
01193 return this->bit_transport_port_;
01194 }
01195
01196 void
01197 Service_Participant::bit_transport_port(int port)
01198 {
01199 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01200 this->bit_transport_port_ = port;
01201 got_bit_transport_port = true;
01202 }
01203
01204 int
01205 Service_Participant::bit_lookup_duration_msec() const
01206 {
01207 return bit_lookup_duration_msec_;
01208 }
01209
01210 void
01211 Service_Participant::bit_lookup_duration_msec(int sec)
01212 {
01213 bit_lookup_duration_msec_ = sec;
01214 got_bit_lookup_duration_msec = true;
01215 }
01216
01217 size_t
01218 Service_Participant::n_chunks() const
01219 {
01220 return n_chunks_;
01221 }
01222
01223 void
01224 Service_Participant::n_chunks(size_t chunks)
01225 {
01226 n_chunks_ = chunks;
01227 got_chunks = true;
01228 }
01229
01230 size_t
01231 Service_Participant::association_chunk_multiplier() const
01232 {
01233 return association_chunk_multiplier_;
01234 }
01235
01236 void
01237 Service_Participant::association_chunk_multiplier(size_t multiplier)
01238 {
01239 association_chunk_multiplier_ = multiplier;
01240 got_chunk_association_multiplier = true;
01241 }
01242
01243 void
01244 Service_Participant::liveliness_factor(int factor)
01245 {
01246 liveliness_factor_ = factor;
01247 got_liveliness_factor = true;
01248 }
01249
01250 int
01251 Service_Participant::liveliness_factor() const
01252 {
01253 return liveliness_factor_;
01254 }
01255
01256 void
01257 Service_Participant::register_discovery_type(const char* section_name,
01258 Discovery::Config* cfg)
01259 {
01260 delete discovery_types_[section_name];
01261 discovery_types_[section_name] = cfg;
01262 }
01263
01264 int
01265 Service_Participant::load_configuration()
01266 {
01267 int status = 0;
01268
01269 if ((status = this->cf_.open()) != 0)
01270 ACE_ERROR_RETURN((LM_ERROR,
01271 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01272 ACE_TEXT("open() returned %d\n"),
01273 status),
01274 -1);
01275
01276 ACE_Ini_ImpExp import(this->cf_);
01277 status = import.import_config(config_fname.c_str());
01278
01279 if (status != 0) {
01280 ACE_ERROR_RETURN((LM_ERROR,
01281 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01282 ACE_TEXT("import_config () returned %d\n"),
01283 status),
01284 -1);
01285 } else {
01286 status = this->load_configuration(this->cf_, config_fname.c_str());
01287 }
01288 return status;
01289 }
01290
01291 int
01292 Service_Participant::load_configuration(
01293 ACE_Configuration_Heap& config,
01294 const ACE_TCHAR* filename)
01295 {
01296 int status = 0;
01297
01298 status = this->load_common_configuration(config, filename);
01299
01300 if (status != 0) {
01301 ACE_ERROR_RETURN((LM_ERROR,
01302 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01303 ACE_TEXT("load_common_configuration () returned %d\n"),
01304 status),
01305 -1);
01306 }
01307
01308
01309 this->add_discovery(static_rchandle_cast<Discovery>(StaticDiscovery::instance()));
01310
01311 status = this->load_discovery_configuration(config, RTPS_SECTION_NAME);
01312
01313 if (status != 0) {
01314 ACE_ERROR_RETURN((LM_ERROR,
01315 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01316 ACE_TEXT("load_discovery_configuration() returned %d\n"),
01317 status),
01318 -1);
01319 }
01320
01321 status = this->load_discovery_configuration(config, REPO_SECTION_NAME);
01322
01323 if (status != 0) {
01324 ACE_ERROR_RETURN((LM_ERROR,
01325 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01326 ACE_TEXT("load_discovery_configuration() returned %d\n"),
01327 status),
01328 -1);
01329 }
01330
01331 status = TransportRegistry::instance()->load_transport_configuration(
01332 ACE_TEXT_ALWAYS_CHAR(filename), config);
01333 if (this->global_transport_config_ != ACE_TEXT("")) {
01334 TransportConfig_rch config = TransportRegistry::instance()->get_config(
01335 ACE_TEXT_ALWAYS_CHAR(this->global_transport_config_.c_str()));
01336 if (config == 0) {
01337 ACE_ERROR_RETURN((LM_ERROR,
01338 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01339 ACE_TEXT("Unable to locate specified global transport config: %s\n"),
01340 this->global_transport_config_.c_str()),
01341 -1);
01342 }
01343 TransportRegistry::instance()->global_config(config);
01344 }
01345
01346 if (status != 0) {
01347 ACE_ERROR_RETURN((LM_ERROR,
01348 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01349 ACE_TEXT("load_transport_configuration () returned %d\n"),
01350 status),
01351 -1);
01352 }
01353
01354
01355
01356
01357
01358 status = this->load_domain_configuration(config, filename);
01359
01360 if (status != 0) {
01361 ACE_ERROR_RETURN((LM_ERROR,
01362 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01363 ACE_TEXT("load_domain_configuration () returned %d\n"),
01364 status),
01365 -1);
01366 }
01367
01368
01369 status = StaticDiscovery::instance()->load_configuration(config);
01370
01371 if (status != 0) {
01372 ACE_ERROR_RETURN((LM_ERROR,
01373 ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
01374 ACE_TEXT("load_discovery_configuration() returned %d\n"),
01375 status),
01376 -1);
01377 }
01378
01379 return 0;
01380 }
01381
01382 int
01383 Service_Participant::load_common_configuration(ACE_Configuration_Heap& cf,
01384 const ACE_TCHAR* filename)
01385 {
01386 const ACE_Configuration_Section_Key &root = cf.root_section();
01387 ACE_Configuration_Section_Key sect;
01388
01389 if (cf.open_section(root, COMMON_SECTION_NAME, 0, sect) != 0) {
01390 if (DCPS_debug_level > 0) {
01391
01392
01393 ACE_DEBUG((LM_NOTICE,
01394 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_common_configuration ")
01395 ACE_TEXT("failed to open section %s\n"),
01396 COMMON_SECTION_NAME));
01397 }
01398
01399 return 0;
01400
01401 } else {
01402 if (got_debug_level) {
01403 ACE_DEBUG((LM_NOTICE,
01404 ACE_TEXT("(%P|%t) NOTICE: using DCPSDebugLevel value from command option (overrides value if it's in config file).\n")));
01405 } else {
01406 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSDebugLevel"), DCPS_debug_level, int)
01407 }
01408
01409 if (got_info) {
01410 ACE_DEBUG((LM_NOTICE,
01411 ACE_TEXT("(%P|%t) NOTICE: using DCPSInfoRepo value from command option (overrides value if it's in config file).\n")));
01412 } else {
01413 ACE_TString value;
01414 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSInfoRepo"), value)
01415 if (!value.empty()) {
01416 this->set_repo_ior(value.c_str(), Discovery::DEFAULT_REPO);
01417 }
01418 }
01419
01420 if (got_use_rti_serialization) {
01421 ACE_DEBUG((LM_NOTICE,
01422 ACE_TEXT("(%P|%t) NOTICE: using DCPSRTISerialization value from command option (overrides value if it's in config file).\n")));
01423 } else {
01424 bool should_use = false;
01425 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSRTISerialization"), should_use, bool)
01426 Serializer::set_use_rti_serialization(should_use);
01427 }
01428
01429 if (got_chunks) {
01430 ACE_DEBUG((LM_NOTICE,
01431 ACE_TEXT("(%P|%t) NOTICE: using DCPSChunks value from command option (overrides value if it's in config file).\n")));
01432 } else {
01433 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunks"), this->n_chunks_, size_t)
01434 }
01435
01436 if (got_chunk_association_multiplier) {
01437 ACE_DEBUG((LM_NOTICE,
01438 ACE_TEXT("(%P|%t) NOTICE: using DCPSChunkAssociationMutltiplier value from command option (overrides value if it's in config file).\n")));
01439 } else {
01440 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunkAssociationMutltiplier"), this->association_chunk_multiplier_, size_t)
01441 }
01442
01443 if (got_bit_transport_port) {
01444 ACE_DEBUG((LM_NOTICE,
01445 ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportPort value from command option (overrides value if it's in config file).\n")));
01446 } else {
01447 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportPort"), this->bit_transport_port_, int)
01448 }
01449
01450 if (got_bit_transport_ip) {
01451 ACE_DEBUG((LM_DEBUG,
01452 ACE_TEXT("(%P|%t) NOTICE: using DCPSBitTransportIPAddress value from command option (overrides value if it's in config file).\n")));
01453 } else {
01454 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportIPAddress"), this->bit_transport_ip_)
01455 }
01456
01457 if (got_liveliness_factor) {
01458 ACE_DEBUG((LM_DEBUG,
01459 ACE_TEXT("(%P|%t) NOTICE: using DCPSLivelinessFactor value from command option (overrides value if it's in config file).\n")));
01460 } else {
01461 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSLivelinessFactor"), this->liveliness_factor_, int)
01462 }
01463
01464 if (got_bit_lookup_duration_msec) {
01465 ACE_DEBUG((LM_DEBUG,
01466 ACE_TEXT("(%P|%t) NOTICE: using DCPSBitLookupDurationMsec value from command option (overrides value if it's in config file).\n")));
01467 } else {
01468 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitLookupDurationMsec"), this->bit_lookup_duration_msec_, int)
01469 }
01470
01471 if (got_global_transport_config) {
01472 ACE_DEBUG((LM_DEBUG,
01473 ACE_TEXT("(%P|%t) NOTICE: using DCPSGlobalTransportConfig value from command option (overrides value if it's in config file).\n")));
01474 } else {
01475 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSGlobalTransportConfig"), this->global_transport_config_);
01476 if (this->global_transport_config_ == ACE_TEXT("$file")) {
01477
01478 this->global_transport_config_ = filename;
01479 }
01480 }
01481
01482 if (got_bit_flag) {
01483 ACE_DEBUG((LM_DEBUG,
01484 ACE_TEXT("(%P|%t) NOTICE: using DCPSBit value from command option (overrides value if it's in config file).\n")));
01485 } else {
01486 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBit"), this->bit_enabled_, int)
01487 }
01488
01489 if (got_transport_debug_level) {
01490 ACE_DEBUG((LM_NOTICE,
01491 ACE_TEXT("(%P|%t) NOTICE: using DCPSTransportDebugLevel value from command option (overrides value if it's in config file).\n")));
01492 } else {
01493 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSTransportDebugLevel"), OpenDDS::DCPS::Transport_debug_level, int)
01494 }
01495
01496 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01497 if (got_persistent_data_dir) {
01498 ACE_DEBUG((LM_NOTICE,
01499 ACE_TEXT("(%P|%t) NOTICE: using DCPSPersistentDataDir value from command option (overrides value if it's in config file).\n")));
01500 } else {
01501 ACE_TString value;
01502 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSPersistentDataDir"), value)
01503 this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(value.c_str());
01504 }
01505 #endif
01506
01507 if (got_pending_timeout) {
01508 ACE_DEBUG((LM_NOTICE,
01509 ACE_TEXT("(%P|%t) NOTICE: using DCPSPendingTimeout value from command option (overrides value if it's in config file).\n")));
01510 } else {
01511 int timeout = 0;
01512 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPendingTimeout"), timeout, int)
01513 this->pending_timeout_ = timeout;
01514 }
01515
01516 if (got_publisher_content_filter) {
01517 ACE_DEBUG((LM_DEBUG,
01518 ACE_TEXT("(%P|%t) NOTICE: using DCPSPublisherContentFilter ")
01519 ACE_TEXT("value from command option (overrides value if it's ")
01520 ACE_TEXT("in config file).\n")));
01521 } else {
01522 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPublisherContentFilter"),
01523 this->publisher_content_filter_, bool)
01524 }
01525
01526 if (got_default_discovery) {
01527 ACE_Configuration::VALUETYPE type;
01528 if (cf.find_value(sect, ACE_TEXT("DCPSDefaultDiscovery"), type) != -1) {
01529 ACE_DEBUG((LM_NOTICE,
01530 ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultDiscovery value ")
01531 ACE_TEXT("from command option, overriding config file\n")));
01532 }
01533 } else {
01534 GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultDiscovery"),
01535 this->defaultDiscovery_);
01536 }
01537
01538 ACE_Configuration::VALUETYPE type;
01539 if (got_log_fname) {
01540 if (cf.find_value(sect, ACE_TEXT("ORBLogFile"), type) != -1) {
01541 ACE_DEBUG((LM_NOTICE,
01542 ACE_TEXT("(%P|%t) NOTICE: using ORBLogFile value ")
01543 ACE_TEXT("from command option, overriding config file\n")));
01544 }
01545 } else {
01546 OPENDDS_STRING log_fname;
01547 GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("ORBLogFile"), log_fname);
01548 if (!log_fname.empty()) {
01549 set_log_file_name(log_fname.c_str());
01550 }
01551 }
01552
01553 if (got_log_verbose) {
01554 if (cf.find_value(sect, ACE_TEXT("ORBVerboseLogging"), type) != -1) {
01555 ACE_DEBUG((LM_NOTICE,
01556 ACE_TEXT("(%P|%t) NOTICE: using ORBVerboseLogging value ")
01557 ACE_TEXT("from command option, overriding config file\n")));
01558 }
01559 } else {
01560 unsigned long verbose_logging = 0;
01561 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("ORBVerboseLogging"), verbose_logging, unsigned long);
01562 set_log_verbose(verbose_logging);
01563 }
01564
01565 if (got_default_address) {
01566 ACE_DEBUG((LM_DEBUG,
01567 ACE_TEXT("(%P|%t) NOTICE: using DCPSDefaultAddress value from command option (overrides value if it's in config file).\n")));
01568 } else {
01569 GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultAddress"), this->default_address_)
01570 }
01571
01572
01573 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationRecoveryDuration"), this->federation_recovery_duration_, int)
01574 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationInitialBackoffSeconds"), this->federation_initial_backoff_seconds_, int)
01575 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationBackoffMultiplier"), this->federation_backoff_multiplier_, int)
01576 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationLivelinessDuration"), this->federation_liveliness_, int)
01577
01578 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
01579 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_size"), pool_size_, size_t)
01580 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_granularity"), pool_granularity_, size_t)
01581 #endif
01582
01583
01584
01585
01586 GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("scheduler"), this->schedulerString_)
01587
01588 suseconds_t usec(0);
01589
01590 GET_CONFIG_VALUE(cf, sect, ACE_TEXT("scheduler_slice"), usec, suseconds_t)
01591
01592 if (usec > 0)
01593 this->schedulerQuantum_.usec(usec);
01594 }
01595
01596 return 0;
01597 }
01598
01599 int
01600 Service_Participant::load_domain_configuration(ACE_Configuration_Heap& cf,
01601 const ACE_TCHAR* filename)
01602 {
01603 const ACE_Configuration_Section_Key& root = cf.root_section();
01604 ACE_Configuration_Section_Key domain_sect;
01605
01606 if (cf.open_section(root, DOMAIN_SECTION_NAME, 0, domain_sect) != 0) {
01607 if (DCPS_debug_level > 0) {
01608
01609
01610 ACE_DEBUG((LM_NOTICE,
01611 ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_domain_configuration ")
01612 ACE_TEXT("failed to open [%s] section - using code default.\n"),
01613 DOMAIN_SECTION_NAME));
01614 }
01615
01616 return 0;
01617
01618 } else {
01619
01620 ValueMap vm;
01621 if (pullValues(cf, domain_sect, vm) > 0) {
01622
01623 ACE_ERROR_RETURN((LM_ERROR,
01624 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01625 ACE_TEXT("domain sections must have a subsection name\n")),
01626 -1);
01627 }
01628
01629 KeyList keys;
01630 if (processSections(cf, domain_sect, keys) != 0) {
01631 ACE_ERROR_RETURN((LM_ERROR,
01632 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01633 ACE_TEXT("too many nesting layers in the [domain] section.\n")),
01634 -1);
01635 }
01636
01637
01638 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01639 OPENDDS_STRING domain_name = it->first;
01640
01641 ValueMap values;
01642 pullValues(cf, it->second, values);
01643 DDS::DomainId_t domainId = -1;
01644 Discovery::RepoKey repoKey;
01645 OPENDDS_STRING perDomainDefaultTportConfig;
01646 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01647 OPENDDS_STRING name = it->first;
01648 if (name == "DomainId") {
01649 OPENDDS_STRING value = it->second;
01650 if (!convertToInteger(value, domainId)) {
01651 ACE_ERROR_RETURN((LM_ERROR,
01652 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01653 ACE_TEXT("Illegal integer value for DomainId (%C) in [domain/%C] section.\n"),
01654 value.c_str(), domain_name.c_str()),
01655 -1);
01656 }
01657 if (DCPS_debug_level > 0) {
01658 ACE_DEBUG((LM_DEBUG,
01659 ACE_TEXT("(%P|%t) [domain/%C]: DomainId == %d\n"),
01660 domain_name.c_str(), domainId));
01661 }
01662 } else if (name == "DomainRepoKey") {
01663
01664
01665 repoKey = it->second;
01666 if (repoKey == "-1") {
01667 repoKey = Discovery::DEFAULT_REPO;
01668 }
01669
01670 if (DCPS_debug_level > 0) {
01671 ACE_DEBUG((LM_DEBUG,
01672 ACE_TEXT("(%P|%t) [domain/%C]: DomainRepoKey == %C\n"),
01673 domain_name.c_str(), repoKey.c_str()));
01674 }
01675 } else if (name == "DiscoveryConfig") {
01676 repoKey = it->second;
01677
01678 } else if (name == "DefaultTransportConfig") {
01679 if (it->second == "$file") {
01680
01681 perDomainDefaultTportConfig = ACE_TEXT_ALWAYS_CHAR(filename);
01682
01683 } else {
01684 perDomainDefaultTportConfig = it->second;
01685 }
01686
01687 } else {
01688 ACE_ERROR_RETURN((LM_ERROR,
01689 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01690 ACE_TEXT("Unexpected entry (%C) in [domain/%C] section.\n"),
01691 name.c_str(), domain_name.c_str()),
01692 -1);
01693 }
01694 }
01695
01696 if (domainId == -1) {
01697
01698 if (!convertToInteger(domain_name, domainId)) {
01699 ACE_ERROR_RETURN((LM_ERROR,
01700 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01701 ACE_TEXT("Missing DomainId value in [domain/%C] section.\n"),
01702 domain_name.c_str()),
01703 -1);
01704 }
01705 }
01706
01707 if (!perDomainDefaultTportConfig.empty()) {
01708 TransportRegistry* const reg = TransportRegistry::instance();
01709 TransportConfig_rch tc = reg->get_config(perDomainDefaultTportConfig);
01710 if (tc.is_nil()) {
01711 ACE_ERROR_RETURN((LM_ERROR,
01712 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01713 ACE_TEXT("Unknown transport config %C in [domain/%C] section.\n"),
01714 perDomainDefaultTportConfig.c_str(), domain_name.c_str()), -1);
01715 } else {
01716 reg->domain_default_config(domainId, tc);
01717 }
01718 }
01719
01720
01721 if (!repoKey.empty()) {
01722 if ((repoKey != Discovery::DEFAULT_REPO) &&
01723 (repoKey != Discovery::DEFAULT_RTPS) &&
01724 (repoKey != Discovery::DEFAULT_STATIC) &&
01725 (this->discoveryMap_.find(repoKey) == this->discoveryMap_.end())) {
01726 ACE_ERROR_RETURN((LM_ERROR,
01727 ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
01728 ACE_TEXT("Specified configuration (%C) not found. Referenced in [domain/%C] section.\n"),
01729 repoKey.c_str(), domain_name.c_str()),
01730 -1);
01731 }
01732 this->set_repo_domain(domainId, repoKey);
01733 }
01734 }
01735 }
01736
01737 return 0;
01738 }
01739
01740 int
01741 Service_Participant::load_discovery_configuration(ACE_Configuration_Heap& cf,
01742 const ACE_TCHAR* section_name)
01743 {
01744 const ACE_Configuration_Section_Key &root = cf.root_section();
01745 ACE_Configuration_Section_Key sect;
01746 if (cf.open_section(root, section_name, 0, sect) == 0) {
01747
01748 const OPENDDS_STRING sect_name = ACE_TEXT_ALWAYS_CHAR(section_name);
01749 OPENDDS_MAP(OPENDDS_STRING, Discovery::Config*)::iterator iter =
01750 this->discovery_types_.find(sect_name);
01751
01752 if (iter == this->discovery_types_.end()) {
01753
01754 TheTransportRegistry->load_transport_lib(sect_name);
01755 iter = this->discovery_types_.find(sect_name);
01756 }
01757
01758 if (iter != this->discovery_types_.end()) {
01759
01760 return iter->second->discovery_config(cf);
01761 } else {
01762
01763 ACE_ERROR_RETURN((LM_ERROR,
01764 ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
01765 ACE_TEXT("load_discovery_configuration ")
01766 ACE_TEXT("Unable to load libraries for %s\n"),
01767 section_name),
01768 -1);
01769 }
01770 }
01771 return 0;
01772 }
01773
#if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
// Configure the SafetyProfilePool singleton with pool_size_ and
// pool_granularity_ and install it.  Nothing is done when pool_size_ is 0.
void
Service_Participant::configure_pool()
{
  if (!pool_size_) {
    return;
  }

  SafetyProfilePool::instance()->configure_pool(pool_size_, pool_granularity_);
  SafetyProfilePool::instance()->install();
}
#endif
01784
01785 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01786 DataDurabilityCache *
01787 Service_Participant::get_data_durability_cache(
01788 DDS::DurabilityQosPolicy const & durability)
01789 {
01790 DDS::DurabilityQosPolicyKind const kind =
01791 durability.kind;
01792
01793 DataDurabilityCache * cache = 0;
01794
01795 if (kind == DDS::TRANSIENT_DURABILITY_QOS) {
01796 {
01797 ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01798 guard,
01799 this->factory_lock_,
01800 0);
01801
01802 if (this->transient_data_cache_.get() == 0) {
01803 ACE_auto_ptr_reset(this->transient_data_cache_,
01804 new DataDurabilityCache(kind));
01805 }
01806 }
01807
01808 cache = this->transient_data_cache_.get();
01809
01810 } else if (kind == DDS::PERSISTENT_DURABILITY_QOS) {
01811 {
01812 ACE_GUARD_RETURN(TAO_SYNCH_MUTEX,
01813 guard,
01814 this->factory_lock_,
01815 0);
01816
01817 try {
01818 if (this->persistent_data_cache_.get() == 0) {
01819 ACE_auto_ptr_reset(this->persistent_data_cache_,
01820 new DataDurabilityCache(kind,
01821 this->persistent_data_dir_));
01822 }
01823
01824 } catch (const std::exception& ex) {
01825 if (DCPS_debug_level > 0) {
01826 ACE_ERROR((LM_WARNING,
01827 ACE_TEXT("(%P|%t) WARNING: Service_Participant::get_data_durability_cache ")
01828 ACE_TEXT("failed to create PERSISTENT cache, falling back on ")
01829 ACE_TEXT("TRANSIENT behavior: %C\n"), ex.what()));
01830 }
01831
01832 ACE_auto_ptr_reset(this->persistent_data_cache_,
01833 new DataDurabilityCache(DDS::TRANSIENT_DURABILITY_QOS));
01834 }
01835 }
01836
01837 cache = this->persistent_data_cache_.get();
01838 }
01839
01840 return cache;
01841 }
01842 #endif
01843
01844 void
01845 Service_Participant::add_discovery(Discovery_rch discovery)
01846 {
01847 if (discovery != 0) {
01848 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->maps_lock_);
01849 this->discoveryMap_[discovery->key()] = discovery;
01850 }
01851 }
01852
01853 const Service_Participant::RepoKeyDiscoveryMap&
01854 Service_Participant::discoveryMap() const
01855 {
01856 return this->discoveryMap_;
01857 }
01858
01859 const Service_Participant::DomainRepoMap&
01860 Service_Participant::domainRepoMap() const
01861 {
01862 return this->domainRepoMap_;
01863 }
01864
01865 Recorder_ptr
01866 Service_Participant::create_recorder(DDS::DomainParticipant_ptr participant,
01867 DDS::Topic_ptr a_topic,
01868 const DDS::SubscriberQos& subscriber_qos,
01869 const DDS::DataReaderQos& datareader_qos,
01870 const RecorderListener_rch& a_listener)
01871 {
01872 DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01873 if (participant_servant)
01874 return participant_servant->create_recorder(a_topic, subscriber_qos, datareader_qos, a_listener, 0);
01875 return 0;
01876 }
01877
01878 DDS::ReturnCode_t
01879 Service_Participant::delete_recorder(Recorder_ptr recorder)
01880 {
01881 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01882 RecorderImpl* impl = static_cast<RecorderImpl*>(recorder);
01883 if (impl){
01884 ret = impl->cleanup();
01885 impl->participant()->delete_recorder(recorder);
01886 }
01887 return ret;
01888 }
01889
01890 Replayer_ptr
01891 Service_Participant::create_replayer(DDS::DomainParticipant_ptr participant,
01892 DDS::Topic_ptr a_topic,
01893 const DDS::PublisherQos& publisher_qos,
01894 const DDS::DataWriterQos& datawriter_qos,
01895 const ReplayerListener_rch& a_listener)
01896 {
01897 ACE_DEBUG((LM_DEBUG, "Service_Participant::create_replayer\n"));
01898 DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01899 if (participant_servant)
01900 return participant_servant->create_replayer(a_topic, publisher_qos, datawriter_qos, a_listener, 0);
01901 return 0;
01902 }
01903
01904 DDS::ReturnCode_t
01905 Service_Participant::delete_replayer(Replayer_ptr replayer)
01906 {
01907 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01908 ReplayerImpl* impl = static_cast<ReplayerImpl*>(replayer);
01909 if (impl) {
01910 ret = impl->cleanup();
01911 impl->participant()->delete_replayer(replayer);
01912 }
01913 return ret;
01914 }
01915
01916 DDS::Topic_ptr
01917 Service_Participant::create_typeless_topic(DDS::DomainParticipant_ptr participant,
01918 const char * topic_name,
01919 const char * type_name,
01920 bool type_has_keys,
01921 const DDS::TopicQos & qos,
01922 DDS::TopicListener_ptr a_listener,
01923 DDS::StatusMask mask)
01924 {
01925 DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
01926 if (! participant_servant) {
01927 return 0;
01928 }
01929 return participant_servant->create_typeless_topic(topic_name, type_name, type_has_keys, qos, a_listener, mask);
01930 }
01931
01932 }
01933 }